diff --git a/async_producer.go b/async_producer.go index 0674e6a9f..5ef33cd05 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,6 +6,7 @@ import ( "time" "github.com/eapache/go-resiliency/breaker" + "github.com/eapache/queue" ) func forceFlushThreshold() int { @@ -592,19 +593,21 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - var buf []*ProducerMessage - var msg *ProducerMessage - refs := 0 - shuttingDown := false + var ( + msg *ProducerMessage + buf = queue.New() + refs = 0 + shuttingDown = false + ) for { - if len(buf) == 0 { + if buf.Length() == 0 { msg = <-p.retries } else { select { case msg = <-p.retries: - case p.input <- buf[0]: - buf = buf[1:] + case p.input <- buf.Peek().(*ProducerMessage): + buf.Remove() continue } } @@ -622,13 +625,14 @@ func (p *asyncProducer) retryHandler() { break } } else { - buf = append(buf, msg) + buf.Add(msg) } } close(p.retries) - for i := range buf { - p.input <- buf[i] + for buf.Length() != 0 { + p.input <- buf.Peek().(*ProducerMessage) + buf.Remove() } close(p.input) }