diff --git a/consumer.go b/consumer.go index b04761ca6..8a3ca6f02 100644 --- a/consumer.go +++ b/consumer.go @@ -130,7 +130,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), feeder: make(chan *FetchResponse, 1), trigger: make(chan none, 1), - dying: make(chan error, 1), + dying: make(chan none), fetchSize: c.conf.Consumer.Fetch.Default, } @@ -269,8 +269,9 @@ type partitionConsumer struct { messages chan *ConsumerMessage errors chan *ConsumerError feeder chan *FetchResponse - trigger chan none - dying chan error + + trigger, dying chan none + dispatchReason error fetchSize int32 offset int64 @@ -372,7 +373,7 @@ func (child *partitionConsumer) AsyncClose() { // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will // also just close itself) - child.dying <- nil + close(child.dying) } func (child *partitionConsumer) Close() error { @@ -412,11 +413,11 @@ func (child *partitionConsumer) responseFeeder() { child.AsyncClose() case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: // these three are not fatal errors, but do require redispatching - child.dying <- err + child.dispatchReason = err default: // dunno, tell the user and try redispatching child.sendError(err) - child.dying <- err + child.dispatchReason = err } child.broker.acks.Done() @@ -602,16 +603,18 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC for child := range bc.subscriptions { select { - case err := <-child.dying: - if err == nil { - Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - close(child.trigger) - } else { - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err) - child.trigger <- none{} - } + case <-child.dying: + Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + close(child.trigger) delete(bc.subscriptions, child) default: + if child.dispatchReason != nil { + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, child.dispatchReason) + child.dispatchReason = nil + child.trigger <- none{} + delete(bc.subscriptions, child) + } } } }