Skip to content

Commit

Permalink
Merge pull request redpanda-data#22961 from mmaslankaprv/stm-snapshot-opt-out
Browse files Browse the repository at this point in the history

Gives state machine implementers a way to opt out of being forced to support snapshotting at an arbitrary offset.
  • Loading branch information
mmaslankaprv authored Oct 8, 2024
2 parents 06e9f17 + 0f26b8f commit dc252d7
Show file tree
Hide file tree
Showing 17 changed files with 298 additions and 97 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/distributed_kv_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
static constexpr std::string_view name = Name;
explicit distributed_kv_stm(
size_t max_partitions, ss::logger& logger, raft::consensus* raft)
: persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
: raft::persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
, _default_max_partitions(max_partitions)
, _is_routing_partition(_raft->ntp().tp.partition == routing_partition) {}

Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ id_allocator_stm::id_allocator_stm(ss::logger& logger, raft::consensus* c)

id_allocator_stm::id_allocator_stm(
ss::logger& logger, raft::consensus* c, config::configuration& cfg)
: persisted_stm(id_allocator_snapshot, logger, c)
: raft::persisted_stm<>(id_allocator_snapshot, logger, c)
, _batch_size(cfg.id_allocator_batch_size.value())
, _log_capacity(cfg.id_allocator_log_capacity.value()) {}

ss::future<bool>
id_allocator_stm::sync(model::timeout_clock::duration timeout) {
auto term = _insync_term;
auto is_synced = co_await persisted_stm::sync(timeout);
auto is_synced = co_await raft::persisted_stm<>::sync(timeout);
if (is_synced) {
if (term != _insync_term) {
_curr_id = _state;
Expand Down
12 changes: 6 additions & 6 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ struct snapshot_data

log_eviction_stm::log_eviction_stm(
raft::consensus* raft, ss::logger& logger, storage::kvstore& kvstore)
: persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) {}
: base_t("log_eviction_stm.snapshot", logger, raft, kvstore) {}

ss::future<> log_eviction_stm::start() {
ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); });
ssx::spawn_with_gate(
_gate, [this] { return handle_log_eviction_events(); });
return persisted_stm::start();
return base_t::start();
}

ss::future<> log_eviction_stm::stop() {
_as.request_abort();
_has_pending_truncation.broken();
co_await persisted_stm::stop();
co_await base_t::stop();
}

ss::future<> log_eviction_stm::handle_log_eviction_events() {
Expand Down Expand Up @@ -200,7 +200,7 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
_log.debug,
"Requesting raft snapshot with final offset: {}",
truncation_point);
auto snapshot_data = co_await _raft->stm_manager()->take_snapshot(
auto snapshot_result = co_await _raft->stm_manager()->take_snapshot(
truncation_point);
// we need to check the snapshot index again, as it may have already
// progressed after the snapshot was taken by stm_manager
Expand All @@ -214,8 +214,8 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
truncation_point);
co_return;
}
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
co_await _raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_result.last_included_offset, std::move(snapshot_result.data)));
}

kafka::offset log_eviction_stm::kafka_start_offset_override() {
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/log_eviction_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class log_eviction_stm
virtual ss::future<model::offset> storage_eviction_event();

private:
using base_t = raft::persisted_stm<raft::kvstore_backed_stm_snapshot>;
void increment_start_offset(model::offset);
bool should_process_evict(model::offset);

Expand Down
9 changes: 5 additions & 4 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ ss::future<> rm_stm::stop() {
co_await raft::persisted_stm<>::stop();
}

ss::future<> rm_stm::start() { return persisted_stm::start(); }
ss::future<> rm_stm::start() { return raft::persisted_stm<>::start(); }

std::optional<int32_t>
rm_stm::get_seq_number(model::producer_identity pid) const {
Expand Down Expand Up @@ -1255,7 +1255,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto current_insync_term = _insync_term;
auto ready = co_await persisted_stm::sync(timeout);
auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (ready) {
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
Expand Down Expand Up @@ -1935,7 +1935,8 @@ uint64_t rm_stm::get_local_snapshot_size() const {
clusterlog.trace,
"rm_stm: aborted snapshots size {}",
abort_snapshots_size);
return persisted_stm::get_local_snapshot_size() + abort_snapshots_size;
return raft::persisted_stm<>::get_local_snapshot_size()
+ abort_snapshots_size;
}

ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
Expand Down Expand Up @@ -2016,7 +2017,7 @@ ss::future<> rm_stm::do_remove_persistent_state() {
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}
co_await _abort_snapshot_mgr.remove_partial_snapshots();
co_return co_await persisted_stm::remove_persistent_state();
co_return co_await raft::persisted_stm<>::remove_persistent_state();
}

ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
Expand Down
9 changes: 4 additions & 5 deletions src/v/cluster/tests/partition_properties_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,10 @@ TEST_F_CORO(
auto base_offset = co_await node->random_batch_base_offset(
node->raft()->committed_offset(), model::offset(100));
auto snapshot_offset = model::prev_offset(base_offset);

co_await node->raft()->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset,
co_await node->raft()->stm_manager()->take_snapshot(
snapshot_offset)));
auto result = co_await node->raft()->stm_manager()->take_snapshot(
snapshot_offset);
co_await node->raft()->write_snapshot(
raft::write_snapshot_cfg(snapshot_offset, std::move(result.data)));
}

// test follower recovery with snapshot
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ tm_stm::tm_stm(
, _feature_table(feature_table)
, _ctx_log(logger, ssx::sformat("[{}]", _raft->ntp())) {}

ss::future<> tm_stm::start() { co_await persisted_stm::start(); }
ss::future<> tm_stm::start() { co_await raft::persisted_stm<>::start(); }

// Returns the version constant declared on tm_snapshot.
uint8_t tm_stm::active_snapshot_version() {
    return tm_snapshot::version;
}

Expand Down Expand Up @@ -179,7 +179,7 @@ tm_stm::do_sync(model::timeout_clock::duration timeout) {
co_return tm_stm::op_status::not_leader;
}

auto ready = co_await persisted_stm::sync(timeout);
auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (!ready) {
co_return tm_stm::op_status::unknown;
}
Expand Down
114 changes: 67 additions & 47 deletions src/v/raft/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ stm_snapshot_key(const ss::sstring& snapshot_name, const model::ntp& ntp) {

} // namespace

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
template<typename... Args>
persisted_stm<T>::persisted_stm(
persisted_stm_base<BaseT, T>::persisted_stm_base(
ss::sstring snapshot_mgr_name,
ss::logger& logger,
raft::consensus* c,
Expand All @@ -66,20 +66,20 @@ persisted_stm<T>::persisted_stm(
, _snapshot_backend(snapshot_mgr_name, _log, c, std::forward<Args>(args)...) {
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<std::optional<stm_snapshot>>
persisted_stm<T>::load_local_snapshot() {
persisted_stm_base<BaseT, T>::load_local_snapshot() {
return _snapshot_backend.load_snapshot();
}
template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::stop() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::stop() {
_apply_lock.broken();
co_await raft::state_machine_base::stop();
co_await _gate.close();
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::remove_persistent_state() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::remove_persistent_state() {
return _snapshot_backend.remove_persistent_state();
}

Expand Down Expand Up @@ -283,13 +283,13 @@ size_t kvstore_backed_stm_snapshot::get_snapshot_size() const {
return 0;
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::wait_for_snapshot_hydrated() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::wait_for_snapshot_hydrated() {
return _on_snapshot_hydrated.wait([this] { return _snapshot_hydrated; });
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::do_write_local_snapshot() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::do_write_local_snapshot() {
auto u = co_await _apply_lock.get_units();
auto snapshot = co_await take_local_snapshot(std::move(u));
auto offset = snapshot.header.offset;
Expand All @@ -298,27 +298,27 @@ ss::future<> persisted_stm<T>::do_write_local_snapshot() {
_last_snapshot_offset = std::max(_last_snapshot_offset, offset);
}

template<supported_stm_snapshot T>
void persisted_stm<T>::write_local_snapshot_in_background() {
template<typename BaseT, supported_stm_snapshot T>
void persisted_stm_base<BaseT, T>::write_local_snapshot_in_background() {
ssx::spawn_with_gate(_gate, [this] { return write_local_snapshot(); });
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::write_local_snapshot() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::write_local_snapshot() {
return _op_lock.with([this]() {
return wait_for_snapshot_hydrated().then(
[this] { return do_write_local_snapshot(); });
});
}

template<supported_stm_snapshot T>
uint64_t persisted_stm<T>::get_local_snapshot_size() const {
template<typename BaseT, supported_stm_snapshot T>
uint64_t persisted_stm_base<BaseT, T>::get_local_snapshot_size() const {
return _snapshot_backend.get_snapshot_size();
}

template<supported_stm_snapshot T>
ss::future<>
persisted_stm<T>::ensure_local_snapshot_exists(model::offset target_offset) {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::ensure_local_snapshot_exists(
model::offset target_offset) {
vlog(
_log.debug,
"ensure snapshot_exists with target offset: {}",
Expand All @@ -328,35 +328,35 @@ persisted_stm<T>::ensure_local_snapshot_exists(model::offset target_offset) {
if (target_offset <= _last_snapshot_offset) {
return ss::now();
}
return wait(target_offset, model::no_timeout)
return BaseT::wait(target_offset, model::no_timeout)
.then([this, target_offset]() {
vassert(
target_offset < next(),
target_offset < BaseT::next(),
"[{} ({})] after we waited for target_offset ({}) "
"next ({}) must be greater",
_raft->ntp(),
name(),
target_offset,
next());
BaseT::next());
return do_write_local_snapshot();
});
});
});
}

template<supported_stm_snapshot T>
model::offset persisted_stm<T>::max_collectible_offset() {
template<typename BaseT, supported_stm_snapshot T>
model::offset persisted_stm_base<BaseT, T>::max_collectible_offset() {
return model::offset::max();
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<fragmented_vector<model::tx_range>>
persisted_stm<T>::aborted_tx_ranges(model::offset, model::offset) {
persisted_stm_base<BaseT, T>::aborted_tx_ranges(model::offset, model::offset) {
return ss::make_ready_future<fragmented_vector<model::tx_range>>();
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::wait_offset_committed(
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::wait_offset_committed(
model::timeout_clock::duration timeout,
model::offset offset,
model::term_id term) {
Expand All @@ -367,8 +367,8 @@ ss::future<> persisted_stm<T>::wait_offset_committed(
return _raft->commit_index_updated().wait(timeout, stop_cond);
}

template<supported_stm_snapshot T>
ss::future<bool> persisted_stm<T>::do_sync(
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool> persisted_stm_base<BaseT, T>::do_sync(
model::timeout_clock::duration timeout,
model::offset offset,
model::term_id term) {
Expand Down Expand Up @@ -403,7 +403,7 @@ ss::future<bool> persisted_stm<T>::do_sync(

if (_raft->term() == term) {
try {
co_await wait(offset, model::timeout_clock::now() + timeout);
co_await BaseT::wait(offset, model::timeout_clock::now() + timeout);
} catch (const ss::broken_condition_variable&) {
co_return false;
} catch (const ss::gate_closed_exception&) {
Expand Down Expand Up @@ -440,9 +440,9 @@ ss::future<bool> persisted_stm<T>::do_sync(
co_return false;
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool>
persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
persisted_stm_base<BaseT, T>::sync(model::timeout_clock::duration timeout) {
auto term = _raft->term();
if (!_raft->is_leader()) {
return ss::make_ready_future<bool>(false);
Expand Down Expand Up @@ -492,12 +492,12 @@ persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
});
}

template<supported_stm_snapshot T>
ss::future<bool> persisted_stm<T>::wait_no_throw(
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool> persisted_stm_base<BaseT, T>::wait_no_throw(
model::offset offset,
model::timeout_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as) noexcept {
return wait(offset, deadline, as)
return BaseT::wait(offset, deadline, as)
.then([] { return true; })
.handle_exception_type([](const ss::abort_requested_exception&) {
// Shutting down
Expand All @@ -519,8 +519,8 @@ ss::future<bool> persisted_stm<T>::wait_no_throw(
});
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::start() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::start() {
if (_raft->dirty_offset() == model::offset{}) {
co_await _snapshot_backend.perform_initial_cleanup();
}
Expand Down Expand Up @@ -549,7 +549,7 @@ ss::future<> persisted_stm<T>::start() {
next_offset);
co_await apply_local_snapshot(
snapshot.header, std::move(snapshot.data));
set_next(next_offset);
BaseT::set_next(next_offset);
_last_snapshot_offset = snapshot.header.offset;
} else {
// This can happen on an out-of-date replica that re-joins the group
Expand All @@ -570,15 +570,35 @@ ss::future<> persisted_stm<T>::start() {
_on_snapshot_hydrated.broadcast();
}

template class persisted_stm<file_backed_stm_snapshot>;
template class persisted_stm<kvstore_backed_stm_snapshot>;
template class persisted_stm_base<state_machine_base, file_backed_stm_snapshot>;
template class persisted_stm_base<
state_machine_base,
kvstore_backed_stm_snapshot>;

template class persisted_stm_base<
no_at_offset_snapshot_stm_base,
file_backed_stm_snapshot>;
template class persisted_stm_base<
no_at_offset_snapshot_stm_base,
kvstore_backed_stm_snapshot>;

template persisted_stm_base<state_machine_base, file_backed_stm_snapshot>::
persisted_stm_base(ss::sstring, seastar::logger&, raft::consensus*);

template persisted_stm<file_backed_stm_snapshot>::persisted_stm(
ss::sstring, seastar::logger&, raft::consensus*);
template persisted_stm_base<state_machine_base, kvstore_backed_stm_snapshot>::
persisted_stm_base(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);

template persisted_stm<kvstore_backed_stm_snapshot>::persisted_stm(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);
template persisted_stm_base<
no_at_offset_snapshot_stm_base,
file_backed_stm_snapshot>::
persisted_stm_base(ss::sstring, seastar::logger&, raft::consensus*);

template persisted_stm_base<
no_at_offset_snapshot_stm_base,
kvstore_backed_stm_snapshot>::
persisted_stm_base(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);
ss::sstring kvstore_backed_stm_snapshot::snapshot_key(
const ss::sstring& snapshot_name, const model::ntp& ntp) {
return stm_snapshot_key(snapshot_name, ntp);
Expand Down
Loading

0 comments on commit dc252d7

Please sign in to comment.