Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx/rm_stm: deal with duplicate begin_tx requests #18187

Merged
merged 1 commit into from
May 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,26 @@ ss::future<checked<model::term_id, tx_errc>> rm_stm::do_begin_tx(
txseq_it->second.tx_seq);
co_return tx_errc::unknown_server_error;
}
if (_log_state.ongoing_map.contains(pid)) {
vlog(
_ctx_log.warn,
"can't begin a tx {} with tx_seq {}: it was already begun and "
"accepted writes",
pid,
tx_seq);
co_return tx_errc::unknown_server_error;
auto it = _log_state.ongoing_map.find(pid);
if (it != _log_state.ongoing_map.end()) {
// there is already a transaction in progress.
// handle back to back duplicate begin_tx requests
// this can happen due to some undefined client behavior.
// note: this code is getting refactored soon with move to
// producer_state, temporary fix up to make chaos happy.
const auto& tx = it->second;
if (tx.last > tx.first) {
// the transaction already has a data batch and
// begin_tx at this point is unpexpected.
vlog(
_ctx_log.warn,
"can't begin a tx {} with tx_seq {}: it was already begun "
"and "
"accepted writes",
pid,
tx_seq);
co_return tx_errc::unknown_server_error;
mmaslankaprv marked this conversation as resolved.
Show resolved Hide resolved
}
}
co_return synced_term;
}
Expand Down Expand Up @@ -1039,14 +1051,28 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
vlog(_ctx_log.warn, "can't find ongoing tx for pid:{}", bid.pid);
co_return errc::invalid_producer_epoch;
}
auto it = _log_state.ongoing_map.find(bid.pid);
if (it == _log_state.ongoing_map.end()) {
// this is a bug and an incorrect state change as a begin is supposed to
// update this map and replicate request strictly happens after begin.
vlog(
_ctx_log.error,
"unable to find ongoing transaction for pid: {}",
bid.pid);
co_return errc::generic_tx_error;
}
auto tx_seq = _log_state.current_txes[bid.pid].tx_seq;
vlog(_ctx_log.trace, "found tx_seq:{} for pid:{}", tx_seq, bid.pid);

// For the first batch of a transaction, reset sequence tracking to handle
// an edge case where client reuses sequence number after an aborted
// transaction see https://github.com/redpanda-data/redpanda/pull/5026
// for details
bool reset_sequence_tracking = !_log_state.ongoing_map.contains(bid.pid);
const auto& tx = it->second;
// Check if the incoming batch is the first data batch.
// note: this code is going away soon with move to producer state, will be
// refactored with better utilities soon.
bool reset_sequence_tracking = tx.first == tx.last;
auto request = producer->try_emplace_request(
bid, synced_term, reset_sequence_tracking);

Expand Down
Loading