Merge pull request redpanda-data#18076 from bharathv/rem_mem_state
tx/rm_stm: remove mem_state
bharathv authored Apr 29, 2024
2 parents a71cfaa + 88ac775 commit 6180623
Showing 3 changed files with 46 additions and 119 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/producer_state.h
@@ -209,6 +209,8 @@ class producer_state {
return _current_txn_start_offset;
}

model::producer_identity id() const { return _id; }

void update_current_txn_start_offset(std::optional<kafka::offset> offset) {
_current_txn_start_offset = offset;
}
123 changes: 40 additions & 83 deletions src/v/cluster/rm_stm.cc
@@ -255,7 +255,6 @@ rm_stm::rm_stm(
map<absl::flat_hash_map, model::producer_id, ss::lw_shared_ptr<mutex>>(
_tx_root_tracker.create_child("tx-locks")))
, _log_state(_tx_root_tracker)
, _mem_state(_tx_root_tracker)
, _sync_timeout(config::shard_local_cfg().rm_sync_timeout_ms.value())
, _tx_timeout_delay(config::shard_local_cfg().tx_timeout_delay_ms.value())
, _abort_interval_ms(config::shard_local_cfg()
@@ -351,7 +350,6 @@ void rm_stm::cleanup_producer_state(model::producer_identity pid) {
// No active transactions for this producer.
// note: this branch can be removed once we port tx state
// into producer_state.
_mem_state.forget(pid);
_log_state.forget(pid);
}
_producers.erase(pid);
@@ -919,28 +917,13 @@ ss::future<result<rm_stm::transaction_set>> rm_stm::get_transactions() {
if (!co_await sync(_sync_timeout)) {
co_return errc::not_leader;
}

transaction_set ans;

// When redpanda starts writing the first batch of a transaction it
// estimates its offset, and only when the write passes does it update the
// offset to the exact value; so for a short period of time (while the tx is
// in the initiating state) lso_bound is the offset of the last operation
// known at the moment the transaction started, and when the first tx batch
// is written it is updated to the first offset of the transaction.
for (auto& [id, offset] : _mem_state.estimated) {
transaction_info tx_info;
tx_info.lso_bound = offset;
tx_info.status = rm_stm::transaction_info::status_t::initiating;
tx_info.info = get_expiration_info(id);
tx_info.seq = get_seq_number(id);
ans.emplace(id, tx_info);
}

for (auto& [id, offset] : _log_state.ongoing_map) {
transaction_info tx_info;
tx_info.lso_bound = offset.first;
tx_info.status = transaction_info::status_t::ongoing;
tx_info.status = offset.last > offset.first
? transaction_info::status_t::ongoing
: transaction_info::status_t::initiating;
tx_info.info = get_expiration_info(id);
tx_info.seq = get_seq_number(id);
ans.emplace(id, tx_info);
@@ -949,6 +932,28 @@ ss::future<result<rm_stm::transaction_set>> rm_stm::get_transactions() {
co_return ans;
}

void rm_stm::update_tx_offsets(
producer_ptr producer, const model::record_batch_header& header) {
const auto& pid = producer->id();
const auto base_offset = header.base_offset;
const auto last_offset = header.last_offset();
auto ongoing_it = _log_state.ongoing_map.find(pid);
if (ongoing_it != _log_state.ongoing_map.end()) {
// transaction already known, update the end offset.
if (ongoing_it->second.last < last_offset) {
ongoing_it->second.last = last_offset;
}
} else {
// we do not have to check if the value is empty as it is already
// done with the ongoing map
producer->update_current_txn_start_offset(from_log_offset(base_offset));

_log_state.ongoing_map.emplace(
pid, tx_range{.pid = pid, .first = base_offset, .last = last_offset});
_log_state.ongoing_set.insert(header.base_offset);
}
}

ss::future<std::error_code> rm_stm::mark_expired(model::producer_identity pid) {
return _state_lock.hold_read_lock().then(
[this, pid](ss::basic_rwlock<>::holder unit) mutable {
@@ -1037,13 +1042,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
auto tx_seq = _log_state.current_txes[bid.pid].tx_seq;
vlog(_ctx_log.trace, "found tx_seq:{} for pid:{}", tx_seq, bid.pid);

if (_mem_state.estimated.contains(bid.pid)) {
// we received second produce request while the first is still
// being processed.
vlog(_ctx_log.warn, "Too frequent produce with same pid:{}", bid.pid);
co_return errc::generic_tx_error;
}

// For the first batch of a transaction, reset sequence tracking to handle
// an edge case where a client reuses a sequence number after an aborted
// transaction; see https://github.com/redpanda-data/redpanda/pull/5026
@@ -1060,7 +1058,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
co_return co_await req_ptr->result();
}
req_ptr->mark_request_in_progress();
_mem_state.estimated[bid.pid] = last_applied_offset();

auto expiration_it = _log_state.expiration.find(bid.pid);
if (expiration_it == _log_state.expiration.end()) {
@@ -1092,7 +1089,6 @@ ss::future<result<kafka_result>> rm_stm::do_transactional_replicate(
req_ptr->set_value(errc::timeout);
co_return tx_errc::timeout;
}
_mem_state.estimated.erase(bid.pid);
auto result = kafka_result{
.last_offset = from_log_offset(r.value().last_offset)};
req_ptr->set_value(result);
@@ -1310,15 +1306,9 @@ model::offset rm_stm::last_stable_offset() {
if (!_log_state.ongoing_set.empty()) {
first_tx_start = *_log_state.ongoing_set.begin();
}

for (auto& entry : _mem_state.estimated) {
first_tx_start = std::min(first_tx_start, entry.second);
}
}

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term
&& _mem_state.term == _insync_term;

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term;
model::offset lso{-1};
auto last_visible_index = _raft->last_visible_index();
auto next_to_apply = model::next_offset(last_applied);
@@ -1336,8 +1326,8 @@ model::offset rm_stm::last_stable_offset() {
// should not advance lso beyond last_applied
lso = next_to_apply;
}
_mem_state.last_lso = std::max(_mem_state.last_lso, lso);
return _mem_state.last_lso;
_last_known_lso = std::max(_last_known_lso, lso);
return _last_known_lso;
}

static void filter_intersecting(
@@ -1409,11 +1399,11 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {
}

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto current_insync_term = _insync_term;
auto ready = co_await persisted_stm::sync(timeout);
if (ready) {
if (_mem_state.term != _insync_term) {
_mem_state = mem_state{_tx_root_tracker};
_mem_state.term = _insync_term;
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
}
}
co_return ready;
@@ -1454,9 +1444,6 @@ ss::future<> rm_stm::do_abort_old_txes() {
}

fragmented_vector<model::producer_identity> pids;
for (auto& [k, _] : _mem_state.estimated) {
pids.push_back(k);
}
for (auto& [k, _] : _log_state.ongoing_map) {
pids.push_back(k);
}
@@ -1681,8 +1668,10 @@ void rm_stm::try_arm(time_point_type deadline) {
}
}

void rm_stm::apply_fence(model::record_batch&& b) {
void rm_stm::apply_fence(model::producer_identity pid, model::record_batch b) {
auto producer = maybe_create_producer(pid);
auto batch_base_offset = b.base_offset();
auto header = b.header();
auto batch_data = read_fence_batch(std::move(b));
vlog(
_ctx_log.trace,
@@ -1694,6 +1683,7 @@
_highest_producer_id, batch_data.bid.pid.get_id());
auto [fence_it, _] = _log_state.fence_pid_epoch.try_emplace(
batch_data.bid.pid.get_id(), batch_data.bid.pid.get_epoch());
update_tx_offsets(producer, header);
// using less-or-equal to update tx_seqs on every transaction
if (fence_it->second <= batch_data.bid.pid.get_epoch()) {
fence_it->second = batch_data.bid.pid.get_epoch();
@@ -1714,9 +1704,10 @@

ss::future<> rm_stm::apply(const model::record_batch& b) {
const auto& hdr = b.header();
const auto bid = model::batch_identity::from(hdr);

if (hdr.type == model::record_batch_type::tx_fence) {
apply_fence(b.copy());
apply_fence(bid.pid, b.copy());
} else if (hdr.type == model::record_batch_type::tx_prepare) {
// prepare phase was used pre-transactions GA. Ideally these
// batches should not appear anymore and should not be a part
@@ -1728,7 +1719,6 @@ ss::future<> rm_stm::apply(const model::record_batch& b) {
b.base_offset(),
b.header().producer_id);
} else if (hdr.type == model::record_batch_type::raft_data) {
auto bid = model::batch_identity::from(hdr);
if (hdr.attrs.is_control()) {
apply_control(bid.pid, parse_control_batch(b));
} else {
@@ -1772,7 +1762,6 @@ void rm_stm::apply_control(
_log_state.ongoing_map.erase(pid);
}

_mem_state.forget(pid);
_log_state.expiration.erase(pid);

if (
@@ -1789,8 +1778,6 @@
_log_state.ongoing_set.erase(offset_it->second.first);
_log_state.ongoing_map.erase(pid);
}

_mem_state.forget(pid);
_log_state.expiration.erase(pid);
}
}
@@ -1811,7 +1798,6 @@
model::batch_identity bid, const model::record_batch_header& header) {
if (bid.is_idempotent()) {
_highest_producer_id = std::max(_highest_producer_id, bid.pid.get_id());
const auto last_offset = header.last_offset();
const auto last_kafka_offset = from_log_offset(header.last_offset());
auto producer = maybe_create_producer(bid.pid);
auto needs_touch = producer->update(bid, last_kafka_offset);
@@ -1826,28 +1812,9 @@
"[{},{}], last kafka offset: {}",
bid,
header.base_offset,
last_offset,
header.last_offset(),
last_kafka_offset);
auto ongoing_it = _log_state.ongoing_map.find(bid.pid);
if (ongoing_it != _log_state.ongoing_map.end()) {
if (ongoing_it->second.last < last_offset) {
ongoing_it->second.last = last_offset;
}
} else {
// we do not have to check if the value is empty as it is already
// done with the ongoing map
producer->update_current_txn_start_offset(
from_log_offset(header.base_offset));

_log_state.ongoing_map.emplace(
bid.pid,
tx_range{
.pid = bid.pid,
.first = header.base_offset,
.last = last_offset});
_log_state.ongoing_set.insert(header.base_offset);
_mem_state.estimated.erase(bid.pid);
}
update_tx_offsets(producer, header);
}
}
}
@@ -2301,7 +2268,6 @@ ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
"Resetting all state, reason: log eviction, offset: {}",
_raft->start_offset());
_log_state.reset();
_mem_state = mem_state{_tx_root_tracker};
co_await reset_producers();
set_next(_raft->start_offset());
co_return;
@@ -2317,11 +2283,6 @@ std::ostream& operator<<(std::ostream& o, const rm_stm::abort_snapshot& as) {
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::mem_state& state) {
fmt::print(o, "{{ estimated: {} }} ", state.estimated.size());
return o;
}

std::ostream& operator<<(std::ostream& o, const rm_stm::log_state& state) {
fmt::print(
o,
@@ -2388,11 +2349,7 @@ ss::future<> rm_stm::maybe_log_tx_stats() {
_ctx_log.debug(
"tx mem tracker breakdown: {}", _tx_root_tracker.pretty_print_json());
auto units = co_await _state_lock.hold_read_lock();
_ctx_log.debug(
"tx memory snapshot stats: {{mem_state: {}, log_state: "
"{}}}",
_mem_state,
_log_state);
_ctx_log.debug("tx memory snapshot stats: {{log_state: {}}}", _log_state);
}

void rm_stm::log_tx_stats() {
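The two behavioral changes running through the rm_stm.cc diff can be sketched in isolation. Below is a minimal, self-contained C++ sketch using hypothetical simplified types (tx_range, lso_tracker — not the actual Redpanda classes): a transaction's status is now derived from its tracked offset range instead of a separate mem_state::estimated map, and the computed last stable offset (LSO) is clamped against the last value handed out, so readers observe a monotonic offset even though the raw bound alternates between last-applied and committed indexes.

#include <algorithm>
#include <cstdint>
#include <iostream>

struct tx_range {
    int64_t first; // log offset of the first batch of the transaction
    int64_t last;  // highest log offset seen so far for the transaction
};

enum class tx_status { initiating, ongoing };

// With mem_state::estimated gone, status falls out of the range itself: a
// transaction whose range still spans a single batch start is reported as
// "initiating"; anything longer is "ongoing".
tx_status status_of(const tx_range& r) {
    return r.last > r.first ? tx_status::ongoing : tx_status::initiating;
}

// Alternating between last_applied and committed_index when computing the
// LSO can move the raw candidate backwards; clamping against the last value
// returned keeps the observed LSO monotonic (the role of _last_known_lso).
struct lso_tracker {
    int64_t last_known = -1;
    int64_t advance(int64_t candidate) {
        last_known = std::max(last_known, candidate);
        return last_known;
    }
    void reset() { last_known = -1; } // on a term change, as sync() now does
};

int main() {
    tx_range r{.first = 100, .last = 100};
    std::cout << int(status_of(r)) << '\n'; // 0: initiating
    r.last = 104;
    std::cout << int(status_of(r)) << '\n'; // 1: ongoing

    lso_tracker lso;
    std::cout << lso.advance(42) << ' ' << lso.advance(40) << '\n'; // 42 42
}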
40 changes: 4 additions & 36 deletions src/v/cluster/rm_stm.h
@@ -298,6 +298,7 @@ class rm_stm final : public raft::persisted_stm<> {
ss::future<raft::stm_snapshot> do_take_local_snapshot(uint8_t version);
ss::future<std::optional<abort_snapshot>> load_abort_snapshot(abort_index);
ss::future<> save_abort_snapshot(abort_snapshot);
void update_tx_offsets(producer_ptr, const model::record_batch_header&);

ss::future<result<kafka_result>> do_replicate(
model::batch_identity,
@@ -362,7 +363,6 @@ class rm_stm final : public raft::persisted_stm<> {

bool is_known_session(model::producer_identity pid) const {
auto is_known = false;
is_known |= _mem_state.estimated.contains(pid);
is_known |= _log_state.ongoing_map.contains(pid);
is_known |= _log_state.current_txes.contains(pid);
return is_known;
@@ -372,7 +372,7 @@
get_abort_origin(const model::producer_identity&, model::tx_seq) const;

ss::future<> apply(const model::record_batch&) override;
void apply_fence(model::record_batch&&);
void apply_fence(model::producer_identity, model::record_batch);
void apply_control(model::producer_identity, model::control_record_type);
void apply_data(model::batch_identity, const model::record_batch_header&);

@@ -473,38 +473,6 @@ class rm_stm final : public raft::persisted_stm<> {
void reset();
};

struct mem_state {
explicit mem_state(util::mem_tracker& parent)
: _tracker(parent.create_child("mem-state"))
, estimated(mt::map<
absl::flat_hash_map,
model::producer_identity,
model::offset>(_tracker)) {}

ss::shared_ptr<util::mem_tracker> _tracker;
// once raft's term has passed mem_state::term we wipe mem_state
// and wait until log_state catches up with current committed index.
// with this approach a combination of mem_state and log_state is
// always up to date
model::term_id term;
// before we replicate the first batch of a transaction we don't know
// its offset but we must prevent read_committed fetch from getting it
// so we use last seen offset to estimate it
mt::unordered_map_t<
absl::flat_hash_map,
model::producer_identity,
model::offset>
estimated;

// depending on the inflight state we may use last_applied or
// committed_index as LSO; the alternation between them may
// violate LSO monotonicity so we need to explicitly maintain
// it with last_lso
model::offset last_lso{-1};

void forget(model::producer_identity pid) { estimated.erase(pid); }
};

ss::lw_shared_ptr<mutex> get_tx_lock(model::producer_id pid) {
auto lock_it = _tx_locks.find(pid);
if (lock_it == _tx_locks.end()) {
@@ -532,7 +500,6 @@ class rm_stm final : public raft::persisted_stm<> {
features::feature::transaction_partitioning);
}

friend std::ostream& operator<<(std::ostream&, const mem_state&);
friend std::ostream& operator<<(std::ostream&, const log_state&);
ss::future<> maybe_log_tx_stats();
void log_tx_stats();
@@ -549,7 +516,6 @@
ss::lw_shared_ptr<mutex>>
_tx_locks;
log_state _log_state;
mem_state _mem_state;
ss::timer<clock_type> auto_abort_timer;
std::chrono::milliseconds _sync_timeout;
std::chrono::milliseconds _tx_timeout_delay;
@@ -575,6 +541,8 @@
ss::gate _gate;
// Highest producer ID applied to this stm.
model::producer_id _highest_producer_id;
// for monotonicity of computed LSO.
model::offset _last_known_lso{-1};

friend struct ::rm_stm_test_fixture;
};
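Taken together, the header changes replace mem_state with two pieces of state: the existing log_state ongoing map/set and the new _last_known_lso. A rough model of the bookkeeping update_tx_offsets() performs (hypothetical simplified types, assuming std::map/std::set in place of the tracked containers the real code uses) — one map from producer to its open offset range plus an ordered set of range starts, so the earliest open transaction, which bounds the LSO, is cheap to find:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

using producer_id = int64_t;

struct tx_range {
    int64_t first;
    int64_t last;
};

struct tx_offsets {
    std::map<producer_id, tx_range> ongoing_map; // open range per producer
    std::set<int64_t> ongoing_set; // base offsets of open transactions

    // Mirrors update_tx_offsets(): extend a known transaction's end offset,
    // or start tracking a new one.
    void update(producer_id pid, int64_t base, int64_t last) {
        auto it = ongoing_map.find(pid);
        if (it != ongoing_map.end()) {
            it->second.last = std::max(it->second.last, last);
        } else {
            ongoing_map.emplace(pid, tx_range{.first = base, .last = last});
            ongoing_set.insert(base);
        }
    }

    // Mirrors apply_control(): a commit/abort closes the transaction.
    void complete(producer_id pid) {
        auto it = ongoing_map.find(pid);
        if (it == ongoing_map.end()) {
            return;
        }
        ongoing_set.erase(it->second.first);
        ongoing_map.erase(it);
    }
};

int main() {
    tx_offsets t;
    t.update(7, 100, 104); // first batch of producer 7's transaction
    t.update(9, 110, 110); // fence batch for producer 9: first == last
    t.update(7, 115, 120); // later batch of the same transaction
    assert(*t.ongoing_set.begin() == 100); // earliest open tx bounds the LSO
    t.complete(7);
    assert(*t.ongoing_set.begin() == 110);
    return 0;
}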
