Skip to content

Commit

Permalink
tx/rm_stm: remove mem_state
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Apr 26, 2024
1 parent 9fa6a76 commit 59b457a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 45 deletions.
30 changes: 5 additions & 25 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ rm_stm::rm_stm(
map<absl::flat_hash_map, model::producer_id, ss::lw_shared_ptr<mutex>>(
_tx_root_tracker.create_child("tx-locks")))
, _log_state(_tx_root_tracker)
, _mem_state(_tx_root_tracker)
, _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
, _tx_timeout_delay(config::shard_local_cfg().tx_timeout_delay_ms.value())
, _abort_interval_ms(config::shard_local_cfg()
Expand Down Expand Up @@ -1307,9 +1306,7 @@ model::offset rm_stm::last_stable_offset() {
}
}

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term
&& _mem_state.term == _insync_term;

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term;
model::offset lso{-1};
auto last_visible_index = _raft->last_visible_index();
auto next_to_apply = model::next_offset(last_applied);
Expand All @@ -1327,8 +1324,8 @@ model::offset rm_stm::last_stable_offset() {
// should not advance lso beyond last_applied
lso = next_to_apply;
}
_mem_state.last_lso = std::max(_mem_state.last_lso, lso);
return _mem_state.last_lso;
_last_known_lso = std::max(_last_known_lso, lso);
return _last_known_lso;
}

static void filter_intersecting(
Expand Down Expand Up @@ -1400,14 +1397,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
}

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto ready = co_await persisted_stm::sync(timeout);
if (ready) {
if (_mem_state.term != _insync_term) {
_mem_state = mem_state{_tx_root_tracker};
_mem_state.term = _insync_term;
}
}
co_return ready;
co_return co_await persisted_stm::sync(timeout);
}

void rm_stm::track_tx(
Expand Down Expand Up @@ -2269,7 +2259,6 @@ ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
"Resetting all state, reason: log eviction, offset: {}",
_raft->start_offset());
_log_state.reset();
_mem_state = mem_state{_tx_root_tracker};
co_await reset_producers();
set_next(_raft->start_offset());
co_return;
Expand All @@ -2285,11 +2274,6 @@ std::ostream& operator<<(std::ostream& o, const rm_stm::abort_snapshot& as) {
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::mem_state&) {
fmt::print(o, "{{ to be removed }} ");
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::log_state& state) {
fmt::print(
o,
Expand Down Expand Up @@ -2356,11 +2340,7 @@ ss::future<> rm_stm::maybe_log_tx_stats() {
_ctx_log.debug(
"tx mem tracker breakdown: {}", _tx_root_tracker.pretty_print_json());
auto units = co_await _state_lock.hold_read_lock();
_ctx_log.debug(
"tx memory snapshot stats: {{mem_state: {}, log_state: "
"{}}}",
_mem_state,
_log_state);
_ctx_log.debug("tx memory snapshot stats: {{log_state: {}}}", _log_state);
}

void rm_stm::log_tx_stats() {
Expand Down
22 changes: 2 additions & 20 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,24 +473,6 @@ class rm_stm final : public raft::persisted_stm<> {
void reset();
};

struct mem_state {
explicit mem_state(util::mem_tracker& parent)
: _tracker(parent.create_child("mem-state")) {}

ss::shared_ptr<util::mem_tracker> _tracker;
// once raft's term has passed mem_state::term we wipe mem_state
// and wait until log_state catches up with current committed index.
// with this approach a combination of mem_state and log_state is
// always up to date
model::term_id term;

// depending on the inflight state we may use last_applied or
// committed_index as LSO; the alternation between them may
// violate LSO monotonicity so we need to explicitly maintain
// it with last_lso
model::offset last_lso{-1};
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
auto lock_it = _tx_locks.find(pid);
if (lock_it == _tx_locks.end()) {
Expand Down Expand Up @@ -518,7 +500,6 @@ class rm_stm final : public raft::persisted_stm<> {
features::feature::transaction_partitioning);
}

friend std::ostream& operator<<(std::ostream&, const mem_state&);
friend std::ostream& operator<<(std::ostream&, const log_state&);
ss::future<> maybe_log_tx_stats();
void log_tx_stats();
Expand All @@ -535,7 +516,6 @@ class rm_stm final : public raft::persisted_stm<> {
ss::lw_shared_ptr<mutex>>
_tx_locks;
log_state _log_state;
mem_state _mem_state;
ss::timer<clock_type> auto_abort_timer;
std::chrono::milliseconds _sync_timeout;
std::chrono::milliseconds _tx_timeout_delay;
Expand All @@ -561,6 +541,8 @@ class rm_stm final : public raft::persisted_stm<> {
ss::gate _gate;
// Highest producer ID applied to this stm.
model::producer_id _highest_producer_id;
// for monotonicity of computed LSO.
model::offset _last_known_lso{-1};

friend struct ::rm_stm_test_fixture;
};
Expand Down

0 comments on commit 59b457a

Please sign in to comment.