From bd430e62d8d4ea8e8977fe19e0e780c7b8ef21d0 Mon Sep 17 00:00:00 2001 From: antshbean Date: Thu, 16 Jan 2020 17:24:34 +0800 Subject: [PATCH 1/2] fixed ConsumerGroup flooding logs with client/metadata update req #1544 --- consumer_group.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index da99e8811..7f1d2b0a2 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -442,7 +442,7 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) { - pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2) + pause := time.NewTicker(c.config.Metadata.RefreshFrequency) defer session.cancel() defer pause.Stop() var oldTopicToPartitionNum map[string]int @@ -469,10 +469,6 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons } func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) { - if err := c.client.RefreshMetadata(topics...); err != nil { - Logger.Printf("Consumer Group refresh metadata failed %v", err) - return nil, err - } topicToPartitionNum := make(map[string]int, len(topics)) for _, topic := range topics { if partitionNum, err := c.client.Partitions(topic); err != nil { From fd421dc8124b7b5281222afcda9372cad7a1688c Mon Sep 17 00:00:00 2001 From: antshbean Date: Thu, 16 Jan 2020 19:53:15 +0800 Subject: [PATCH 2/2] solve call Consumer function agin, will generate more than loopCheckPartitionsNumber Coroutine --- consumer_group.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/consumer_group.go b/consumer_group.go index 7f1d2b0a2..8b248ed97 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -175,6 +175,7 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co // loop check topic partition numbers changed // will trigger rebalance when any topic partitions number had changed + // avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine go c.loopCheckPartitionNumbers(topics, sess) // Wait for session exit signal @@ -462,6 +463,10 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons } select { case <-pause.C: + case <-session.ctx.Done(): + Logger.Printf("loop check partition number coroutine will exit, topics %s", topics) + // if session closed by other, should be exited + return case <-c.closed: return }