From 80a846dbf98b1ff4387a0a3cb984b400e7a9fec1 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Mon, 23 Mar 2015 18:18:17 +0000 Subject: [PATCH] Add better logs to the consumer --- consumer.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index d57b4b201..2a031500c 100644 --- a/consumer.go +++ b/consumer.go @@ -279,6 +279,7 @@ func (child *partitionConsumer) dispatcher() { child.broker = nil } + Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition) if err := child.dispatch(); err != nil { child.sendError(err) child.trigger <- none{} @@ -435,7 +436,7 @@ func (w *brokerConsumer) subscriptionConsumer() { response, err := w.fetchNewMessages() if err != nil { - Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err) + Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", w.broker.ID(), err) w.abort(err) return } @@ -450,6 +451,7 @@ func (w *brokerConsumer) subscriptionConsumer() { // these three are not fatal errors, but do require redispatching child.trigger <- none{} delete(w.subscriptions, child) + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err) } } } @@ -460,6 +462,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo // take new subscriptions, and abandon subscriptions that have been closed for _, child := range newSubscriptions { w.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition) } for child := range w.subscriptions { @@ -467,6 +470,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo case <-child.dying: close(child.trigger) delete(w.subscriptions, child) + Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition) default: } }