Skip to content

Commit

Permalink
Merge pull request #787 from Shopify/block-even-if-no-errors-returned
Browse files Browse the repository at this point in the history
Producer: block in `Close` without `Return.Errors`
  • Loading branch information
eapache authored Nov 24, 2016
2 parents e6df07f + 17b655d commit f4842fc
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
2 changes: 2 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (p *asyncProducer) Close() error {
for event := range p.errors {
errors = append(errors, event)
}
} else {
<-p.errors
}

if len(errors) > 0 {
Expand Down
40 changes: 40 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,46 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
}
}

func TestAsyncProducerNoReturns(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = false
config.Producer.Return.Errors = false
config.Producer.Retry.Backoff = 0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

wait := make(chan bool)
go func() {
if err := producer.Close(); err != nil {
t.Error(err)
}
close(wait)
}()

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

<-wait
seedBroker.Close()
leader.Close()
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down

0 comments on commit f4842fc

Please sign in to comment.