Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fix/mitigate data race in kafka source #1266

Merged
merged 1 commit into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions kafka/common/pkg/kafka/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,7 @@ type customConsumerGroup struct {

// Merge handler errors chan and consumer group error chan
func (c *customConsumerGroup) Errors() <-chan error {
errors := make(chan error, 10)
var wg sync.WaitGroup
wg.Add(2)
go func() {
for e := range c.ConsumerGroup.Errors() {
errors <- e
}
wg.Done()
}()
go func() {
for e := range c.handlerErrorChannel {
errors <- e
}
wg.Done()
}()

// Synchronization routine to close the error channel
go func() {
wg.Wait()
close(errors)
}()
return errors
return mergeErrorChannels(c.ConsumerGroup.Errors(), c.handlerErrorChannel)
}

func (c *customConsumerGroup) Close() error {
Expand All @@ -86,7 +65,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics

go func() {
for {
err := consumerGroup.Consume(ctx, topics, &consumerHandler)
err := consumerGroup.Consume(context.Background(), topics, &consumerHandler)
if err == sarama.ErrClosedConsumerGroup {
return
}
Expand All @@ -110,3 +89,22 @@ func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsume
}

var _ KafkaConsumerGroupFactory = (*kafkaConsumerGroupFactoryImpl)(nil)

func mergeErrorChannels(channels ...<-chan error) <-chan error {
out := make(chan error)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, channel := range channels {
go func(c <-chan error) {
for v := range c {
out <- v
}
wg.Done()
}(channel)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
2 changes: 1 addition & 1 deletion kafka/common/pkg/kafka/consumer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handle
if m.mustGenerateHandlerError {
go func() {
m.generateErrorOnce.Do(func() {
h := handler.(*saramaConsumerHandler)
h := handler.(*SaramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
})
Expand Down
24 changes: 14 additions & 10 deletions kafka/common/pkg/kafka/consumer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka
import (
"context"
"fmt"
"sync"

"github.com/Shopify/sarama"
"go.uber.org/zap"
Expand All @@ -31,37 +32,40 @@ type KafkaConsumerHandler interface {
}

// ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling
// You must implement KafkaConsumerHandler and create a new saramaConsumerHandler with it
type saramaConsumerHandler struct {
// You must implement KafkaConsumerHandler and create a new SaramaConsumerHandler with it
type SaramaConsumerHandler struct {
// The user message handler
handler KafkaConsumerHandler

logger *zap.Logger
// Errors channel
errors chan error
closeErrors sync.Once
errors chan error
}

func NewConsumerHandler(logger *zap.Logger, handler KafkaConsumerHandler) saramaConsumerHandler {
return saramaConsumerHandler{
func NewConsumerHandler(logger *zap.Logger, handler KafkaConsumerHandler) SaramaConsumerHandler {
return SaramaConsumerHandler{
logger: logger,
handler: handler,
errors: make(chan error, 10), // Some buffering to avoid blocking the message processing
}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *saramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *saramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
close(consumer.errors)
func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error {
consumer.closeErrors.Do(func() {
close(consumer.errors)
})
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *saramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset()))

// NOTE:
Expand Down Expand Up @@ -92,4 +96,4 @@ func (consumer *saramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup
return nil
}

var _ sarama.ConsumerGroupHandler = (*saramaConsumerHandler)(nil)
var _ sarama.ConsumerGroupHandler = (*SaramaConsumerHandler)(nil)