From 733c74f30dedcabeb062f1c01d115c790a7859ad Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 9 Oct 2015 10:07:11 -0400 Subject: [PATCH] Refactor the producer, part 2 Second stand-alone chunk extracted from #544, (first chunk: #549). This uses the `produceSet` struct in the aggregator as well, and moves the `wouldOverflow` and `readyToFlush` methods to methods on the `produceSet`. Knock-on effects: - now that we do per-partition size tracking in the aggregator we can do much more precise overflow checking (see the compressed-message-batch-size-limit case in `wouldOverflow` which has changed) which will be more efficient in high-volume scenarios - since the produceSet encodes immediately, messages which fail to encode are now rejected from the aggregator and don't count towards batch size - we still have to iterate the messages in the flusher in order to reject those which need retrying due to the state machine; for simplicity I add them to a second produceSet still, which means all messages get encoded twice; this is a definite major performance regression which will go away again in part 3 of this refactor --- async_producer.go | 146 ++++++++++++++++++++++------------------- async_producer_test.go | 4 +- 2 files changed, 80 insertions(+), 70 deletions(-) diff --git a/async_producer.go b/async_producer.go index 3fbe4265b..6a5bfa0d5 100644 --- a/async_producer.go +++ b/async_producer.go @@ -508,13 +508,14 @@ func (pp *partitionProducer) updateLeader() error { // one per broker, constructs both an aggregator and a flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { input := make(chan *ProducerMessage) - bridge := make(chan []*ProducerMessage) + bridge := make(chan *produceSet) a := &aggregator{ parent: p, broker: broker, input: input, output: bridge, + buffer: newProduceSet(p), } go withRecover(a.run) @@ -535,15 +536,14 @@ type aggregator struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage - output chan<- []*ProducerMessage + output chan<- *produceSet - buffer []*ProducerMessage - bufferBytes int - timer <-chan time.Time + buffer *produceSet + timer <-chan time.Time } func (a *aggregator) run() { - var output chan<- []*ProducerMessage + var output chan<- *produceSet for { select { @@ -552,17 +552,19 @@ func (a *aggregator) run() { goto shutdown } - if a.wouldOverflow(msg) { + if a.buffer.wouldOverflow(msg) { Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) a.output <- a.buffer a.reset() output = nil } - a.buffer = append(a.buffer, msg) - a.bufferBytes += msg.byteSize() + if err := a.buffer.add(msg); err != nil { + a.parent.returnError(msg, err) + continue + } - if a.readyToFlush(msg) { + if a.buffer.readyToFlush(msg) { output = a.output } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) @@ -576,58 +578,22 @@ func (a *aggregator) run() { } shutdown: - if len(a.buffer) > 0 { + if !a.buffer.empty() { a.output <- a.buffer } close(a.output) } -func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { - switch { - // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): - return true - // Would we overflow the size-limit of a compressed message-batch? 
- case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: - return true - // Would we overflow simply in number of messages? - case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: - return true - default: - return false - } -} - -func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { - switch { - // If all three config values are 0, we always flush as-fast-as-possible - case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0: - return true - // If the messages is a chaser we must flush to maintain the state-machine - case msg.flags&chaser == chaser: - return true - // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: - return true - // If we've passed the byte trigger-point - case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: - return true - default: - return false - } -} - func (a *aggregator) reset() { a.timer = nil - a.buffer = nil - a.bufferBytes = 0 + a.buffer = newProduceSet(a.parent) } // takes a batch at a time from the aggregator and sends to the broker type flusher struct { parent *asyncProducer broker *Broker - input <-chan []*ProducerMessage + input <-chan *produceSet currentRetries map[string]map[int32]error } @@ -639,11 +605,13 @@ func (f *flusher) run() { for batch := range f.input { if closing != nil { - f.parent.retryMessages(batch, closing) + batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parent.retryMessages(msgs, closing) + }) continue } - set := f.groupAndFilter(batch) + set := f.filter(batch) if set.empty() { continue } @@ -683,30 +651,32 @@ func (f *flusher) run() { Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } -func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { +func (f *flusher) filter(batch *produceSet) *produceSet { set := newProduceSet(f.parent) - for _, msg := range batch { + batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + for _, msg := range msgs { - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) + if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { + // we're currently retrying this partition so we need to filter out this message + f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - f.broker.ID(), msg.Topic, msg.Partition) - delete(f.currentRetries[msg.Topic], msg.Partition) - } + if msg.flags&chaser == chaser { + // ...but now we can start processing future messages again + Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", + f.broker.ID(), msg.Topic, msg.Partition) + delete(f.currentRetries[msg.Topic], msg.Partition) + } - continue - } + continue + } - if err := set.add(msg); err != nil { - f.parent.returnError(msg, err) - continue + if err := set.add(msg); err != nil { + 
+				f.parent.returnError(msg, err)
+				continue
+			}
 		}
-	}
+	})
 
 	return set
 }
@@ -874,6 +844,46 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs
 	}
 }
 
+func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
+	switch {
+	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+	case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
+		return true
+	// Would we overflow the size-limit of a compressed message-batch for this partition?
+	case ps.parent.conf.Producer.Compression != CompressionNone &&
+		ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
+		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes:
+		return true
+	// Would we overflow simply in number of messages?
+	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool {
+	switch {
+	// If we don't have any messages, nothing else matters
+	case ps.empty():
+		return false
+	// If all three config values are 0, we always flush as-fast-as-possible
+	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
+		return true
+	// If the message is a chaser we must flush to maintain the state-machine
+	case msg.flags&chaser == chaser:
+		return true
+	// If we've passed the message trigger-point
+	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
+		return true
+	// If we've passed the byte trigger-point
+	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		return true
+	default:
+		return false
+	}
+}
+
 func (ps *produceSet) empty() bool {
 	return ps.bufferCount == 0
 }
diff --git a/async_producer_test.go b/async_producer_test.go
index 7306256aa..9aa13da53 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -320,7 +320,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
 	leader.Returns(prodSuccess)
 
 	config := NewConfig()
-	config.Producer.Flush.Messages = 3
+	config.Producer.Flush.Messages = 1
 	config.Producer.Return.Successes = true
 	config.Producer.Partitioner = NewManualPartitioner
 	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
@@ -330,8 +330,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
 
 	for flush := 0; flush < 3; flush++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
 		expectResults(t, producer, 1, 2)
 	}