Skip to content

Commit

Permalink
kgo: bugfix transaction ending & beginning
Browse files Browse the repository at this point in the history
Not having a mutext lock & unlock caused a race where a transaction
could
1 A call .pause to pause inflight
2 A that could load that there _are_ inflight requests
3 B inflight could finish and broadcast
4 A could then signal.Wait
The broadcast needed to trigger the wait in step 4, but the broadcast
already happened because there was no lock.

Also removes some redundant returns that made some aspects of
seqRingResp confusing.
  • Loading branch information
twmb committed Mar 14, 2023
1 parent 0eae6ae commit 1b229ce
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,8 @@ func (p *producer) maybeAddInflight() bool {

func (p *producer) decInflight() {
if p.inflight.Add(-1)>>48 > 0 {
p.mu.Lock()
p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.c.Broadcast()
}
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/kgo/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,32 +172,27 @@ type ringSeqResp struct {
head uint8
tail uint8
l uint8
dead bool
}

func (r *ringSeqResp) push(sr *seqResp) (first, dead bool) {
func (r *ringSeqResp) push(sr *seqResp) (first bool) {
r.mu.Lock()
defer r.mu.Unlock()

for r.l == eight && !r.dead {
for r.l == eight {
if r.c == nil {
r.c = sync.NewCond(&r.mu)
}
r.c.Wait()
}

if r.dead {
return false, true
}

r.elems[r.tail] = sr
r.tail = (r.tail + 1) & mask7
r.l++

return r.l == 1, false
return r.l == 1
}

func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) {
func (r *ringSeqResp) dropPeek() (next *seqResp, more bool) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -209,7 +204,7 @@ func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) {
r.c.Signal()
}

return r.elems[r.head], r.l > 0, r.dead
return r.elems[r.head], r.l > 0
}

// Also no die; this type is slightly different because we can have overflow.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (s *sink) doSequenced(
wait.br = br
}

if first, _ := s.seqResps.push(wait); first {
if first := s.seqResps.push(wait); first {
go s.handleSeqResps(wait)
}
}
Expand All @@ -410,7 +410,7 @@ start:
<-wait.done
wait.promise(wait.br, wait.resp, wait.err)

wait, more, _ = s.seqResps.dropPeek()
wait, more = s.seqResps.dropPeek()
if more {
goto start
}
Expand Down

0 comments on commit 1b229ce

Please sign in to comment.