Skip to content

Commit

Permalink
Merge pull request #2109 from Shopify/dnwe/subscription-starvation
Browse files Browse the repository at this point in the history
fix: avoid starvation in subscriptionManager
  • Loading branch information
dnwe authored Feb 25, 2022
2 parents 4b9b976 + dadcd80 commit 0ad6651
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
broker: broker,
input: make(chan *partitionConsumer),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
wait: make(chan none, 1),
subscriptions: make(map[*partitionConsumer]none),
refs: 0,
}
Expand All @@ -878,36 +878,54 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
var partitionConsumers []*partitionConsumer

for {
if len(buffer) > 0 {
select {
case event, ok := <-bc.input:
if !ok {
goto done
}
buffer = append(buffer, event)
case bc.newSubscriptions <- buffer:
buffer = nil
case bc.wait <- none{}:
// check for any partition consumer asking to subscribe if there aren't
// any, trigger the network request by sending "nil" to the
// newSubscriptions channel
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}
} else {
select {
case event, ok := <-bc.input:
if !ok {
goto done

// add to list of subscribing consumers
partitionConsumers = append(partitionConsumers, pc)

// wait up to 250ms to drain input of any further incoming
// subscriptions
for batchComplete := false; !batchComplete; {
select {
case pc, ok := <-bc.input:
if !ok {
goto done
}

partitionConsumers = append(partitionConsumers, pc)
case <-time.After(250 * time.Millisecond):
batchComplete = true
}
buffer = append(buffer, event)
case bc.newSubscriptions <- nil:
}

Logger.Printf(
"consumer/broker/%d accumulated %d new subscriptions\n",
bc.broker.ID(), len(partitionConsumers))

bc.wait <- none{}
bc.newSubscriptions <- partitionConsumers

// clear out the batch
partitionConsumers = nil

case bc.newSubscriptions <- nil:
}
}

done:
close(bc.wait)
if len(buffer) > 0 {
bc.newSubscriptions <- buffer
if len(partitionConsumers) > 0 {
bc.newSubscriptions <- partitionConsumers
}
close(bc.newSubscriptions)
}
Expand Down

0 comments on commit 0ad6651

Please sign in to comment.