From 2d44c085d8c092d222b2d00700fa6262ec27f83f Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 8 Dec 2020 12:24:41 -0500 Subject: [PATCH] fix custom consumer group error channel lifecycle (#263) (#1676) * fix custom consumer group error channel lifecycle * fix import * remove unused var --- kafka/common/pkg/kafka/consumer_factory.go | 11 +++++++---- kafka/common/pkg/kafka/consumer_factory_test.go | 2 +- kafka/common/pkg/kafka/consumer_handler.go | 15 +++++++-------- kafka/common/pkg/kafka/consumer_handler_test.go | 4 +++- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/kafka/common/pkg/kafka/consumer_factory.go b/kafka/common/pkg/kafka/consumer_factory.go index 9aa63dcae0..1c441a1078 100644 --- a/kafka/common/pkg/kafka/consumer_factory.go +++ b/kafka/common/pkg/kafka/consumer_factory.go @@ -48,6 +48,7 @@ func (c *customConsumerGroup) Errors() <-chan error { func (c *customConsumerGroup) Close() error { c.cancel() + close(c.handlerErrorChannel) return c.ConsumerGroup.Close() } @@ -59,18 +60,20 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics return nil, err } - consumerHandler := NewConsumerHandler(logger, handler) - + errorCh := make(chan error, 10) ctx, cancel := context.WithCancel(context.Background()) go func() { for { + consumerHandler := NewConsumerHandler(logger, handler, errorCh) + err := consumerGroup.Consume(context.Background(), topics, &consumerHandler) + if err == sarama.ErrClosedConsumerGroup { return } if err != nil { - consumerHandler.errors <- err + errorCh <- err } select { @@ -81,7 +84,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics } }() - return &customConsumerGroup{cancel, consumerHandler.errors, consumerGroup}, err + return &customConsumerGroup{cancel, errorCh, consumerGroup}, err } func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsumerGroupFactory { diff --git a/kafka/common/pkg/kafka/consumer_factory_test.go b/kafka/common/pkg/kafka/consumer_factory_test.go index cb9407a913..431d6b7a1f 100644 --- a/kafka/common/pkg/kafka/consumer_factory_test.go +++ b/kafka/common/pkg/kafka/consumer_factory_test.go @@ -42,7 +42,7 @@ func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handle m.generateErrorOnce.Do(func() { h := handler.(*SaramaConsumerHandler) h.errors <- errors.New("cgh") - _ = h.Cleanup(nil) + close(h.errors) }) }() } diff --git a/kafka/common/pkg/kafka/consumer_handler.go b/kafka/common/pkg/kafka/consumer_handler.go index cf525e1594..84c0c5553e 100644 --- a/kafka/common/pkg/kafka/consumer_handler.go +++ b/kafka/common/pkg/kafka/consumer_handler.go @@ -19,7 +19,6 @@ package kafka import ( "context" "fmt" - "sync" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -38,29 +37,28 @@ type SaramaConsumerHandler struct { handler KafkaConsumerHandler logger *zap.SugaredLogger + // Errors channel - closeErrors sync.Once - errors chan error + errors chan error } -func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler) SaramaConsumerHandler { +func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error) SaramaConsumerHandler { return SaramaConsumerHandler{ logger: logger, handler: handler, - errors: make(chan error, 10), // Some buffering to avoid blocking the message processing + errors: errorsCh, } } // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error { + consumer.logger.Info("setting up handler") return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error { - consumer.closeErrors.Do(func() { - close(consumer.errors) - }) + consumer.logger.Info("cleanup handler") return nil } @@ -85,6 +83,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err)) consumer.errors <- err } + if mustMark { session.MarkMessage(message, "") // Mark kafka message as processed if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil { diff --git a/kafka/common/pkg/kafka/consumer_handler_test.go b/kafka/common/pkg/kafka/consumer_handler_test.go index 1c3594250a..e2a9503958 100644 --- a/kafka/common/pkg/kafka/consumer_handler_test.go +++ b/kafka/common/pkg/kafka/consumer_handler_test.go @@ -133,7 +133,8 @@ func Test(t *testing.T) { } for _, test := range tests { t.Run(fmt.Sprintf("shouldErr: %v, shouldMark: %v", test.shouldErr, test.shouldMark), func(t *testing.T) { - cgh := NewConsumerHandler(zap.NewNop().Sugar(), test) + errorCh := make(chan error, 1) + cgh := NewConsumerHandler(zap.NewNop().Sugar(), test, errorCh) session := mockConsumerGroupSession{} claim := mockConsumerGroupClaim{msg: &mockMessage} @@ -155,6 +156,7 @@ func Test(t *testing.T) { } _ = cgh.Cleanup(&session) + close(errorCh) }) }