Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fixed Kafka disconnection issue #1179

Merged
merged 3 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions kafka/channel/pkg/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ func TestDispatcher(t *testing.T) {
deadLetterWg.Wait()
transformationsWg.Wait()
receiverWg.Wait()

// Try to close consumer groups
err = dispatcher.UpdateHostToChannelMap(&multichannelfanout.Config{})
if err != nil {
t.Fatal(err)
}

failed, err = dispatcher.UpdateKafkaConsumers(&multichannelfanout.Config{})
if err != nil {
t.Fatal(err)
}
if len(failed) != 0 {
t.Fatal(err)
}
}

func createReverseProxy(t *testing.T, host string) *httputil.ReverseProxy {
Expand Down
26 changes: 21 additions & 5 deletions kafka/common/pkg/kafka/consumer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ type kafkaConsumerGroupFactoryImpl struct {
}

type customConsumerGroup struct {
closeFn context.CancelFunc
handlerErrorChannel chan error
sarama.ConsumerGroup
}

// Merge handler errors chan and consumer group error chan
func (c customConsumerGroup) Errors() <-chan error {
func (c *customConsumerGroup) Errors() <-chan error {
errors := make(chan error, 10)
var wg sync.WaitGroup
wg.Add(2)
Expand All @@ -65,6 +66,11 @@ func (c customConsumerGroup) Errors() <-chan error {
return errors
}

func (c *customConsumerGroup) Close() error {
c.closeFn()
return c.ConsumerGroup.Close()
}

var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.Logger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
Expand All @@ -76,14 +82,24 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics

consumerHandler := NewConsumerHandler(logger, handler)

ctx, cancelFn := context.WithCancel(context.Background())

go func() {
err2 := consumerGroup.Consume(context.TODO(), topics, &consumerHandler)
if err2 != nil {
consumerHandler.errors <- err2
for {
if err2 := consumerGroup.Consume(context.Background(), topics, &consumerHandler); err2 != nil {
consumerHandler.errors <- err2
}
// Let's check if Consume stopped because of closing
select {
case <-ctx.Done():
return
default:
continue
}
}
}()

return customConsumerGroup{consumerHandler.errors, consumerGroup}, err
return &customConsumerGroup{cancelFn, consumerHandler.errors, consumerGroup}, err
}

func NewConsumerGroupFactory(client sarama.Client) KafkaConsumerGroupFactory {
Expand Down
19 changes: 12 additions & 7 deletions kafka/common/pkg/kafka/consumer_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kafka
import (
"context"
"errors"
"sync"
"testing"

"github.com/Shopify/sarama"
Expand All @@ -32,14 +33,17 @@ type mockConsumerGroup struct {
mustGenerateConsumerGroupError bool
mustGenerateHandlerError bool
consumeMustReturnError bool
generateErrorOnce sync.Once
}

func (m mockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
func (m *mockConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
if m.mustGenerateHandlerError {
go func() {
h := handler.(*saramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
m.generateErrorOnce.Do(func() {
h := handler.(*saramaConsumerHandler)
h.errors <- errors.New("cgh")
_ = h.Cleanup(nil)
})
}()
}
if m.consumeMustReturnError {
Expand All @@ -48,7 +52,7 @@ func (m mockConsumerGroup) Consume(ctx context.Context, topics []string, handler
return nil
}

func (m mockConsumerGroup) Errors() <-chan error {
func (m *mockConsumerGroup) Errors() <-chan error {
ch := make(chan error)
go func() {
if m.mustGenerateConsumerGroupError {
Expand All @@ -59,18 +63,19 @@ func (m mockConsumerGroup) Errors() <-chan error {
return ch
}

func (m mockConsumerGroup) Close() error {
func (m *mockConsumerGroup) Close() error {
return nil
}

func mockedNewConsumerGroupFromClient(mockInputMessageCh chan *sarama.ConsumerMessage, mustGenerateConsumerGroupError bool, mustGenerateHandlerError bool, consumeMustReturnError bool, mustFail bool) func(groupID string, client sarama.Client) (group sarama.ConsumerGroup, e error) {
if !mustFail {
return func(groupID string, client sarama.Client) (group sarama.ConsumerGroup, e error) {
return mockConsumerGroup{
return &mockConsumerGroup{
mockInputMessageCh: mockInputMessageCh,
mustGenerateConsumerGroupError: mustGenerateConsumerGroupError,
mustGenerateHandlerError: mustGenerateHandlerError,
consumeMustReturnError: consumeMustReturnError,
generateErrorOnce: sync.Once{},
}, nil
}
} else {
Expand Down