diff --git a/async_producer.go b/async_producer.go index e1ae5b0da..3af47fddd 100644 --- a/async_producer.go +++ b/async_producer.go @@ -210,6 +210,8 @@ func (p *asyncProducer) Close() error { for event := range p.errors { errors = append(errors, event) } + } else { + <-p.errors } if len(errors) > 0 { diff --git a/async_producer_test.go b/async_producer_test.go index 517ef2a34..13af1202c 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -709,6 +709,46 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { } } +func TestAsyncProducerNoReturns(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) + + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataLeader) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = false + config.Producer.Return.Errors = false + config.Producer.Retry.Backoff = 0 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + wait := make(chan bool) + go func() { + if err := producer.Close(); err != nil { + t.Error(err) + } + close(wait) + }() + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + + <-wait + seedBroker.Close() + leader.Close() +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() {