Merge pull request #2194 from niamster/fix-consumer-subscription-manager
fix: prevent deadlock between subscription manager and consumer goroutines
dnwe authored Apr 13, 2022
2 parents 9904b37 + 9a9a211 commit 8f8d8da
Showing 2 changed files with 157 additions and 45 deletions.
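
The deadlock named in the commit title is the classic two-goroutine channel wedge: each side blocks on an unbuffered send that only the other side would receive, so neither ever reaches its own receive. The sketch below is a minimal, hypothetical illustration of that failure class (the channel names `work` and `ready` are invented here, not the Sarama code path); the actual fix in consumer.go removes the extra `wait` handshake channel and replaces the blocking rendezvous with timer-based batching plus a short sleep.

```go
// Minimal, hypothetical sketch of the failure class named in the commit
// title -- NOT the exact Sarama code path. A "manager" and a "worker" each
// block on an unbuffered send that only the other side would receive, so
// neither ever reaches its own receive. The Go runtime reports
// "fatal error: all goroutines are asleep - deadlock!".
package main

import "fmt"

func main() {
	work := make(chan string)    // manager -> worker
	ready := make(chan struct{}) // worker -> manager

	go func() { // the "manager"
		work <- "batch" // blocks: the worker is stuck sending on `ready`
		<-ready         // never reached
	}()

	// the "worker" (main goroutine)
	ready <- struct{}{} // blocks: the manager only reads `ready` after its send on `work` completes
	fmt.Println(<-work) // never reached
}
```
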
76 changes: 31 additions & 45 deletions consumer.go
@@ -96,6 +96,9 @@ type Consumer interface {
ResumeAll()
}

// max time to wait for more partition subscriptions
const partitionConsumersBatchTimeout = 100 * time.Millisecond

type consumer struct {
conf *Config
children map[string]map[int32]*partitionConsumer
@@ -850,7 +853,6 @@ type brokerConsumer struct {
input chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
subscriptions map[*partitionConsumer]none
wait chan none
acks sync.WaitGroup
refs int
}
@@ -861,7 +863,6 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
broker: broker,
input: make(chan *partitionConsumer),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none, 1),
subscriptions: make(map[*partitionConsumer]none),
refs: 0,
}
@@ -875,72 +876,56 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
// it nil if no new subscriptions are available.
func (bc *brokerConsumer) subscriptionManager() {
var partitionConsumers []*partitionConsumer
defer close(bc.newSubscriptions)

for {
// check for any partition consumer asking to subscribe if there aren't
// any, trigger the network request by sending "nil" to the
var partitionConsumers []*partitionConsumer

// Check for any partition consumer asking to subscribe if there aren't
// any, trigger the network request (to fetch Kafka messages) by sending "nil" to the
// newSubscriptions channel
select {
case pc, ok := <-bc.input:
if !ok {
goto done
return
}

// add to list of subscribing consumers
partitionConsumers = append(partitionConsumers, pc)
case bc.newSubscriptions <- nil:
continue
}

// wait up to 250ms to drain input of any further incoming
// subscriptions
for batchComplete := false; !batchComplete; {
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}

partitionConsumers = append(partitionConsumers, pc)
case <-time.After(250 * time.Millisecond):
batchComplete = true
}
// drain input of any further incoming subscriptions
timer := time.NewTimer(partitionConsumersBatchTimeout)
for batchComplete := false; !batchComplete; {
select {
case pc := <-bc.input:
partitionConsumers = append(partitionConsumers, pc)
case <-timer.C:
batchComplete = true
}

Logger.Printf(
"consumer/broker/%d accumulated %d new subscriptions\n",
bc.broker.ID(), len(partitionConsumers))

bc.wait <- none{}
bc.newSubscriptions <- partitionConsumers

// clear out the batch
partitionConsumers = nil

case bc.newSubscriptions <- nil:
}
}
timer.Stop()

Logger.Printf(
"consumer/broker/%d accumulated %d new subscriptions\n",
bc.broker.ID(), len(partitionConsumers))

done:
close(bc.wait)
if len(partitionConsumers) > 0 {
bc.newSubscriptions <- partitionConsumers
}
close(bc.newSubscriptions)
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
// this is the main loop that fetches Kafka messages
func (bc *brokerConsumer) subscriptionConsumer() {
<-bc.wait // wait for our first piece of work

for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
// Either way, the signal just hasn't propagated to our goroutine yet.
<-bc.wait
// Take a small nap to avoid burning the CPU.
time.Sleep(partitionConsumersBatchTimeout)
continue
}

@@ -1040,7 +1025,8 @@ func (bc *brokerConsumer) abort(err error) {

for newSubscriptions := range bc.newSubscriptions {
if len(newSubscriptions) == 0 {
<-bc.wait
// Take a small nap to avoid burning the CPU.
time.Sleep(partitionConsumersBatchTimeout)
continue
}
for _, child := range newSubscriptions {
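
For reference, the reworked subscriptionManager above follows a batch-with-timeout pattern: pick up the first pending subscription (or hand the worker nil if there is none), drain anything else that arrives within `partitionConsumersBatchTimeout`, then deliver the whole batch in a single send. The standalone sketch below shows the same shape with hypothetical names (`batchManager`, `batchTimeout`, plain ints instead of `*partitionConsumer`); it is an illustration of the pattern, not Sarama's implementation.

```go
// Standalone sketch of the batch-with-timeout pattern used by the reworked
// subscriptionManager. All names here are hypothetical; the real code
// batches *partitionConsumer values and logs via Sarama's Logger.
package main

import (
	"fmt"
	"time"
)

const batchTimeout = 100 * time.Millisecond // stands in for partitionConsumersBatchTimeout

// batchManager forwards items from input to out in batches. When nothing is
// pending it sends nil, so the receiver can start its next round trip
// without blocking on a separate handshake channel.
func batchManager(input <-chan int, out chan<- []int) {
	defer close(out)

	for {
		var batch []int

		// Either pick up the first pending item, or tell the receiver
		// there is nothing new this round.
		select {
		case v, ok := <-input:
			if !ok {
				return
			}
			batch = append(batch, v)
		case out <- nil:
			continue
		}

		// Drain anything else that arrives within the batching window.
		timer := time.NewTimer(batchTimeout)
		for done := false; !done; {
			select {
			case v, ok := <-input:
				if !ok {
					done = true
				} else {
					batch = append(batch, v)
				}
			case <-timer.C:
				done = true
			}
		}
		timer.Stop()

		out <- batch
	}
}

func main() {
	input := make(chan int)
	out := make(chan []int)
	go batchManager(input, out)

	go func() {
		defer close(input)
		for i := 0; i < 5; i++ {
			input <- i
		}
	}()

	for batch := range out {
		if batch == nil {
			continue // nothing pending this round
		}
		fmt.Println("batch:", batch)
	}
}
```

Because the batch is rebuilt on every loop iteration and delivered with one send, there is no separate `wait <- none{}` handshake for the consumer side to service, which is the rendezvous the old code relied on.
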
126 changes: 126 additions & 0 deletions functional_consumer_test.go
@@ -4,12 +4,14 @@
package sarama

import (
"context"
"errors"
"fmt"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -282,6 +284,130 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
}
}

func TestConsumerGroupDeadlock(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

require := require.New(t)

const topic = "test_consumer_group_rebalance_test_topic"
const msgQty = 50
partitionsQty := len(FunctionalTestEnv.KafkaBrokerAddrs) * 3

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

config := NewConfig()
config.ClientID = t.Name()
config.Producer.Return.Successes = true
config.ChannelBufferSize = 2 * msgQty

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
require.NoError(err)

admin, err := NewClusterAdminFromClient(client)
require.NoError(err)

cgName := "test_consumer_group_rebalance_consumer_group"

err = admin.DeleteConsumerGroup(cgName)
if err != nil {
t.Logf("failed to delete topic: %s", err)
}

err = admin.DeleteTopic(topic)
if err != nil {
t.Logf("failed to delete topic: %s", err)
}

// it takes time to delete topic, the API is not sync
for i := 0; i < 5; i++ {
err = admin.CreateTopic(topic, &TopicDetail{NumPartitions: int32(partitionsQty), ReplicationFactor: 1}, false)
if err == nil {
break
}
if errors.Is(err, ErrTopicAlreadyExists) || strings.Contains(err.Error(), "is marked for deletion") {
time.Sleep(500 * time.Millisecond)
continue
}
break
}
require.NoError(err)
defer func() {
_ = admin.DeleteTopic(topic)
}()

var wg sync.WaitGroup

consumer, err := NewConsumerFromClient(client)
require.NoError(err)

ch := make(chan string, msgQty)
for i := 0; i < partitionsQty; i++ {
time.Sleep(250 * time.Millisecond) // ensure delays between the "claims"
wg.Add(1)
go func(i int) {
defer wg.Done()

pConsumer, err := consumer.ConsumePartition(topic, int32(i), OffsetOldest)
require.NoError(err)
defer pConsumer.Close()

for {
select {
case <-ctx.Done():
return
case msg, ok := <-pConsumer.Messages():
if !ok {
return
}
// t.Logf("consumer-group %d consumed: %v from %s/%d/%d", i, msg.Value, msg.Topic, msg.Partition, msg.Offset)
ch <- string(msg.Value)
}
}
}(i)
}

producer, err := NewSyncProducerFromClient(client)
require.NoError(err)

for i := 0; i < msgQty; i++ {
msg := &ProducerMessage{
Topic: topic,
Value: StringEncoder(strconv.FormatInt(int64(i), 10)),
}
_, _, err := producer.SendMessage(msg)
require.NoError(err)
}

var received []string
func() {
for len(received) < msgQty {
select {
case <-ctx.Done():
return
case msg := <-ch:
received = append(received, msg)
// t.Logf("received: %s, count: %d", msg, len(received))
}
}
}()

cancel()

require.Equal(msgQty, len(received))

err = producer.Close()
require.NoError(err)

err = consumer.Close()
require.NoError(err)

err = client.Close()
require.NoError(err)

wg.Wait()
}

func prodMsg2Str(prodMsg *ProducerMessage) string {
return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
}
