From 1b229ce0df2931410e52f9c2334fd0a2a6c49498 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Mon, 13 Mar 2023 22:15:41 -0600 Subject: [PATCH] kgo: bugfix transaction ending & beginning Not having a mutext lock & unlock caused a race where a transaction could 1 A call .pause to pause inflight 2 A that could load that there _are_ inflight requests 3 B inflight could finish and broadcast 4 A could then signal.Wait The broadcast needed to trigger the wait in step 4, but the broadcast already happened because there was no lock. Also removes some redundant returns that made some aspects of seqRingResp confusing. --- pkg/kgo/producer.go | 2 ++ pkg/kgo/ring.go | 15 +++++---------- pkg/kgo/sink.go | 4 ++-- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 6e1bbc62..7119b9c4 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -1015,6 +1015,8 @@ func (p *producer) maybeAddInflight() bool { func (p *producer) decInflight() { if p.inflight.Add(-1)>>48 > 0 { + p.mu.Lock() + p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe. p.c.Broadcast() } } diff --git a/pkg/kgo/ring.go b/pkg/kgo/ring.go index c0eb5bd2..43ee72a6 100644 --- a/pkg/kgo/ring.go +++ b/pkg/kgo/ring.go @@ -172,32 +172,27 @@ type ringSeqResp struct { head uint8 tail uint8 l uint8 - dead bool } -func (r *ringSeqResp) push(sr *seqResp) (first, dead bool) { +func (r *ringSeqResp) push(sr *seqResp) (first bool) { r.mu.Lock() defer r.mu.Unlock() - for r.l == eight && !r.dead { + for r.l == eight { if r.c == nil { r.c = sync.NewCond(&r.mu) } r.c.Wait() } - if r.dead { - return false, true - } - r.elems[r.tail] = sr r.tail = (r.tail + 1) & mask7 r.l++ - return r.l == 1, false + return r.l == 1 } -func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) { +func (r *ringSeqResp) dropPeek() (next *seqResp, more bool) { r.mu.Lock() defer r.mu.Unlock() @@ -209,7 +204,7 @@ func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) { r.c.Signal() } - return r.elems[r.head], r.l > 0, r.dead + return r.elems[r.head], r.l > 0 } // Also no die; this type is slightly different because we can have overflow. diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 1a37448b..ea013efc 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -398,7 +398,7 @@ func (s *sink) doSequenced( wait.br = br } - if first, _ := s.seqResps.push(wait); first { + if first := s.seqResps.push(wait); first { go s.handleSeqResps(wait) } } @@ -410,7 +410,7 @@ start: <-wait.done wait.promise(wait.br, wait.resp, wait.err) - wait, more, _ = s.seqResps.dropPeek() + wait, more = s.seqResps.dropPeek() if more { goto start }