tx/rm_stm: remove mem_state::estimated
'estimated' held an estimate of a transaction's first data batch offset (so
the LSO cannot exceed it) while replication of that batch was in progress.
It is the last piece of in-memory state pending cleanup.

This commit redefines the transaction boundary to begin at the fence batch
rather than the first data batch, which lets us drop this additional state
and lays the foundation for the producer state move.
bharathv committed Apr 25, 2024
1 parent e19189a commit 9fa6a76
Showing 2 changed files with 10 additions and 60 deletions.
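For context on the commit message, here is a minimal self-contained sketch of the LSO-bound computation before and after this change. Every name below (tx_state, lso_bound_old, lso_bound_new) is a hypothetical simplification for illustration, not the actual rm_stm code, which uses model::offset and absl::flat_hash_map.

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <set>

    // Hypothetical stand-ins for the real types.
    using offset_t = int64_t;
    using producer_id = int64_t;

    struct tx_state {
        // Before this commit: per-producer estimated first-batch offsets,
        // held in memory while the first data batch is being replicated.
        std::map<producer_id, offset_t> estimated;
        // After this commit: base offsets of the fence batches that opened
        // the ongoing transactions, recovered from applied log batches.
        std::set<offset_t> ongoing_tx_starts;
    };

    // Old scheme: clamp the LSO to the smallest in-memory estimate.
    offset_t lso_bound_old(const tx_state& s, offset_t committed) {
        offset_t bound = committed;
        for (const auto& [pid, off] : s.estimated) {
            bound = std::min(bound, off);
        }
        return bound;
    }

    // New scheme: the fence batch marks the transaction start, so the
    // bound falls out of durable log state alone.
    offset_t lso_bound_new(const tx_state& s, offset_t committed) {
        if (s.ongoing_tx_starts.empty()) {
            return committed;
        }
        return std::min(committed, *s.ongoing_tx_starts.begin());
    }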
51 changes: 8 additions & 43 deletions src/v/cluster/rm_stm.cc
@@ -351,7 +351,6 @@ void rm_stm::cleanup_producer_state(model::producer_identity pid) {
// No active transactions for this producer.
// note: this branch can be removed once we port tx state
// into producer_state.
- _mem_state.forget(pid);
_log_state.forget(pid);
}
_producers.erase(pid);
@@ -919,24 +918,7 @@ ss::future<result<rm_stm::transaction_set>> rm_stm::get_transactions() {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

transaction_set ans;

- // When redpanda starts writing the first batch of a transaction it
- // estimates its offset and only when the write passes it updates the offset
- // to the exact value; so for a short period of time (while tx is in the
- // initiating state) lso_bound is the offset of the last operation known at
- // moment the transaction started and when the first tx batch is written
- // it's updated to the first offset of the transaction
- for (auto& [id, offset] : _mem_state.estimated) {
- transaction_info tx_info;
- tx_info.lso_bound = offset;
- tx_info.status = rm_stm::transaction_info::status_t::initiating;
- tx_info.info = get_expiration_info(id);
- tx_info.seq = get_seq_number(id);
- ans.emplace(id, tx_info);
- }

for (auto& [id, offset] : _log_state.ongoing_map) {
transaction_info tx_info;
tx_info.lso_bound = offset.first;
@@ -968,7 +950,6 @@ void rm_stm::update_tx_offsets(
_log_state.ongoing_map.emplace(
pid, tx_range{.pid = pid, .first = base_offset, .last = last_offset});
_log_state.ongoing_set.insert(header.base_offset);
- _mem_state.estimated.erase(pid);
}
}

@@ -1060,13 +1041,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto tx_seq = _log_state.current_txes[bid.pid].tx_seq;
vlog(_ctx_log.trace, "found tx_seq:{} for pid:{}", tx_seq, bid.pid);

- if (_mem_state.estimated.contains(bid.pid)) {
- // we received second produce request while the first is still
- // being processed.
- vlog(_ctx_log.warn, "Too frequent produce with same pid:{}", bid.pid);
- co_return errc::generic_tx_error;
- }

// For the first batch of a transaction, reset sequence tracking to handle
// an edge case where client reuses sequence number after an aborted
// transaction see https://github.com/redpanda-data/redpanda/pull/5026
@@ -1083,7 +1057,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
co_return co_await req_ptr->result();
}
req_ptr->mark_request_in_progress();
- _mem_state.estimated[bid.pid] = last_applied_offset();

auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
Expand Down Expand Up @@ -1115,7 +1088,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
req_ptr->set_value(errc::timeout);
co_return tx_errc::timeout;
}
- _mem_state.estimated.erase(bid.pid);
auto result = kafka_result{
.last_offset = from_log_offset(r.value().last_offset)};
req_ptr->set_value(result);
@@ -1333,10 +1305,6 @@ model::offset rm_stm::last_stable_offset() {
if (!_log_state.ongoing_set.empty()) {
first_tx_start = *_log_state.ongoing_set.begin();
}

- for (auto& entry : _mem_state.estimated) {
- first_tx_start = std::min(first_tx_start, entry.second);
- }
}

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term
@@ -1477,9 +1445,6 @@ ss::future<> rm_stm::do_abort_old_txes() {
}

fragmented_vector<model::producer_identity> pids;
- for (auto& [k, _] : _mem_state.estimated) {
- pids.push_back(k);
- }
for (auto& [k, _] : _log_state.ongoing_map) {
pids.push_back(k);
}
@@ -1704,8 +1669,10 @@ void rm_stm::try_arm(time_point_type deadline) {
}
}

- void rm_stm::apply_fence(model::record_batch&& b) {
+ void rm_stm::apply_fence(model::producer_identity pid, model::record_batch b) {
+ auto producer = maybe_create_producer(pid);
auto batch_base_offset = b.base_offset();
+ auto header = b.header();
auto batch_data = read_fence_batch(std::move(b));
vlog(
_ctx_log.trace,
@@ -1717,6 +1684,7 @@ void rm_stm::apply_fence(model::record_batch&& b) {
_highest_producer_id, batch_data.bid.pid.get_id());
auto [fence_it, _] = _log_state.fence_pid_epoch.try_emplace(
batch_data.bid.pid.get_id(), batch_data.bid.pid.get_epoch());
+ update_tx_offsets(producer, header);
// using less-or-equal to update tx_seqs on every transaction
if (fence_it->second <= batch_data.bid.pid.get_epoch()) {
fence_it->second = batch_data.bid.pid.get_epoch();
@@ -1737,9 +1705,10 @@

ss::future<> rm_stm::apply(const model::record_batch& b) {
const auto& hdr = b.header();
+ const auto bid = model::batch_identity::from(hdr);

if (hdr.type == model::record_batch_type::tx_fence) {
- apply_fence(b.copy());
+ apply_fence(bid.pid, b.copy());
} else if (hdr.type == model::record_batch_type::tx_prepare) {
// prepare phase was used pre-transactions GA. Ideally these
// batches should not appear anymore and should not be a part
@@ -1751,7 +1720,6 @@ ss::future<> rm_stm::apply(const model::record_batch& b) {
b.base_offset(),
b.header().producer_id);
} else if (hdr.type == model::record_batch_type::raft_data) {
- auto bid = model::batch_identity::from(hdr);
if (hdr.attrs.is_control()) {
apply_control(bid.pid, parse_control_batch(b));
} else {
@@ -1795,7 +1763,6 @@ void rm_stm::apply_control(
_log_state.ongoing_map.erase(pid);
}

- _mem_state.forget(pid);
_log_state.expiration.erase(pid);

if (
@@ -1812,8 +1779,6 @@
_log_state.ongoing_set.erase(offset_it->second.first);
_log_state.ongoing_map.erase(pid);
}

- _mem_state.forget(pid);
_log_state.expiration.erase(pid);
}
}
@@ -2320,8 +2285,8 @@ std::ostream& operator<<(std::ostream& o, const rm_stm::abort_snapshot& as) {
return o;
}

- std::ostream& operator<<(std::ostream& o, const rm_stm::mem_state& state) {
- fmt::print(o, "{{ estimated: {} }} ", state.estimated.size());
+ std::ostream& operator<<(std::ostream& o, const rm_stm::mem_state&) {
+ fmt::print(o, "{{ to be removed }} ");
return o;
}

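The behavioral core of the rm_stm.cc changes above is that apply_fence() now feeds the fence batch's own header through update_tx_offsets(), so the ongoing-transaction range opens at the fence batch's base offset. The standalone sketch below (hypothetical simplified types, not the real rm_stm API) shows why that makes the in-memory estimate redundant: the fence batch replicates before any data batch of the transaction, so replaying the log alone yields a durable lower bound for the LSO.

    #include <cstdint>
    #include <map>
    #include <set>
    #include <utility>

    // Hypothetical, simplified model of the new scheme. The fence batch
    // itself opens the transaction range; data batches only extend it.
    using offset_t = int64_t;
    using producer_t = int64_t;

    struct log_state {
        // first/last offset of each producer's ongoing transaction
        std::map<producer_t, std::pair<offset_t, offset_t>> ongoing_map;
        // transaction start offsets, ordered so the minimum is cheap
        std::set<offset_t> ongoing_set;
    };

    // Analogue of update_tx_offsets(): now called for the fence batch as
    // well as for every later batch of the same transaction.
    void update_tx_offsets(
      log_state& ls, producer_t pid, offset_t base, offset_t last) {
        auto it = ls.ongoing_map.find(pid);
        if (it != ls.ongoing_map.end()) {
            // transaction already opened (by its fence batch): extend it
            it->second.second = last;
        } else {
            // first batch seen for this producer opens the transaction
            ls.ongoing_map.emplace(pid, std::make_pair(base, last));
            ls.ongoing_set.insert(base);
        }
    }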
19 changes: 2 additions & 17 deletions src/v/cluster/rm_stm.h
@@ -363,7 +363,6 @@ class rm_stm final : public raft::persisted_stm<> {

bool is_known_session(model::producer_identity pid) const {
auto is_known = false;
- is_known |= _mem_state.estimated.contains(pid);
is_known |= _log_state.ongoing_map.contains(pid);
is_known |= _log_state.current_txes.contains(pid);
return is_known;
@@ -373,7 +372,7 @@
get_abort_origin(const model::producer_identity&, model::tx_seq) const;

ss::future<> apply(const model::record_batch&) override;
- void apply_fence(model::record_batch&&);
+ void apply_fence(model::producer_identity, model::record_batch);
void apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, const model::record_batch_header&);

@@ -476,34 +475,20 @@

struct mem_state {
explicit mem_state(util::mem_tracker& parent)
- : _tracker(parent.create_child("mem-state"))
- , estimated(mt::map<
- absl::flat_hash_map,
- model::producer_identity,
- model::offset>(_tracker)) {}
+ : _tracker(parent.create_child("mem-state")) {}

ss::shared_ptr<util::mem_tracker> _tracker;
// once raft's term has passed mem_state::term we wipe mem_state
// and wait until log_state catches up with current committed index.
// with this approach a combination of mem_state and log_state is
// always up to date
model::term_id term;
- // before we replicate the first batch of a transaction we don't know
- // its offset but we must prevent read_comitted fetch from getting it
- // so we use last seen offset to estimate it
- mt::unordered_map_t<
- absl::flat_hash_map,
- model::producer_identity,
- model::offset>
- estimated;

// depending on the inflight state we may use last_applied or
// committed_index as LSO; the alternation between them may
// violate LSO monotonicity so we need to explicitly maintain
// it with last_lso
model::offset last_lso{-1};

- void forget(model::producer_identity pid) { estimated.erase(pid); }
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
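One piece of mem_state that survives this cleanup is last_lso, whose comment above explains that alternating between last_applied and committed_index as the LSO source can violate monotonicity. A minimal sketch of that clamp (hypothetical names, plain integers instead of model::offset):

    #include <algorithm>
    #include <cstdint>

    // Sketch of the monotonic-LSO clamp described by the last_lso comment
    // in mem_state (simplified; not the actual rm_stm code).
    struct lso_monotonic_guard {
        int64_t last_lso{-1};

        // candidate is last_applied or committed_index depending on the
        // in-flight state; clamp so the value handed to readers never
        // moves backwards.
        int64_t stable_offset(int64_t candidate) {
            last_lso = std::max(last_lso, candidate);
            return last_lso;
        }
    };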
