Skip to content

Commit

Permalink
fix custom consumer group error channel lifecycle (knative#263)
Browse files Browse the repository at this point in the history
* fix custom consumer group error channel lifecycle

* fix import

* remove unused var
  • Loading branch information
lionelvillard committed Dec 8, 2020
1 parent 4508fd1 commit 52e4117
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
11 changes: 7 additions & 4 deletions kafka/common/pkg/kafka/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (c *customConsumerGroup) Errors() <-chan error {

func (c *customConsumerGroup) Close() error {
c.cancel()
close(c.handlerErrorChannel)
return c.ConsumerGroup.Close()
}

Expand All @@ -59,18 +60,20 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
return nil, err
}

consumerHandler := NewConsumerHandler(logger, handler)

errorCh := make(chan error, 10)
ctx, cancel := context.WithCancel(context.Background())

go func() {
for {
consumerHandler := NewConsumerHandler(logger, handler, errorCh)

err := consumerGroup.Consume(context.Background(), topics, &consumerHandler)

if err == sarama.ErrClosedConsumerGroup {
return
}
if err != nil {
consumerHandler.errors <- err
errorCh <- err
}

select {
Expand All @@ -81,7 +84,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
}
}()

return &customConsumerGroup{cancel, consumerHandler.errors, consumerGroup}, err
return &customConsumerGroup{cancel, errorCh, consumerGroup}, err
}

func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsumerGroupFactory {
Expand Down
2 changes: 1 addition & 1 deletion kafka/common/pkg/kafka/consumer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handle
m.generateErrorOnce.Do(func() {
h := handler.(*SaramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
close(h.errors)
})
}()
}
Expand Down
15 changes: 7 additions & 8 deletions kafka/common/pkg/kafka/consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka
import (
"context"
"fmt"
"sync"

"github.com/Shopify/sarama"
"go.uber.org/zap"
Expand All @@ -38,29 +37,28 @@ type SaramaConsumerHandler struct {
handler KafkaConsumerHandler

logger *zap.SugaredLogger

// Errors channel
closeErrors sync.Once
errors chan error
errors chan error
}

func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler) SaramaConsumerHandler {
func NewConsumerHandler(logger *zap.SugaredLogger, handler KafkaConsumerHandler, errorsCh chan error) SaramaConsumerHandler {
return SaramaConsumerHandler{
logger: logger,
handler: handler,
errors: make(chan error, 10), // Some buffering to avoid blocking the message processing
errors: errorsCh,
}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
consumer.logger.Info("setting up handler")
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
consumer.closeErrors.Do(func() {
close(consumer.errors)
})
consumer.logger.Info("cleanup handler")
return nil
}

Expand All @@ -85,6 +83,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
consumer.errors <- err
}

if mustMark {
session.MarkMessage(message, "") // Mark kafka message as processed
if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil {
Expand Down
4 changes: 3 additions & 1 deletion kafka/common/pkg/kafka/consumer_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func Test(t *testing.T) {
}
for _, test := range tests {
t.Run(fmt.Sprintf("shouldErr: %v, shouldMark: %v", test.shouldErr, test.shouldMark), func(t *testing.T) {
cgh := NewConsumerHandler(zap.NewNop().Sugar(), test)
errorCh := make(chan error, 1)
cgh := NewConsumerHandler(zap.NewNop().Sugar(), test, errorCh)

session := mockConsumerGroupSession{}
claim := mockConsumerGroupClaim{msg: &mockMessage}
Expand All @@ -155,6 +156,7 @@ func Test(t *testing.T) {
}

_ = cgh.Cleanup(&session)
close(errorCh)

})
}
Expand Down

0 comments on commit 52e4117

Please sign in to comment.