tx/rm_stm: remove mem_state #18076

Merged: 3 commits, Apr 29, 2024
2 changes: 2 additions & 0 deletions src/v/cluster/producer_state.h
@@ -209,6 +209,8 @@ class producer_state {
return _current_txn_start_offset;
}

model::producer_identity id() const { return _id; }

void update_current_txn_start_offset(std::optional<kafka::offset> offset) {
_current_txn_start_offset = offset;
}
123 changes: 40 additions & 83 deletions src/v/cluster/rm_stm.cc
@@ -255,7 +255,6 @@ rm_stm::rm_stm(
map<absl::flat_hash_map, model::producer_id, ss::lw_shared_ptr<mutex>>(
_tx_root_tracker.create_child("tx-locks")))
, _log_state(_tx_root_tracker)
, _mem_state(_tx_root_tracker)
, _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
, _tx_timeout_delay(config::shard_local_cfg().tx_timeout_delay_ms.value())
, _abort_interval_ms(config::shard_local_cfg()
@@ -351,7 +350,6 @@ void rm_stm::cleanup_producer_state(model::producer_identity pid) {
// No active transactions for this producer.
// note: this branch can be removed once we port tx state
// into producer_state.
_mem_state.forget(pid);
_log_state.forget(pid);
}
_producers.erase(pid);
@@ -919,28 +917,13 @@ ss::future<result<rm_stm::transaction_set>> rm_stm::get_transactions() {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

transaction_set ans;

// When redpanda starts writing the first batch of a transaction it
// estimates its offset and only when the write passes it updates the offset
// to the exact value; so for a short period of time (while tx is in the
// initiating state) lso_bound is the offset of the last operation known at
// moment the transaction started and when the first tx batch is written
// it's updated to the first offset of the transaction
for (auto& [id, offset] : _mem_state.estimated) {
transaction_info tx_info;
tx_info.lso_bound = offset;
tx_info.status = rm_stm::transaction_info::status_t::initiating;
tx_info.info = get_expiration_info(id);
tx_info.seq = get_seq_number(id);
ans.emplace(id, tx_info);
}

for (auto& [id, offset] : _log_state.ongoing_map) {
transaction_info tx_info;
tx_info.lso_bound = offset.first;
tx_info.status = transaction_info::status_t::ongoing;
tx_info.status = offset.last > offset.first
? transaction_info::status_t::ongoing
: transaction_info::status_t::initiating;
tx_info.info = get_expiration_info(id);
tx_info.seq = get_seq_number(id);
ans.emplace(id, tx_info);
@@ -949,6 +932,28 @@ ss::future<result<rm_stm::transaction_set>> rm_stm::get_transactions() {
co_return ans;
}
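
With mem_state.estimated gone, the "initiating" phase is no longer tracked as a separately estimated offset; it is inferred from the ongoing range itself. The fence batch opens a range whose first and last offsets coincide, and the status flips to "ongoing" only once a data batch extends the range. A minimal sketch of that inference with simplified stand-in types (hypothetical, not the actual redpanda definitions):

#include <cstdint>

struct tx_range_sketch {
    int64_t first; // offset of the transaction's first batch
    int64_t last;  // offset of the latest batch seen so far
};

enum class status_t { initiating, ongoing };

// Mirrors the ternary in get_transactions() above: a range that has not
// grown past its first batch is still initiating.
status_t infer_status(const tx_range_sketch& r) {
    return r.last > r.first ? status_t::ongoing : status_t::initiating;
}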

void rm_stm::update_tx_offsets(
producer_ptr producer, const model::record_batch_header& header) {
const auto& pid = producer->id();
const auto base_offset = header.base_offset;
const auto last_offset = header.last_offset();
auto ongoing_it = _log_state.ongoing_map.find(pid);
if (ongoing_it != _log_state.ongoing_map.end()) {
// transaction already known, update the end offset.
if (ongoing_it->second.last < last_offset) {
ongoing_it->second.last = last_offset;
}
} else {
// we do not have to check if the value is empty, as that is already
// covered by the ongoing_map lookup above
producer->update_current_txn_start_offset(from_log_offset(base_offset));

_log_state.ongoing_map.emplace(
pid, tx_range{.pid = pid, .first = base_offset, .last = last_offset});
_log_state.ongoing_set.insert(header.base_offset);
}
}
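
update_tx_offsets() now keeps the two views of open transactions consistent in one place: a per-producer offset range in ongoing_map and the set of range start offsets in ongoing_set, which last_stable_offset() scans for the earliest open transaction. A hedged sketch of the bookkeeping using simplified standard-library containers (hypothetical names, not the actual redpanda types):

#include <algorithm>
#include <cstdint>
#include <map>
#include <set>

struct range_sketch { int64_t first; int64_t last; };

std::map<int64_t, range_sketch> ongoing_map; // producer id -> offset range
std::set<int64_t> ongoing_set;               // start offsets of open txes

void update_tx_offsets_sketch(int64_t pid, int64_t base, int64_t last) {
    auto it = ongoing_map.find(pid);
    if (it != ongoing_map.end()) {
        // transaction already known: only extend the end of its range
        it->second.last = std::max(it->second.last, last);
    } else {
        // first batch of this transaction: record both views
        ongoing_map.emplace(pid, range_sketch{base, last});
        ongoing_set.insert(base);
    }
}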

ss::future<std::error_code> rm_stm::mark_expired(model::producer_identity pid) {
return _state_lock.hold_read_lock().then(
[this, pid](ss::basic_rwlock<>::holder unit) mutable {
@@ -1037,13 +1042,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto tx_seq = _log_state.current_txes[bid.pid].tx_seq;
vlog(_ctx_log.trace, "found tx_seq:{} for pid:{}", tx_seq, bid.pid);

if (_mem_state.estimated.contains(bid.pid)) {
// we received second produce request while the first is still
// being processed.
vlog(_ctx_log.warn, "Too frequent produce with same pid:{}", bid.pid);
co_return errc::generic_tx_error;
}

// For the first batch of a transaction, reset sequence tracking to handle
// an edge case where client reuses sequence number after an aborted
// transaction see https://github.com/redpanda-data/redpanda/pull/5026
@@ -1060,7 +1058,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
co_return co_await req_ptr->result();
}
req_ptr->mark_request_in_progress();
_mem_state.estimated[bid.pid] = last_applied_offset();

auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
@@ -1092,7 +1089,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
req_ptr->set_value(errc::timeout);
co_return tx_errc::timeout;
}
_mem_state.estimated.erase(bid.pid);
auto result = kafka_result{
.last_offset = from_log_offset(r.value().last_offset)};
req_ptr->set_value(result);
@@ -1310,15 +1306,9 @@ model::offset rm_stm::last_stable_offset() {
if (!_log_state.ongoing_set.empty()) {
first_tx_start = *_log_state.ongoing_set.begin();
}

for (auto& entry : _mem_state.estimated) {
first_tx_start = std::min(first_tx_start, entry.second);
}
}

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term
&& _mem_state.term == _insync_term;

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term;
model::offset lso{-1};
auto last_visible_index = _raft->last_visible_index();
auto next_to_apply = model::next_offset(last_applied);
@@ -1336,8 +1326,8 @@ model::offset rm_stm::last_stable_offset() {
// should not advance lso beyond last_applied
lso = next_to_apply;
}
_mem_state.last_lso = std::max(_mem_state.last_lso, lso);
return _mem_state.last_lso;
_last_known_lso = std::max(_last_known_lso, lso);
return _last_known_lso;
}
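
As the removed mem_state comment explained, the LSO may be derived from either last_applied or committed_index depending on in-flight state, and switching between the two can make the raw value regress; the single _last_known_lso member now supplies the monotonic clamp that mem_state::last_lso used to provide. A minimal sketch of the invariant, assuming a plain integer offset (hypothetical type):

#include <algorithm>
#include <cstdint>

struct lso_tracker_sketch {
    int64_t last_known{-1}; // matches _last_known_lso's initial value

    // Returns a value that never decreases across calls, even when the
    // raw candidate regresses after a switch between offset sources.
    int64_t advance(int64_t candidate) {
        last_known = std::max(last_known, candidate);
        return last_known;
    }
};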

static void filter_intersecting(
@@ -1409,11 +1399,11 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
}

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto current_insync_term = _insync_term;
auto ready = co_await persisted_stm::sync(timeout);
if (ready) {
if (_mem_state.term != _insync_term) {
_mem_state = mem_state{_tx_root_tracker};
_mem_state.term = _insync_term;
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
}
}
co_return ready;
@@ -1454,9 +1444,6 @@ ss::future<> rm_stm::do_abort_old_txes() {
}

fragmented_vector<model::producer_identity> pids;
for (auto& [k, _] : _mem_state.estimated) {
pids.push_back(k);
}
for (auto& [k, _] : _log_state.ongoing_map) {
pids.push_back(k);
}
@@ -1681,8 +1668,10 @@ void rm_stm::try_arm(time_point_type deadline) {
}
}

void rm_stm::apply_fence(model::record_batch&& b) {
void rm_stm::apply_fence(model::producer_identity pid, model::record_batch b) {
auto producer = maybe_create_producer(pid);
auto batch_base_offset = b.base_offset();
auto header = b.header();
auto batch_data = read_fence_batch(std::move(b));
vlog(
_ctx_log.trace,
@@ -1694,6 +1683,7 @@ void rm_stm::apply_fence(model::record_batch&& b) {
_highest_producer_id, batch_data.bid.pid.get_id());
auto [fence_it, _] = _log_state.fence_pid_epoch.try_emplace(
batch_data.bid.pid.get_id(), batch_data.bid.pid.get_epoch());
update_tx_offsets(producer, header);
// using less-or-equal to update tx_seqs on every transaction
if (fence_it->second <= batch_data.bid.pid.get_epoch()) {
fence_it->second = batch_data.bid.pid.get_epoch();
@@ -1714,9 +1704,10 @@

ss::future<> rm_stm::apply(const model::record_batch& b) {
const auto& hdr = b.header();
const auto bid = model::batch_identity::from(hdr);

if (hdr.type == model::record_batch_type::tx_fence) {
apply_fence(b.copy());
apply_fence(bid.pid, b.copy());
} else if (hdr.type == model::record_batch_type::tx_prepare) {
// prepare phase was used pre-transactions GA. Ideally these
// batches should not appear anymore and should not be a part
@@ -1728,7 +1719,6 @@ ss::future<> rm_stm::apply(const model::record_batch& b) {
b.base_offset(),
b.header().producer_id);
} else if (hdr.type == model::record_batch_type::raft_data) {
auto bid = model::batch_identity::from(hdr);
if (hdr.attrs.is_control()) {
apply_control(bid.pid, parse_control_batch(b));
} else {
@@ -1772,7 +1762,6 @@ void rm_stm::apply_control(
_log_state.ongoing_map.erase(pid);
}

_mem_state.forget(pid);
_log_state.expiration.erase(pid);

if (
@@ -1789,8 +1778,6 @@
_log_state.ongoing_set.erase(offset_it->second.first);
_log_state.ongoing_map.erase(pid);
}

_mem_state.forget(pid);
_log_state.expiration.erase(pid);
}
}
@@ -1811,7 +1798,6 @@ void rm_stm::apply_data(
model::batch_identity bid, const model::record_batch_header& header) {
if (bid.is_idempotent()) {
_highest_producer_id = std::max(_highest_producer_id, bid.pid.get_id());
const auto last_offset = header.last_offset();
const auto last_kafka_offset = from_log_offset(header.last_offset());
auto producer = maybe_create_producer(bid.pid);
auto needs_touch = producer->update(bid, last_kafka_offset);
@@ -1826,28 +1812,9 @@
"[{},{}], last kafka offset: {}",
bid,
header.base_offset,
last_offset,
header.last_offset(),
last_kafka_offset);
auto ongoing_it = _log_state.ongoing_map.find(bid.pid);
if (ongoing_it != _log_state.ongoing_map.end()) {
if (ongoing_it->second.last < last_offset) {
ongoing_it->second.last = last_offset;
}
} else {
// we do no have to check if the value is empty as it is already
// done with ongoing map
producer->update_current_txn_start_offset(
from_log_offset(header.base_offset));

_log_state.ongoing_map.emplace(
bid.pid,
tx_range{
.pid = bid.pid,
.first = header.base_offset,
.last = last_offset});
_log_state.ongoing_set.insert(header.base_offset);
_mem_state.estimated.erase(bid.pid);
}
update_tx_offsets(producer, header);
}
}
}
@@ -2301,7 +2268,6 @@ ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
"Resetting all state, reason: log eviction, offset: {}",
_raft->start_offset());
_log_state.reset();
_mem_state = mem_state{_tx_root_tracker};
co_await reset_producers();
set_next(_raft->start_offset());
co_return;
@@ -2317,11 +2283,6 @@ std::ostream& operator<<(std::ostream& o, const rm_stm::abort_snapshot& as) {
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::mem_state& state) {
fmt::print(o, "{{ estimated: {} }} ", state.estimated.size());
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::log_state& state) {
fmt::print(
o,
@@ -2388,11 +2349,7 @@ ss::future<> rm_stm::maybe_log_tx_stats() {
_ctx_log.debug(
"tx mem tracker breakdown: {}", _tx_root_tracker.pretty_print_json());
auto units = co_await _state_lock.hold_read_lock();
_ctx_log.debug(
"tx memory snapshot stats: {{mem_state: {}, log_state: "
"{}}}",
_mem_state,
_log_state);
_ctx_log.debug("tx memory snapshot stats: {{log_state: {}}}", _log_state);
}

void rm_stm::log_tx_stats() {
40 changes: 4 additions & 36 deletions src/v/cluster/rm_stm.h
@@ -298,6 +298,7 @@ class rm_stm final : public raft::persisted_stm<> {
ss::future<raft::stm_snapshot> do_take_local_snapshot(uint8_t version);
ss::future<std::optional<abort_snapshot>> load_abort_snapshot(abort_index);
ss::future<> save_abort_snapshot(abort_snapshot);
void update_tx_offsets(producer_ptr, const model::record_batch_header&);

ss::future<result<kafka_result>> do_replicate(
model::batch_identity,
@@ -362,7 +363,6 @@

bool is_known_session(model::producer_identity pid) const {
auto is_known = false;
is_known |= _mem_state.estimated.contains(pid);
is_known |= _log_state.ongoing_map.contains(pid);
is_known |= _log_state.current_txes.contains(pid);
return is_known;
@@ -372,7 +372,7 @@
get_abort_origin(const model::producer_identity&, model::tx_seq) const;

ss::future<> apply(const model::record_batch&) override;
void apply_fence(model::record_batch&&);
void apply_fence(model::producer_identity, model::record_batch);
void apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, const model::record_batch_header&);

@@ -473,38 +473,6 @@ class rm_stm final : public raft::persisted_stm<> {
void reset();
};

struct mem_state {
explicit mem_state(util::mem_tracker& parent)
: _tracker(parent.create_child("mem-state"))
, estimated(mt::map<
absl::flat_hash_map,
model::producer_identity,
model::offset>(_tracker)) {}

ss::shared_ptr<util::mem_tracker> _tracker;
// once raft's term has passed mem_state::term we wipe mem_state
// and wait until log_state catches up with current committed index.
// with this approach a combination of mem_state and log_state is
// always up to date
model::term_id term;
// before we replicate the first batch of a transaction we don't know
// its offset but we must prevent read_comitted fetch from getting it
// so we use last seen offset to estimate it
mt::unordered_map_t<
absl::flat_hash_map,
model::producer_identity,
model::offset>
estimated;

// depending on the inflight state we may use last_applied or
// committed_index as LSO; the alternation between them may
// violate LSO monotonicity so we need to explicitly maintain
// it with last_lso
model::offset last_lso{-1};

void forget(model::producer_identity pid) { estimated.erase(pid); }
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
auto lock_it = _tx_locks.find(pid);
if (lock_it == _tx_locks.end()) {
@@ -532,7 +500,6 @@
features::feature::transaction_partitioning);
}

friend std::ostream& operator<<(std::ostream&, const mem_state&);
friend std::ostream& operator<<(std::ostream&, const log_state&);
ss::future<> maybe_log_tx_stats();
void log_tx_stats();
@@ -549,7 +516,6 @@
ss::lw_shared_ptr<mutex>>
_tx_locks;
log_state _log_state;
mem_state _mem_state;
ss::timer<clock_type> auto_abort_timer;
std::chrono::milliseconds _sync_timeout;
std::chrono::milliseconds _tx_timeout_delay;
@@ -575,6 +541,8 @@
ss::gate _gate;
// Highest producer ID applied to this stm.
model::producer_id _highest_producer_id;
// for monotonicity of computed LSO.
model::offset _last_known_lso{-1};
Reviewer (Member): nit: do not need to be initialized, it will be default initialized

Author (Contributor): made a small change in sync() related to this.. retained this to be -1 to be compatible with previous code.


friend struct ::rm_stm_test_fixture;
};