From 790b7b1368e3e2a6bf11d3c70835233cc32a092d Mon Sep 17 00:00:00 2001 From: gdm85 Date: Tue, 31 Aug 2021 17:08:33 +0200 Subject: [PATCH] feat: add counter metrics for consumer group join/sync and their failures --- consumer_group.go | 40 ++++++++++++++++++++++++++++++++++++++++ sarama.go | 4 ++++ 2 files changed, 44 insertions(+) diff --git a/consumer_group.go b/consumer_group.go index 2bf236ae5..b64576aba 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -7,6 +7,8 @@ import ( "sort" "sync" "time" + + "github.com/rcrowley/go-metrics" ) // ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed. @@ -212,12 +214,38 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler return c.retryNewSession(ctx, topics, handler, retries, true) } + var ( + metricRegistry = c.config.MetricRegistry + consumerGroupJoinTotal metrics.Counter + consumerGroupJoinFailed metrics.Counter + consumerGroupSyncTotal metrics.Counter + consumerGroupSyncFailed metrics.Counter + ) + + if metricRegistry != nil { + consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry) + consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry) + consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry) + consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry) + } + // Join consumer group join, err := c.joinGroupRequest(coordinator, topics) + if consumerGroupJoinTotal != nil { + consumerGroupJoinTotal.Inc(1) + } if err != nil { _ = coordinator.Close() + if consumerGroupJoinFailed != nil { + consumerGroupJoinFailed.Inc(1) + } return nil, err } + if join.Err != ErrNoError { + if consumerGroupJoinFailed != nil { + consumerGroupJoinFailed.Inc(1) + } + } switch join.Err { case ErrNoError: c.memberID = join.MemberId @@ -256,10 +284,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler // Sync consumer group groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId) + if consumerGroupSyncTotal != nil { + consumerGroupSyncTotal.Inc(1) + } if err != nil { _ = coordinator.Close() + if consumerGroupSyncFailed != nil { + consumerGroupSyncFailed.Inc(1) + } return nil, err } + if groupRequest.Err != ErrNoError { + if consumerGroupSyncFailed != nil { + consumerGroupSyncFailed.Inc(1) + } + } + switch groupRequest.Err { case ErrNoError: case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately diff --git a/sarama.go b/sarama.go index 48f362d28..564a12b5e 100644 --- a/sarama.go +++ b/sarama.go @@ -68,6 +68,10 @@ Consumer related metrics: | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + | consumer-group-join-total- | counter | Total count of consumer group join attempts | + | consumer-group-join-failed- | counter | Total count of consumer group join failures | + | consumer-group-sync-total- | counter | Total count of consumer group sync attempts | + | consumer-group-sync-failed- | counter | Total count of consumer group sync failures | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ */