diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 7d3378e824fbf..41606814400ab 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -488,14 +488,26 @@ ss::future> rm_stm::do_begin_tx( txseq_it->second.tx_seq); co_return tx_errc::unknown_server_error; } - if (_log_state.ongoing_map.contains(pid)) { - vlog( - _ctx_log.warn, - "can't begin a tx {} with tx_seq {}: it was already begun and " - "accepted writes", - pid, - tx_seq); - co_return tx_errc::unknown_server_error; + auto it = _log_state.ongoing_map.find(pid); + if (it != _log_state.ongoing_map.end()) { + // there is already a transaction in progress. + // handle back to back duplicate begin_tx requests + // this can happen due to some undefined client behavior. + // note: this code is getting refactored soon with move to + // producer_state, temporary fix up to make chaos happy. + const auto& tx = it->second; + if (tx.last > tx.first) { + // the transaction already has a data batch and + // begin_tx at this point is unpexpected. + vlog( + _ctx_log.warn, + "can't begin a tx {} with tx_seq {}: it was already begun " + "and " + "accepted writes", + pid, + tx_seq); + co_return tx_errc::unknown_server_error; + } } co_return synced_term; } @@ -1039,6 +1051,16 @@ ss::future> rm_stm::do_transactional_replicate( vlog(_ctx_log.warn, "can't find ongoing tx for pid:{}", bid.pid); co_return errc::invalid_producer_epoch; } + auto it = _log_state.ongoing_map.find(bid.pid); + if (it == _log_state.ongoing_map.end()) { + // this is a bug and an incorrect state change as a begin is supposed to + // update this map and replicate request strictly happens after begin. + vlog( + _ctx_log.error, + "unable to find ongoing transaction for pid: {}", + bid.pid); + co_return errc::generic_tx_error; + } auto tx_seq = _log_state.current_txes[bid.pid].tx_seq; vlog(_ctx_log.trace, "found tx_seq:{} for pid:{}", tx_seq, bid.pid); @@ -1046,7 +1068,11 @@ ss::future> rm_stm::do_transactional_replicate( // an edge case where client reuses sequence number after an aborted // transaction see https://github.com/redpanda-data/redpanda/pull/5026 // for details - bool reset_sequence_tracking = !_log_state.ongoing_map.contains(bid.pid); + const auto& tx = it->second; + // Check if the incoming batch is the first data batch. + // note: this code is going away soon with move to producer state, will be + // refactored with better utilities soon. + bool reset_sequence_tracking = tx.first == tx.last; auto request = producer->try_emplace_request( bid, synced_term, reset_sequence_tracking);