Skip to content

Commit

Permalink
Optionally use a ticker instead of a timer to detect timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
wmille committed Aug 22, 2017
1 parent 7bbb175 commit e964019
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 51 deletions.
22 changes: 9 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,20 @@ type Config struct {
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
MaxProcessingTime time.Duration

// The time interval between ticks of the fast checker. A value of 0
// turns off the fast checker.
// If this is set to a non-zero value, then there will be periodic
// checks to see if messages have been written to the Messages channel.
// If a message has not been written to the Messages channel since the
// last tick of the fast checker, then the timer will be set.
// Whether or not to use the fast checker. The fast checker uses a
// ticker instead of a timer to implement the timeout functionality in
// (*partitionConsumer).responseFeeder.
// If a message is not written to the Messages channel between two ticks
// of the fast checker then a timeout is detected.
// Using the fast checker should typically result in many fewer calls to
// Timer functions resulting in a significant performance improvement if
// many messages are being sent and timeouts are infrequent.
// The disadvantage of using the fast checker is that timeouts will be
// less accurate. That is, the effective timeout could be between
// `MaxProcessingTime` and `MaxProcessingTime + FastCheckerInterval`.
// For example, if `MaxProcessingTime` is 100ms and
// `FastCheckerInterval` is 10ms, then a delay of 108ms between two
// `MaxProcessingTime` and `2 * MaxProcessingTime`. For example, if
// `MaxProcessingTime` is 100ms then a delay of 180ms between two
// messages being sent may not be recognized as a timeout.
FastCheckerInterval time.Duration
UseFastChecker bool

// Return specifies what channels will be populated. If they are set to true,
// you must read from them to prevent deadlock.
Expand Down Expand Up @@ -294,7 +292,7 @@ func NewConfig() *Config {
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.FastCheckerInterval = 0
c.Consumer.UseFastChecker = false
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
Expand Down Expand Up @@ -420,8 +418,6 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
case c.Consumer.MaxProcessingTime <= 0:
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.FastCheckerInterval < 0:
return ConfigurationError("Consumer.FastCheckerInterval must be >= 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
Expand Down
62 changes: 26 additions & 36 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,61 +444,51 @@ func (child *partitionConsumer) responseFeeder() {
// Initialize timer without a pending send on its channel
expiryTimer := time.NewTimer(0)
<-expiryTimer.C
expiryTimerSet := false
expireTimedOut := true

var fastCheckerChan <-chan (time.Time)
if child.conf.Consumer.FastCheckerInterval > 0 {
fastChecker := time.NewTicker(child.conf.Consumer.FastCheckerInterval)
var timerChan <-chan (time.Time)
if child.conf.Consumer.UseFastChecker {
fastChecker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
defer fastChecker.Stop()
fastCheckerChan = fastChecker.C
timerChan = fastChecker.C
} else {
timerChan = expiryTimer.C
}

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

for i, msg := range msgs {
if child.conf.Consumer.FastCheckerInterval <= 0 {
expiryTimerSet = true
if !child.conf.Consumer.UseFastChecker {
if !expiryTimer.Stop() && !expireTimedOut {
// expiryTimer was expired; clear out the waiting msg
<-expiryTimer.C
}
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
expireTimedOut = false
}

messageSelect:
select {
case child.messages <- msg:
msgSent = true
if expiryTimerSet {
// The timer was set and a message was sent, stop the
// timer and resume using the fast checker
if !expiryTimer.Stop() {
<-expiryTimer.C
case <-timerChan:
if !child.conf.Consumer.UseFastChecker || !msgSent {
expireTimedOut = true
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
}
expiryTimerSet = false
}
// Periodically check if messages have been sent
case <-fastCheckerChan:
if msgSent {
child.broker.input <- child
continue feederLoop
} else {
// current message has not been sent, return to select
// statement
msgSent = false
} else if !expiryTimerSet {
// No messages have been sent since the last tick,
// start the timer
expiryTimerSet = true
// If the fast checker is being used, then at least
// the time between two fast checker ticks has already
// passed since the last message was sent.
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime - child.conf.Consumer.FastCheckerInterval)
}
// message has not been sent, return to select statement
goto messageSelect
case <-expiryTimer.C:
expiryTimerSet = false
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
child.messages <- msg
goto messageSelect
}
child.broker.input <- child
continue feederLoop
}
}

Expand Down
4 changes: 2 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ func TestConsumerFastCheckerOff(t *testing.T) {

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.FastCheckerInterval = 0
config.Consumer.UseFastChecker = false
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand Down Expand Up @@ -865,7 +865,7 @@ func TestConsumerFastCheckerOn(t *testing.T) {

config := NewConfig()
config.ChannelBufferSize = 0
config.Consumer.FastCheckerInterval = 1 * time.Millisecond
config.Consumer.UseFastChecker = true
config.Consumer.MaxProcessingTime = 10 * time.Millisecond
master, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand Down

0 comments on commit e964019

Please sign in to comment.