Merge pull request #933 from wmille/timer_performance
Add an optional fast checker to (*partitionConsumer).responseFeeder
eapache authored Sep 12, 2017
2 parents c20cf33 + 9112d76 commit 298679c
Showing 4 changed files with 77 additions and 21 deletions.
config.go: 18 changes (15 additions & 3 deletions)
@@ -196,11 +196,23 @@ type Config struct {
     // Equivalent to the JVM's `fetch.wait.max.ms`.
     MaxWaitTime time.Duration

-    // The maximum amount of time the consumer expects a message takes to process
-    // for the user. If writing to the Messages channel takes longer than this,
-    // that partition will stop fetching more messages until it can proceed again.
+    // The maximum amount of time the consumer expects a message takes to
+    // process for the user. If writing to the Messages channel takes longer
+    // than this, that partition will stop fetching more messages until it
+    // can proceed again.
     // Note that, since the Messages channel is buffered, the actual grace time is
     // (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+    // If a message is not written to the Messages channel between two ticks
+    // of the expiryTicker then a timeout is detected.
+    // Using a ticker instead of a timer to detect timeouts should typically
+    // result in many fewer calls to Timer functions, which may result in a
+    // significant performance improvement if many messages are being sent
+    // and timeouts are infrequent.
+    // The disadvantage of using a ticker instead of a timer is that
+    // timeouts will be less accurate. That is, the effective timeout could
+    // be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
+    // example, if `MaxProcessingTime` is 100ms then a delay of 180ms
+    // between two messages being sent may not be recognized as a timeout.
     MaxProcessingTime time.Duration

     // Return specifies what channels will be populated. If they are set to true,
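The trade-off described in the new doc comment is easiest to see in isolation. Below is a minimal, self-contained Go sketch of the same pattern the new responseFeeder uses further down: one shared ticker, a "sent" flag, and a goto back to the select. The function, channel, and variable names here are invented for illustration and are not Sarama APIs. With tick period T, a stalled receiver is detected somewhere between T and 2*T after the last successful send, which is exactly the reduced accuracy the comment warns about.

package main

import (
    "fmt"
    "time"
)

// deliver sends each value to out, using one shared ticker to detect a stalled
// receiver: a timeout is reported only when no send completed between two
// consecutive ticks. The wait is measured in whole tick intervals, so with
// period T the effective limit is somewhere between T and 2*T.
func deliver(values []int, out chan<- int, period time.Duration) error {
    ticker := time.NewTicker(period)
    defer ticker.Stop()

    sent := false
    for _, v := range values {
    retry:
        select {
        case out <- v:
            sent = true
        case <-ticker.C:
            if !sent {
                return fmt.Errorf("receiver stalled for at least %v", period)
            }
            // A send completed since the previous tick, so this is not a
            // stall; clear the flag and keep waiting for the current value.
            sent = false
            goto retry
        }
    }
    return nil
}

func main() {
    out := make(chan int) // unbuffered, like ChannelBufferSize = 0
    go func() {
        for range out {
            time.Sleep(30 * time.Millisecond) // "processing" well under the period
        }
    }()
    // A 30ms receiver never trips a 100ms ticker; even a ~180ms pause might
    // not, because the effective limit is anywhere from 100ms to 200ms.
    if err := deliver([]int{1, 2, 3, 4}, out, 100*time.Millisecond); err != nil {
        fmt.Println("timeout:", err)
        return
    }
    fmt.Println("all values delivered")
}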
consumer.go: 36 changes (19 additions & 17 deletions)
@@ -440,35 +440,37 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

 func (child *partitionConsumer) responseFeeder() {
     var msgs []*ConsumerMessage
-    expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
-    expireTimedOut := false
+    msgSent := false

 feederLoop:
     for response := range child.feeder {
         msgs, child.responseResult = child.parseResponse(response)
+        expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)

         for i, msg := range msgs {
-            if !expiryTimer.Stop() && !expireTimedOut {
-                // expiryTimer was expired; clear out the waiting msg
-                <-expiryTimer.C
-            }
-            expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
-            expireTimedOut = false
-
+        messageSelect:
             select {
             case child.messages <- msg:
-            case <-expiryTimer.C:
-                expireTimedOut = true
-                child.responseResult = errTimedOut
-                child.broker.acks.Done()
-                for _, msg = range msgs[i:] {
-                    child.messages <- msg
+                msgSent = true
+            case <-expiryTicker.C:
+                if !msgSent {
+                    child.responseResult = errTimedOut
+                    child.broker.acks.Done()
+                    for _, msg = range msgs[i:] {
+                        child.messages <- msg
+                    }
+                    child.broker.input <- child
+                    continue feederLoop
+                } else {
+                    // a message was sent since the last tick, so this is not a
+                    // timeout; clear the flag and retry sending the current message
+                    msgSent = false
+                    goto messageSelect
                 }
-                child.broker.input <- child
-                continue feederLoop
             }
         }

+        expiryTicker.Stop()
         child.broker.acks.Done()
     }

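The "many fewer calls to Timer functions" claim in the config comment comes from dropping the Stop/drain/Reset bookkeeping the old code ran for every message. Below is a rough micro-benchmark sketch of the two strategies, assuming a hypothetical timercost_test.go outside the Sarama tree (run with go test -bench=.); the long timeout keeps the happy path dominant, as it is for a fast consumer. On that happy path the ticker version touches no timer state at all, which is where the win comes from when messages are frequent and timeouts are rare.

package timercost

import (
    "testing"
    "time"
)

// Old strategy: stop, drain, and reset a timer around every single send, with
// a flag so a fired timer's value is not drained twice.
func BenchmarkTimerPerMessage(b *testing.B) {
    ch := make(chan int, 1)
    timer := time.NewTimer(time.Second)
    fired := false
    for i := 0; i < b.N; i++ {
        if !timer.Stop() && !fired {
            <-timer.C // the timer expired; clear out the pending value
        }
        timer.Reset(time.Second)
        fired = false
        select {
        case ch <- i:
            <-ch // empty the buffer so the next send succeeds immediately
        case <-timer.C:
            fired = true // give up on this value (does not happen here)
        }
    }
}

// New strategy: one shared ticker, no per-send timer bookkeeping on the happy path.
func BenchmarkSharedTicker(b *testing.B) {
    ch := make(chan int, 1)
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for i := 0; i < b.N; i++ {
    retry:
        select {
        case ch <- i:
            <-ch
        case <-ticker.C:
            // a tick elapsed while sends were keeping up; just retry the send
            goto retry
        }
    }
}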
consumer_test.go: 42 changes (42 additions & 0 deletions)
@@ -803,6 +803,48 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
broker0.Close()
}

func TestConsumerExpiryTicker(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)
fetchResponse1 := &FetchResponse{}
for i := 1; i <= 8; i++ {
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
}
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 1),
"FetchRequest": NewMockSequence(fetchResponse1),
})

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// When
consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

// Then: messages with offsets 1 through 8 are read
for i := 1; i <= 8; i++ {
assertMessageOffset(t, <-consumer.Messages(), int64(i))
time.Sleep(2 * time.Millisecond)
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
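For context on what the test drives, here is a hedged sketch of the application side of the Messages channel, built only from calls that appear in this diff or in Sarama's public API at the time (NewConfig, NewConsumer, ConsumePartition, Messages); the broker address and topic are placeholders. Per-message handling has to finish comfortably within Consumer.MaxProcessingTime (scaled by the channel buffer, per the config.go comment above), or the feeder pauses fetching for that partition as shown in consumer.go.

package main

import (
    "log"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.MaxProcessingTime = 100 * time.Millisecond // the documented default

    master, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer master.Close()

    pc, err := master.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatal(err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        // Keep this fast; hand long-running work off to a worker pool so the
        // feeder never waits longer than MaxProcessingTime between ticks.
        log.Printf("offset %d: %s", msg.Offset, msg.Value)
    }
}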
message_test.go: 2 changes (1 addition & 1 deletion)
@@ -106,7 +106,7 @@ func TestMessageEncoding(t *testing.T) {

     message.Value = []byte{}
     message.Codec = CompressionGZIP
-    if runtime.Version() == "go1.8" {
+    if runtime.Version() == "go1.8" || runtime.Version() == "go1.8.1" {
         testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
     } else {
         testEncodable(t, "empty gzip", &message, emptyGzipMessage)
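The change above adds go1.8.1 by exact match, so each new Go 1.8 patch release needs another clause. A small sketch of an alternative, shown only for illustration and not part of this commit: match the whole go1.8 family by prefix.

package main

import (
    "fmt"
    "runtime"
    "strings"
)

// isGo18 reports whether the running toolchain is go1.8 or any go1.8.x patch
// release, so a single check covers future patch releases as well.
func isGo18() bool {
    v := runtime.Version()
    return v == "go1.8" || strings.HasPrefix(v, "go1.8.")
}

func main() {
    fmt.Printf("runtime %s, go1.8 family: %v\n", runtime.Version(), isGo18())
}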
