
Commit

Merge branch 'dev' into sasl-plain/core-8407
michael-redpanda authored Dec 12, 2024
2 parents 7771b04 + ad3206b commit 7ac13ca
Showing 27 changed files with 573 additions and 101 deletions.
5 changes: 3 additions & 2 deletions src/v/cloud_io/remote.cc
@@ -66,6 +66,9 @@ cloud_io::provider infer_provider(
model::cloud_storage_backend backend,
const cloud_storage_clients::client_configuration& conf) {
switch (backend) {
case model::cloud_storage_backend::unknown:
// NOTE: treat unknown cloud storage backend as a valid case
// in which we're assuming S3 compatible storage.
case model::cloud_storage_backend::aws:
case model::cloud_storage_backend::minio:
case model::cloud_storage_backend::oracle_s3_compat:
@@ -78,8 +81,6 @@ cloud_io::provider infer_provider(
.account_name = abs.storage_account_name(),
};
}
case model::cloud_storage_backend::unknown:
unreachable();
}
}

35 changes: 32 additions & 3 deletions src/v/cloud_storage_clients/util.cc
@@ -15,8 +15,13 @@
#include "net/connection.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/future.hh>

#include <boost/property_tree/xml_parser.hpp>

#include <exception>
#include <system_error>

namespace {

bool is_abort_or_gate_close_exception(const std::exception_ptr& ex) {
@@ -40,6 +45,28 @@ bool has_abort_or_gate_close_exception(const ss::nested_exception& ex) {
|| is_abort_or_gate_close_exception(ex.outer);
}

bool is_nested_reconnect_error(const ss::nested_exception& ex) {
try {
std::rethrow_exception(ex.inner);
} catch (const std::system_error& e) {
if (!net::is_reconnect_error(e)) {
return false;
}
} catch (...) {
return false;
}
try {
std::rethrow_exception(ex.outer);
} catch (const std::system_error& e) {
if (!net::is_reconnect_error(e)) {
return false;
}
} catch (...) {
return false;
}
return true;
}

template<typename Logger>
error_outcome handle_client_transport_error(
std::exception_ptr current_exception, Logger& logger) {
@@ -103,10 +130,12 @@ error_outcome handle_client_transport_error(
if (has_abort_or_gate_close_exception(ex)) {
vlog(logger.debug, "Nested abort or gate closed: {}", ex);
throw;
} else if (is_nested_reconnect_error(ex)) {
vlog(logger.warn, "Connection error {}", std::current_exception());
} else {
vlog(logger.error, "Unexpected error {}", std::current_exception());
outcome = error_outcome::fail;
}

vlog(logger.error, "Unexpected error {}", std::current_exception());
outcome = error_outcome::fail;
} catch (...) {
vlog(logger.error, "Unexpected error {}", std::current_exception());
outcome = error_outcome::fail;
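
The is_nested_reconnect_error() helper above lets the nested-exception branch of handle_client_transport_error treat connection resets wrapped in an ss::nested_exception the same way plain ones are treated: shutdown-style exceptions still propagate, a reconnect error on both the inner and outer exception is logged as a warning without forcing error_outcome::fail, and anything else remains an unexpected hard failure. A condensed restatement of that decision (the outcome enum and classify_nested are illustrative only, not part of this change):

enum class outcome { rethrow, retry, fail };

// Hypothetical summary of the nested-exception handling above.
outcome classify_nested(const ss::nested_exception& ex) {
    if (has_abort_or_gate_close_exception(ex)) {
        return outcome::rethrow; // shutdown path: let the caller unwind
    }
    if (is_nested_reconnect_error(ex)) {
        return outcome::retry;   // transient connection problem, warn only
    }
    return outcome::fail;        // genuinely unexpected error
}
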
10 changes: 8 additions & 2 deletions src/v/cluster/producer_state_manager.cc
@@ -41,7 +41,7 @@ producer_state_manager::producer_state_manager(

ss::future<> producer_state_manager::start() {
_reaper.set_callback([this] { evict_excess_producers(); });
_reaper.arm(period);
_reaper.arm(_reaper_period);
vlog(clusterlog.info, "Started producer state manager");
return ss::now();
}
@@ -69,6 +69,12 @@ void producer_state_manager::setup_metrics() {
sm::description("Number of evicted producers so far."))});
}

void producer_state_manager::rearm_eviction_timer_for_testing(
std::chrono::milliseconds new_period) {
_reaper_period = new_period;
_reaper.rearm(ss::lowres_clock::now() + _reaper_period);
}

void producer_state_manager::register_producer(
producer_state& state, std::optional<model::vcluster_id> vcluster) {
vlog(
@@ -97,7 +103,7 @@ void producer_state_manager::evict_excess_producers() {
_cache.evict_older_than<ss::lowres_system_clock>(
ss::lowres_system_clock::now() - _producer_expiration_ms());
if (!_gate.is_closed()) {
_reaper.arm(period);
_reaper.arm(_reaper_period);
}
}

6 changes: 4 additions & 2 deletions src/v/cluster/producer_state_manager.h
@@ -45,8 +45,10 @@ class producer_state_manager {
*/
void touch(producer_state&, std::optional<model::vcluster_id>);

void rearm_eviction_timer_for_testing(std::chrono::milliseconds);

private:
static constexpr std::chrono::seconds period{5};
std::chrono::milliseconds _reaper_period{5000};
/**
* Constant to be used when a partition has no vcluster_id assigned.
*/
@@ -83,7 +85,7 @@ class producer_state_manager {
config::binding<size_t> _virtual_cluster_min_producer_ids;
// cache of all producers on this shard
cache_t _cache;
ss::timer<ss::steady_clock_type> _reaper;
ss::timer<ss::lowres_clock> _reaper;
ss::gate _gate;
metrics::internal_metric_groups _metrics;

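
Taken together, the producer_state_manager changes above replace the fixed constexpr 5-second period with a mutable _reaper_period, move the reaper timer onto ss::lowres_clock, and add rearm_eviction_timer_for_testing() so tests can shorten the eviction cycle on demand. A minimal standalone sketch of that rearmable-timer pattern, assuming only Seastar's timer API (the reaper_example class and its names are illustrative, not Redpanda code):

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>

#include <chrono>

// Illustrative periodic reaper whose period is a mutable member, so a
// test-only hook can shrink it and fire the next run almost immediately.
class reaper_example {
public:
    void start() {
        _reaper.set_callback([this] { tick(); });
        _reaper.arm(_period);
    }

    // mirrors rearm_eviction_timer_for_testing() above
    void rearm_for_testing(std::chrono::milliseconds new_period) {
        _period = new_period;
        _reaper.rearm(seastar::lowres_clock::now() + _period);
    }

private:
    void tick() {
        // ... evict whatever is eligible, then schedule the next round
        _reaper.arm(_period);
    }

    std::chrono::milliseconds _period{5000};
    seastar::timer<seastar::lowres_clock> _reaper;
};
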
59 changes: 46 additions & 13 deletions src/v/cluster/rm_stm.cc
@@ -90,7 +90,8 @@ rm_stm::rm_stm(
, _feature_table(feature_table)
, _ctx_log(txlog, ssx::sformat("[{}]", c->ntp()))
, _producer_state_manager(producer_state_manager)
, _vcluster_id(vcluster_id) {
, _vcluster_id(vcluster_id)
, _producers_pending_cleanup(std::numeric_limits<size_t>::max()) {
setup_metrics();
if (!_is_tx_enabled) {
_is_autoabort_enabled = false;
@@ -114,6 +115,18 @@ rm_stm::rm_stm(
e);
});
});

ssx::repeat_until_gate_closed_or_aborted(_gate, _as, [this] {
return cleanup_evicted_producers().handle_exception(
[h = _gate.hold(), this](const std::exception_ptr& ex) {
if (!ssx::is_shutdown_exception(ex)) {
vlog(
_ctx_log.warn,
"encountered an exception while cleaning producers: {}",
ex);
}
});
});
}

ss::future<model::offset> rm_stm::bootstrap_committed_offset() {
@@ -129,6 +142,8 @@ ss::future<model::offset> rm_stm::bootstrap_committed_offset() {

std::pair<producer_ptr, rm_stm::producer_previously_known>
rm_stm::maybe_create_producer(model::producer_identity pid) {
// note: must be called under state_lock in shared/read mode.

// Double lookup because of two reasons
// 1. we are forced to use a ptr as map value_type because producer_state is
// not movable
@@ -148,23 +163,37 @@
return std::make_pair(producer, producer_previously_known::no);
}

void rm_stm::cleanup_producer_state(model::producer_identity pid) {
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
return;
ss::future<> rm_stm::cleanup_evicted_producers() {
while (!_as.abort_requested() && !_gate.is_closed()) {
auto pid = co_await _producers_pending_cleanup.pop_eventually();
auto units = co_await _state_lock.hold_read_lock();
auto it = _producers.find(pid.get_id());
if (it != _producers.end() && it->second->id() == pid) {
const auto& producer = *(it->second);
if (producer._active_transaction_hook.is_linked()) {
vlog(
_ctx_log.error,
"Ignoring cleanup request of producer {} due to in progress "
"transaction.",
producer);
co_return;
}
_producers.erase(it);
vlog(_ctx_log.trace, "removed producer: {}", pid);
}
_producers.erase(it);
}
}

void rm_stm::cleanup_producer_state(model::producer_identity pid) noexcept {
if (_as.abort_requested() || _gate.is_closed()) {
return;
}
_producers_pending_cleanup.push(std::move(pid));
};

ss::future<> rm_stm::reset_producers() {
// note: must always be called under exclusive write lock to
// avoid concurrent state changes to _producers.
co_await ss::max_concurrent_for_each(
_producers.begin(), _producers.end(), 32, [this](auto& it) {
auto& producer = it.second;
@@ -738,6 +767,8 @@ ss::future<result<kafka_result>> rm_stm::do_replicate(

ss::future<> rm_stm::stop() {
_as.request_abort();
_producers_pending_cleanup.abort(
std::make_exception_ptr(ss::abort_requested_exception{}));
auto_abort_timer.cancel();
co_await _gate.close();
co_await reset_producers();
@@ -1655,6 +1686,8 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {

ss::future<>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto units = co_await _state_lock.hold_write_lock();

vlog(
_ctx_log.trace,
"applying snapshot with last included offset: {}",
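
The rm_stm changes above split producer cleanup into two stages: cleanup_producer_state() is now a noexcept hook that merely enqueues the producer id onto _producers_pending_cleanup, while a background loop started from the constructor drains the queue in cleanup_evicted_producers() and erases state under the read side of _state_lock; stop() aborts the queue so the loop can unwind, and apply_local_snapshot() now takes the write lock so snapshot application cannot race with those reads. A self-contained sketch of the queue-draining pattern using plain Seastar primitives (the cleanup_example type and its members are assumptions, not the rm_stm implementation):

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/rwlock.hh>

#include <cstdint>
#include <limits>
#include <unordered_map>

// Illustrative deferred cleanup: synchronous code paths enqueue ids, a
// single background fiber erases the matching state under a read lock.
struct cleanup_example {
    // never throws; safe to call from an eviction callback
    void request_cleanup(int64_t id) noexcept {
        if (!_as.abort_requested()) {
            _pending.push(int64_t{id}); // effectively unbounded, never blocks
        }
    }

    // background fiber; in practice kept open through a gate
    seastar::future<> cleanup_loop() {
        while (!_as.abort_requested()) {
            auto id = co_await _pending.pop_eventually();
            auto units = co_await _lock.hold_read_lock();
            _producers.erase(id);
        }
    }

    void stop() {
        _as.request_abort();
        // wakes pop_eventually(); the resulting exception ends cleanup_loop()
        _pending.abort(
          std::make_exception_ptr(seastar::abort_requested_exception{}));
    }

    seastar::abort_source _as;
    seastar::queue<int64_t> _pending{std::numeric_limits<size_t>::max()};
    seastar::rwlock _lock;
    std::unordered_map<int64_t, int> _producers;
};
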
5 changes: 4 additions & 1 deletion src/v/cluster/rm_stm.h
@@ -242,7 +242,8 @@ class rm_stm final : public raft::persisted_stm<> {
= ss::bool_class<struct new_producer_created_tag>;
std::pair<tx::producer_ptr, producer_previously_known>
maybe_create_producer(model::producer_identity);
void cleanup_producer_state(model::producer_identity);
void cleanup_producer_state(model::producer_identity) noexcept;
ss::future<> cleanup_evicted_producers();
ss::future<> reset_producers();
ss::future<checked<model::term_id, tx::errc>> do_begin_tx(
model::term_id,
@@ -414,6 +415,8 @@ class rm_stm final : public raft::persisted_stm<> {
// producers because epoch is unused.
producers_t _producers;

ss::queue<model::producer_identity> _producers_pending_cleanup;

// All the producers with open transactions in this partition.
// The list is sorted by the open transaction begin offset, so
// the first entry in the list is the earliest open transaction
50 changes: 45 additions & 5 deletions src/v/cluster/tests/rm_stm_test_fixture.h
@@ -10,7 +10,7 @@
#pragma once
#include "cluster/producer_state_manager.h"
#include "cluster/rm_stm.h"
#include "config/property.h"
#include "config/mock_property.h"
#include "raft/tests/simple_raft_fixture.h"

#include <seastar/core/sharded.hh>
@@ -23,12 +23,14 @@ static prefix_logger ctx_logger{logger, ""};
struct rm_stm_test_fixture : simple_raft_fixture {
void create_stm_and_start_raft(
storage::ntp_config::default_overrides overrides = {}) {
max_concurent_producers.start(std::numeric_limits<size_t>::max()).get();
producer_expiration_ms.start(std::chrono::milliseconds::max()).get();
producer_state_manager
.start(
config::mock_binding(std::numeric_limits<uint64_t>::max()),
config::mock_binding(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds::max())),
ss::sharded_parameter(
[this] { return max_concurent_producers.local().bind(); }),
ss::sharded_parameter(
[this] { return producer_expiration_ms.local().bind(); }),
config::mock_binding(std::numeric_limits<uint64_t>::max()))
.get();
producer_state_manager
@@ -55,6 +57,8 @@ struct rm_stm_test_fixture : simple_raft_fixture {
if (_started) {
stop_all();
producer_state_manager.stop().get();
producer_expiration_ms.stop().get();
max_concurent_producers.stop().get();
}
}

@@ -66,6 +70,17 @@
return _stm->do_take_local_snapshot(version, {});
}

void update_producer_expiration(std::chrono::milliseconds value) {
producer_expiration_ms
.invoke_on_all(
[value](auto& local) mutable { local.update(std::move(value)); })
.get();
}

auto apply_raft_snapshot(const iobuf& buf) {
return _stm->apply_raft_snapshot(buf);
}

auto apply_snapshot(raft::stm_snapshot_header hdr, iobuf buf) {
return _stm->apply_local_snapshot(hdr, std::move(buf));
}
@@ -77,6 +92,31 @@

auto get_expired_producers() const { return _stm->get_expired_producers(); }

auto stm_read_lock() { return _stm->_state_lock.hold_read_lock(); }

auto maybe_create_producer(model::producer_identity pid) {
return stm_read_lock().then([pid, this](auto /*units*/) {
return _stm->maybe_create_producer(pid);
});
}

auto reset_producers() {
return _stm->_state_lock.hold_write_lock().then([this](auto units) {
return _stm->reset_producers().then([units = std::move(units)] {});
});
}

auto rearm_eviction_timer(std::chrono::milliseconds period) {
return producer_state_manager
.invoke_on_all([period](auto& mgr) {
return mgr.rearm_eviction_timer_for_testing(period);
})
.get();
}

ss::sharded<config::mock_property<size_t>> max_concurent_producers;
ss::sharded<config::mock_property<std::chrono::milliseconds>>
producer_expiration_ms;
ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
ss::sharded<cluster::tx::producer_state_manager> producer_state_manager;
ss::shared_ptr<cluster::rm_stm> _stm;
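
With the fixture changes above, the eviction knobs become sharded config::mock_property instances whose bindings are handed to producer_state_manager, so individual tests can tighten them at runtime. A usage sketch (a hypothetical test body inside rm_stm_test_fixture, not taken from the actual tests):

// Make current producers eligible for eviction almost immediately, then
// fire the reaper on a short period instead of the default 5 seconds.
update_producer_expiration(std::chrono::milliseconds(1));
rearm_eviction_timer(std::chrono::milliseconds(10));
// ... wait for eviction, then assert on producer state / metrics.
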