From 96702823f4d4eeaf2c7421afb5959d7a6aa3cec4 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 25 Mar 2015 20:30:21 +0000 Subject: [PATCH] Improve memory of producer retry queue The old version just used a simple slice as a queue, using append and slice[1:]. This had correct (amortized O(1)) running time, but did not behave well with the garbage collector. In particular: - elements removed from the slice were still pointed to by the underlying array, preventing them from being collected until the slice was resized - the slice would only be resized when its current capacity was "full", even if it had a substantial amount of free space it was wasting Switch instead to my queue implementation (which I wrote a while ago to solve these exact problems somewhere else). It uses a ringbuffer that is guaranteed to stay with a factor of 2 size of the actual number of messages stored, it generates a lot less garbage when the input and output rates are about the same, and it nils removed elements in the underlying array so the garbage collector can pick them up at its leisure. The only possible concern is that it uses `interface{}` but the usage is so restrained that type-safety is trivial and the performance will not be noticable given everything else the producer has to do. --- async_producer.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/async_producer.go b/async_producer.go index 0674e6a9f..5ef33cd05 100644 --- a/async_producer.go +++ b/async_producer.go @@ -6,6 +6,7 @@ import ( "time" "github.com/eapache/go-resiliency/breaker" + "github.com/eapache/queue" ) func forceFlushThreshold() int { @@ -592,19 +593,21 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) { // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel func (p *asyncProducer) retryHandler() { - var buf []*ProducerMessage - var msg *ProducerMessage - refs := 0 - shuttingDown := false + var ( + msg *ProducerMessage + buf = queue.New() + refs = 0 + shuttingDown = false + ) for { - if len(buf) == 0 { + if buf.Length() == 0 { msg = <-p.retries } else { select { case msg = <-p.retries: - case p.input <- buf[0]: - buf = buf[1:] + case p.input <- buf.Peek().(*ProducerMessage): + buf.Remove() continue } } @@ -622,13 +625,14 @@ func (p *asyncProducer) retryHandler() { break } } else { - buf = append(buf, msg) + buf.Add(msg) } } close(p.retries) - for i := range buf { - p.input <- buf[i] + for buf.Length() != 0 { + p.input <- buf.Peek().(*ProducerMessage) + buf.Remove() } close(p.input) }