diff --git a/pkg/kgo/consumer_group.go b/pkg/kgo/consumer_group.go index 81832a6e..cd76684b 100644 --- a/pkg/kgo/consumer_group.go +++ b/pkg/kgo/consumer_group.go @@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync( onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {} } - g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) close(done) return } + + g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done. unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() defer close(done) @@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets( return } - g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us - unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { - defer g.syncCommitMu.RUnlock() - onDone(cl, req, resp, err) - } - if err := g.waitJoinSyncMu(ctx); err != nil { onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err) return } + + g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) { g.noCommitDuringJoinAndSync.RUnlock() - unblockSyncCommit(cl, req, resp, err) + defer g.syncCommitMu.RUnlock() + onDone(cl, req, resp, err) } g.mu.Lock()