diff --git a/async_producer.go b/async_producer.go index 6378ae5c5..aced88631 100644 --- a/async_producer.go +++ b/async_producer.go @@ -224,11 +224,18 @@ func (p *asyncProducer) topicDispatcher() { p.inFlight.Done() continue } else if msg.retries == 0 { - p.inFlight.Add(1) if shuttingDown { - p.returnError(msg, ErrShuttingDown) + // we can't just call returnError here because that decrements the wait group, + // which hasn't been incremented yet for this message, and shouldn't be + pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown} + if p.conf.Producer.Return.Errors { + p.errors <- pErr + } else { + Logger.Println(pErr) + } continue } + p.inFlight.Add(1) } if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||