diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index 6e1bbc62..7119b9c4 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -1015,6 +1015,8 @@ func (p *producer) maybeAddInflight() bool { func (p *producer) decInflight() { if p.inflight.Add(-1)>>48 > 0 { + p.mu.Lock() + p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe. p.c.Broadcast() } } diff --git a/pkg/kgo/ring.go b/pkg/kgo/ring.go index c0eb5bd2..43ee72a6 100644 --- a/pkg/kgo/ring.go +++ b/pkg/kgo/ring.go @@ -172,32 +172,27 @@ type ringSeqResp struct { head uint8 tail uint8 l uint8 - dead bool } -func (r *ringSeqResp) push(sr *seqResp) (first, dead bool) { +func (r *ringSeqResp) push(sr *seqResp) (first bool) { r.mu.Lock() defer r.mu.Unlock() - for r.l == eight && !r.dead { + for r.l == eight { if r.c == nil { r.c = sync.NewCond(&r.mu) } r.c.Wait() } - if r.dead { - return false, true - } - r.elems[r.tail] = sr r.tail = (r.tail + 1) & mask7 r.l++ - return r.l == 1, false + return r.l == 1 } -func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) { +func (r *ringSeqResp) dropPeek() (next *seqResp, more bool) { r.mu.Lock() defer r.mu.Unlock() @@ -209,7 +204,7 @@ func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) { r.c.Signal() } - return r.elems[r.head], r.l > 0, r.dead + return r.elems[r.head], r.l > 0 } // Also no die; this type is slightly different because we can have overflow. diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go index 1a37448b..ea013efc 100644 --- a/pkg/kgo/sink.go +++ b/pkg/kgo/sink.go @@ -398,7 +398,7 @@ func (s *sink) doSequenced( wait.br = br } - if first, _ := s.seqResps.push(wait); first { + if first := s.seqResps.push(wait); first { go s.handleSeqResps(wait) } } @@ -410,7 +410,7 @@ start: <-wait.done wait.promise(wait.br, wait.resp, wait.err) - wait, more, _ = s.seqResps.dropPeek() + wait, more = s.seqResps.dropPeek() if more { goto start }