From 17b655d247366cbacb00ecc09eeb6357cacb4ce4 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 22 Nov 2016 10:48:59 -0500 Subject: [PATCH] Producer: block in `Close` without `Return.Errors` If neither `Return.Errors` nor `Return.Successes` was set then `Close` wouldn't actually block at all. This is arguably correct given the send-it-into-the-void- and-pray configuration but it was still surprising. It turns out to be a very minimal change to block anyway (since the `Errors` channel is still present and closed at the appropriate moment) so just do that. Also add a test for this configuration, since it's not one I remember to think about on the regular. --- async_producer.go | 2 ++ async_producer_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/async_producer.go b/async_producer.go index e1ae5b0da..3af47fddd 100644 --- a/async_producer.go +++ b/async_producer.go @@ -210,6 +210,8 @@ func (p *asyncProducer) Close() error { for event := range p.errors { errors = append(errors, event) } + } else { + <-p.errors } if len(errors) > 0 { diff --git a/async_producer_test.go b/async_producer_test.go index 517ef2a34..13af1202c 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -709,6 +709,46 @@ func TestAsyncProducerRetryShutdown(t *testing.T) { } } +func TestAsyncProducerNoReturns(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) + + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataLeader) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = false + config.Producer.Return.Errors = false + config.Producer.Retry.Backoff = 0 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + wait := make(chan bool) + go func() { + if err := producer.Close(); err != nil { + t.Error(err) + } + close(wait) + }() + + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + leader.Returns(prodSuccess) + + <-wait + seedBroker.Close() + leader.Close() +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() {