Skip to content

Commit

Permalink
Merge pull request #396 from Shopify/fix-retry-queue
Browse files Browse the repository at this point in the history
Improve memory of producer retry queue
  • Loading branch information
eapache committed Apr 5, 2015
2 parents a6c0681 + 9670282 commit d7cc796
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/eapache/go-resiliency/breaker"
"github.com/eapache/queue"
)

func forceFlushThreshold() int {
Expand Down Expand Up @@ -592,19 +593,21 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
var buf []*ProducerMessage
var msg *ProducerMessage
refs := 0
shuttingDown := false
var (
msg *ProducerMessage
buf = queue.New()
refs = 0
shuttingDown = false
)

for {
if len(buf) == 0 {
if buf.Length() == 0 {
msg = <-p.retries
} else {
select {
case msg = <-p.retries:
case p.input <- buf[0]:
buf = buf[1:]
case p.input <- buf.Peek().(*ProducerMessage):
buf.Remove()
continue
}
}
Expand All @@ -622,13 +625,14 @@ func (p *asyncProducer) retryHandler() {
break
}
} else {
buf = append(buf, msg)
buf.Add(msg)
}
}

close(p.retries)
for i := range buf {
p.input <- buf[i]
for buf.Length() != 0 {
p.input <- buf.Peek().(*ProducerMessage)
buf.Remove()
}
close(p.input)
}
Expand Down

0 comments on commit d7cc796

Please sign in to comment.