From 7be093b1d23809da24902b3b44f4c746aa53ec5d Mon Sep 17 00:00:00 2001
From: Eran Duchan
Date: Wed, 12 Jan 2022 14:27:52 +0000
Subject: [PATCH] fix: ensure heartbeats only stop after cleanup

Sarama would immediately stop heartbeating on rebalance. If the session
teardown took longer than the session timeout, it would get evicted from
the group.

This can be fixed by ensuring that heartbeats only stop _after_ the
session has completed its cleanup process.

Contributes-to: #1608
---
 consumer_group.go | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/consumer_group.go b/consumer_group.go
index b603d1705..5f99f7abd 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -507,7 +507,9 @@ func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *cons
 		select {
 		case <-pause.C:
 		case <-session.ctx.Done():
-			Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
+			Logger.Printf(
+				"consumergroup/%s loop check partition number coroutine will exit, topics %s\n",
+				c.groupID, topics)
 			// if session closed by other, should be exited
 			return
 		case <-c.closed:
@@ -520,7 +522,9 @@ func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int
 	topicToPartitionNum := make(map[string]int, len(topics))
 	for _, topic := range topics {
 		if partitionNum, err := c.client.Partitions(topic); err != nil {
-			Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
+			Logger.Printf(
+				"consumergroup/%s topic %s get partition number failed %v\n",
+				c.groupID, topic, err)
 			return nil, err
 		} else {
 			topicToPartitionNum[topic] = len(partitionNum)
@@ -766,12 +770,21 @@ func (s *consumerGroupSession) release(withCleanup bool) (err error) {
 		<-s.hbDead
 	})
 
+	Logger.Printf(
+		"consumergroup/session/%s/%d released\n",
+		s.MemberID(), s.GenerationID())
+
 	return
 }
 
 func (s *consumerGroupSession) heartbeatLoop() {
 	defer close(s.hbDead)
 	defer s.cancel() // trigger the end of the session on exit
+	defer func() {
+		Logger.Printf(
+			"consumergroup/session/%s/%d heartbeat loop stopped\n",
+			s.MemberID(), s.GenerationID())
+	}()
 
 	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
 	defer pause.Stop()
@@ -813,7 +826,10 @@ func (s *consumerGroupSession) heartbeatLoop() {
 		switch resp.Err {
 		case ErrNoError:
 			retries = s.parent.config.Metadata.Retry.Max
-		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
+		case ErrRebalanceInProgress:
+			retries = s.parent.config.Metadata.Retry.Max
+			s.cancel()
+		case ErrUnknownMemberId, ErrIllegalGeneration:
 			return
 		default:
 			s.parent.handleError(resp.Err, "", -1)