From 99d6dfb3dab5b05c6474c33a538c2b6b2aa53012 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 6 Feb 2024 18:00:58 -0700 Subject: [PATCH] kgo: further fix for cd65d77d05f71ca51d81d247312d0d465fdb76c8 The prior commit was insufficient -- we left a dangling lock, and we had that same dangling lock in the cancelable commit offsets. --- pkg/kgo/consumer_group.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 81832a6e..cd76684b 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync( onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {} } - g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) close(done) return } + + g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() defer close(done) @@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets( return } - g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us - unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - defer g.syncCommitMu.RUnlock() - onDone(cl, req, resp, err) - } - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) return } + + g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() - unblockSyncCommit(cl, req, resp, err) + defer g.syncCommitMu.RUnlock() + onDone(cl, req, resp, err) } g.mu.Lock()