
Merge pull request #485 from Shopify/dont-require-all-consumers-drained
Don't require all consumers drained
eapache committed Aug 4, 2015
2 parents 1002788 + 292f3b0 commit e1729d6
Showing 3 changed files with 75 additions and 40 deletions.
10 changes: 9 additions & 1 deletion config.go
@@ -105,6 +105,11 @@ type Config struct {
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// The maximum amount of time the consumer expects a single message to take to process for the user. If writing to the
// Messages channel takes longer than this, that partition will stop fetching more messages until it can proceed again.
// Note that, since the Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// Return specifies what channels will be populated. If they are set to true, you must read from
// them to prevent deadlock.
Return struct {
@@ -147,6 +152,7 @@ func NewConfig() *Config {
c.Consumer.Fetch.Default = 32768
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false

c.ChannelBufferSize = 256
@@ -239,7 +245,9 @@ func (c *Config) Validate() error {
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Consumer.Fetch.Max must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
}
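
For orientation, here is a minimal sketch of how a caller might tune the new Consumer.MaxProcessingTime option before building a consumer; the 250ms value is an illustrative assumption, not something this commit prescribes:

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Give each message up to 250ms to be accepted off the Messages channel;
	// since that channel is buffered, the effective grace period is about
	// MaxProcessingTime * ChannelBufferSize (256 by default).
	config.Consumer.MaxProcessingTime = 250 * time.Millisecond

	// Validate() now rejects non-positive values for MaxProcessingTime.
	if err := config.Validate(); err != nil {
		log.Fatal(err)
	}
}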
104 changes: 65 additions & 39 deletions consumer.go
@@ -1,6 +1,7 @@
package sarama

import (
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -271,13 +272,15 @@ type partitionConsumer struct {
feeder chan *FetchResponse

trigger, dying chan none
dispatchReason error
responseResult error

fetchSize int32
offset int64
highWaterMarkOffset int64
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
cErr := &ConsumerError{
Topic: child.topic,
@@ -401,23 +404,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
}

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage

feederLoop:
for response := range child.feeder {
switch err := child.handleResponse(response); err {
case nil:
break
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
child.sendError(err)
child.AsyncClose()
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dispatchReason = err
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dispatchReason = err
msgs, child.responseResult = child.parseResponse(response)

for i, msg := range msgs {
select {
case child.messages <- msg:
case <-time.After(child.conf.Consumer.MaxProcessingTime):
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
child.broker.input <- child
continue feederLoop
}
}

child.broker.acks.Done()
@@ -427,14 +431,14 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
close(child.errors)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return ErrIncompleteResponse
return nil, ErrIncompleteResponse
}

if block.Err != ErrNoError {
return block.Err
return nil, block.Err
}

if len(block.MsgSet.Messages) == 0 {
@@ -453,16 +457,16 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
}
}

return nil
return nil, nil
}

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
prelude := true
var messages []*ConsumerMessage
for _, msgBlock := range block.MsgSet.Messages {

for _, msg := range msgBlock.Messages() {
@@ -472,14 +476,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
prelude = false

if msg.Offset >= child.offset {
atLeastOne = true
child.messages <- &ConsumerMessage{
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: msg.Offset,
}
})
child.offset = msg.Offset + 1
} else {
incomplete = true
@@ -488,10 +491,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

}

if incomplete || !atLeastOne {
return ErrIncompleteResponse
if incomplete || len(messages) == 0 {
return nil, ErrIncompleteResponse
}
return nil
return messages, nil
}

// brokerConsumer
@@ -569,7 +572,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptionCache(newSubscriptions)
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
@@ -591,27 +597,47 @@ func (bc *brokerConsumer) subscriptionConsumer() {
child.feeder <- response
}
bc.acks.Wait()
bc.handleResponses()
}
}

func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
// take new subscriptions, and abandon subscriptions that have been closed
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
if child.dispatchReason != nil {
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
break
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
case ErrOffsetOutOfRange:
// there's no point in retrying this; it will just fail the same way again, so
// shut it down and force the user to choose what to do
child.sendError(result)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(result)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
child.dispatchReason = nil
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
}
1 change: 1 addition & 0 deletions consumer_test.go
@@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
leader.Returns(fetchResponse)

safeClose(t, c1)
safeClose(t, c0)
