Skip to content

Commit

Permalink
Merge pull request #392 from twmb/txn_bugfix
Browse files Browse the repository at this point in the history
kgo: bugfix transaction ending & beginning
  • Loading branch information
twmb authored Mar 14, 2023
2 parents 0eae6ae + 1b229ce commit d845069
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,8 @@ func (p *producer) maybeAddInflight() bool {

func (p *producer) decInflight() {
if p.inflight.Add(-1)>>48 > 0 {
p.mu.Lock()
p.mu.Unlock() //nolint:gocritic,staticcheck // We use the lock as a barrier, unlocking immediately is safe.
p.c.Broadcast()
}
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/kgo/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,32 +172,27 @@ type ringSeqResp struct {
head uint8
tail uint8
l uint8
dead bool
}

func (r *ringSeqResp) push(sr *seqResp) (first, dead bool) {
func (r *ringSeqResp) push(sr *seqResp) (first bool) {
r.mu.Lock()
defer r.mu.Unlock()

for r.l == eight && !r.dead {
for r.l == eight {
if r.c == nil {
r.c = sync.NewCond(&r.mu)
}
r.c.Wait()
}

if r.dead {
return false, true
}

r.elems[r.tail] = sr
r.tail = (r.tail + 1) & mask7
r.l++

return r.l == 1, false
return r.l == 1
}

func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) {
func (r *ringSeqResp) dropPeek() (next *seqResp, more bool) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -209,7 +204,7 @@ func (r *ringSeqResp) dropPeek() (next *seqResp, more, dead bool) {
r.c.Signal()
}

return r.elems[r.head], r.l > 0, r.dead
return r.elems[r.head], r.l > 0
}

// Also no die; this type is slightly different because we can have overflow.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (s *sink) doSequenced(
wait.br = br
}

if first, _ := s.seqResps.push(wait); first {
if first := s.seqResps.push(wait); first {
go s.handleSeqResps(wait)
}
}
Expand All @@ -410,7 +410,7 @@ start:
<-wait.done
wait.promise(wait.br, wait.resp, wait.err)

wait, more, _ = s.seqResps.dropPeek()
wait, more = s.seqResps.dropPeek()
if more {
goto start
}
Expand Down

0 comments on commit d845069

Please sign in to comment.