Skip to content

Commit

Permalink
kafka: fix holding of attached_partition
Browse files Browse the repository at this point in the history
We previously accessed the iterator, when we really should be using the
pointer to the partition.
  • Loading branch information
andrwng committed Dec 17, 2023
1 parent 9135f9e commit 8ba57f4
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,19 +626,20 @@ ss::future<> group_manager::reload_groups() {

ss::future<group_offsets_snapshot_result>
group_manager::snapshot_groups(const model::ntp& ntp) {
auto attached_partition = _partitions.find(ntp);
if (attached_partition == _partitions.end()) {
auto it = _partitions.find(ntp);
if (it == _partitions.end()) {
co_return cluster::cloud_metadata::error_outcome::ntp_not_found;
}
auto attached_partition = it->second;
// Avoid overlapping with concurrent reloads of the partition.
auto units = co_await ss::get_units(
attached_partition->second->sem, 1, attached_partition->second->as);
auto& catchup = attached_partition->second->catchup_lock;
attached_partition->sem, 1, attached_partition->as);
auto& catchup = attached_partition->catchup_lock;
auto read_lock = co_await catchup->hold_read_lock();
if (!attached_partition->second->partition->is_leader()) {
if (!attached_partition->partition->is_leader()) {
co_return cluster::cloud_metadata::error_outcome::not_ready;
}
if (attached_partition->second->loading) {
if (attached_partition->loading) {
co_return cluster::cloud_metadata::error_outcome::not_ready;
}
group_offsets_snapshot snap;
Expand Down Expand Up @@ -694,18 +695,20 @@ group_manager::recover_offsets(group_offsets_snapshot snap) {
"Received request to recover {} groups from snapshot on partition {}",
snap.groups.size(),
offsets_ntp);
auto p = _partitions.find(offsets_ntp);
if (p == _partitions.end()) {
co_return kafka::error_code::not_leader_for_partition;
auto it = _partitions.find(offsets_ntp);
if (it == _partitions.end()) {
co_return cluster::cloud_metadata::error_outcome::ntp_not_found;
}
auto units = co_await ss::get_units(p->second->sem, 1, p->second->as);
if (!p->second->partition->is_leader()) {
auto attached_partition = it->second;
auto units = co_await ss::get_units(
attached_partition->sem, 1, attached_partition->as);
if (!attached_partition->partition->is_leader()) {
co_return kafka::error_code::not_leader_for_partition;
}
if (p->second->loading) {
if (attached_partition->loading) {
co_return kafka::error_code::coordinator_load_in_progress;
}
auto lock = co_await p->second->catchup_lock->hold_write_lock();
auto lock = co_await attached_partition->catchup_lock->hold_write_lock();

vlog(
klog.info,
Expand Down

0 comments on commit 8ba57f4

Please sign in to comment.