From 8ba57f413344cc81354bd2f8ec8cc506654ab1d6 Mon Sep 17 00:00:00 2001 From: Andrew Wong Date: Sun, 17 Dec 2023 10:36:22 -0800 Subject: [PATCH] kafka: fix holding of attached_partition We previously accessed the iterator, when we really should be using the pointer to the partition. --- src/v/kafka/server/group_manager.cc | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index dde2aadf6e43d..98c006a1fa099 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -626,19 +626,20 @@ ss::future<> group_manager::reload_groups() { ss::future group_manager::snapshot_groups(const model::ntp& ntp) { - auto attached_partition = _partitions.find(ntp); - if (attached_partition == _partitions.end()) { + auto it = _partitions.find(ntp); + if (it == _partitions.end()) { co_return cluster::cloud_metadata::error_outcome::ntp_not_found; } + auto attached_partition = it->second; // Avoid overlapping with concurrent reloads of the partition. auto units = co_await ss::get_units( - attached_partition->second->sem, 1, attached_partition->second->as); - auto& catchup = attached_partition->second->catchup_lock; + attached_partition->sem, 1, attached_partition->as); + auto& catchup = attached_partition->catchup_lock; auto read_lock = co_await catchup->hold_read_lock(); - if (!attached_partition->second->partition->is_leader()) { + if (!attached_partition->partition->is_leader()) { co_return cluster::cloud_metadata::error_outcome::not_ready; } - if (attached_partition->second->loading) { + if (attached_partition->loading) { co_return cluster::cloud_metadata::error_outcome::not_ready; } group_offsets_snapshot snap; @@ -694,18 +695,20 @@ group_manager::recover_offsets(group_offsets_snapshot snap) { "Received request to recover {} groups from snapshot on partition {}", snap.groups.size(), offsets_ntp); - auto p = _partitions.find(offsets_ntp); - if (p == _partitions.end()) { - co_return kafka::error_code::not_leader_for_partition; + auto it = _partitions.find(offsets_ntp); + if (it == _partitions.end()) { + co_return cluster::cloud_metadata::error_outcome::ntp_not_found; } - auto units = co_await ss::get_units(p->second->sem, 1, p->second->as); - if (!p->second->partition->is_leader()) { + auto attached_partition = it->second; + auto units = co_await ss::get_units( + attached_partition->sem, 1, attached_partition->as); + if (!attached_partition->partition->is_leader()) { co_return kafka::error_code::not_leader_for_partition; } - if (p->second->loading) { + if (attached_partition->loading) { co_return kafka::error_code::coordinator_load_in_progress; } - auto lock = co_await p->second->catchup_lock->hold_write_lock(); + auto lock = co_await attached_partition->catchup_lock->hold_write_lock(); vlog( klog.info,