From 8e2b04b363c226275dbf10d771fb044e387bbd75 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Sun, 5 Aug 2018 18:31:41 +0100 Subject: [PATCH] Added support for Idempotent Producer Co-authored-by: Edoardo Comar Co-authored-by: Mickael Maison --- async_producer.go | 65 ++++++++++++- async_producer_test.go | 180 ++++++++++++++++++++++++++++++++++++ client.go | 23 +++++ config.go | 18 ++++ config_test.go | 27 ++++++ functional_consumer_test.go | 23 ++++- produce_response.go | 8 +- produce_set.go | 21 ++++- produce_set_test.go | 93 ++++++++++++++++++- 9 files changed, 444 insertions(+), 14 deletions(-) diff --git a/async_producer.go b/async_producer.go index 897225540..a4f614ff7 100644 --- a/async_producer.go +++ b/async_producer.go @@ -47,6 +47,50 @@ type AsyncProducer interface { Errors() <-chan *ProducerError } +// transactionManager keeps the state necessary to ensure idempotent production +type transactionManager struct { + producerID int64 + producerEpoch int16 + sequenceNumbers map[string]int32 + mutex sync.Mutex +} + +const ( + noProducerID = -1 + noProducerEpoch = -1 +) + +func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 { + key := fmt.Sprintf("%s-%d", topic, partition) + t.mutex.Lock() + defer t.mutex.Unlock() + sequence := t.sequenceNumbers[key] + t.sequenceNumbers[key] = sequence + 1 + return sequence +} + +func newTransactionManager(conf *Config, client Client) (*transactionManager, error) { + txnmgr := &transactionManager{ + producerID: noProducerID, + producerEpoch: noProducerEpoch, + } + + if conf.Producer.Idempotent { + initProducerIDResponse, err := client.InitProducerID() + if err != nil { + return nil, err + } + txnmgr.producerID = initProducerIDResponse.ProducerID + txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch + txnmgr.sequenceNumbers = make(map[string]int32) + txnmgr.mutex = sync.Mutex{} + + Logger.Printf("Obtained a ProducerId: %d epoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch) + } + + return txnmgr, nil +} + type asyncProducer struct { client Client conf *Config @@ -59,6 +103,8 @@ type asyncProducer struct { brokers map[*Broker]chan<- *ProducerMessage brokerRefs map[chan<- *ProducerMessage]int brokerLock sync.Mutex + + txnmgr *transactionManager } // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration. @@ -84,6 +130,11 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { return nil, ErrClosedClient } + txnmgr, err := newTransactionManager(client.Config(), client) + if err != nil { + return nil, err + } + p := &asyncProducer{ client: client, conf: client.Config(), @@ -93,6 +144,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { retries: make(chan *ProducerMessage), brokers: make(map[*Broker]chan<- *ProducerMessage), brokerRefs: make(map[chan<- *ProducerMessage]int), + txnmgr: txnmgr, } // launch our singleton dispatchers @@ -145,9 +197,10 @@ type ProducerMessage struct { // least version 0.10.0. Timestamp time.Time - retries int - flags flagSet - expectation chan *ProducerError + retries int + flags flagSet + expectation chan *ProducerError + sequenceNumber int32 } const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. 
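Editor's note: the transactionManager above hands out a monotonically increasing sequence number per topic-partition, starting at 0, guarded by a mutex. A minimal standalone sketch of that bookkeeping, using illustrative names rather than Sarama's own types:

    package main

    import (
        "fmt"
        "sync"
    )

    // seqTracker mirrors the idea behind getAndIncrementSequenceNumber:
    // one independent, monotonically increasing counter per "topic-partition" key.
    type seqTracker struct {
        mu   sync.Mutex
        seqs map[string]int32
    }

    func newSeqTracker() *seqTracker {
        return &seqTracker{seqs: make(map[string]int32)}
    }

    // next returns the current sequence for the partition and advances it by one.
    func (s *seqTracker) next(topic string, partition int32) int32 {
        key := fmt.Sprintf("%s-%d", topic, partition)
        s.mu.Lock()
        defer s.mu.Unlock()
        seq := s.seqs[key]
        s.seqs[key] = seq + 1
        return seq
    }

    func main() {
        t := newSeqTracker()
        fmt.Println(t.next("my_topic", 0)) // 0
        fmt.Println(t.next("my_topic", 0)) // 1
        fmt.Println(t.next("my_topic", 1)) // 0 -- each partition counts independently
    }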
@@ -328,6 +381,10 @@ func (tp *topicProducer) dispatch() { continue } } + if tp.parent.conf.Producer.Idempotent && msg.retries == 0 { + msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition) + //Logger.Printf("Message %s for TP %s-%d got sequence number: %d\n", msg.Value, msg.Topic, msg.Partition, msg.sequenceNumber) + } handler := tp.handlers[msg.Partition] if handler == nil { @@ -752,7 +809,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo bp.parent.returnErrors(msgs, ErrIncompleteResponse) return } - + fmt.Printf("response has error %v", block.Err) switch block.Err { // Success case ErrNoError: diff --git a/async_producer_test.go b/async_producer_test.go index 478dca4cf..9f3ed9099 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -2,9 +2,11 @@ package sarama import ( "errors" + "fmt" "log" "os" "os/signal" + "strconv" "sync" "testing" "time" @@ -753,6 +755,184 @@ func TestAsyncProducerNoReturns(t *testing.T) { leader.Close() } +func TestAsyncProducerIdempotent(t *testing.T) { + broker := NewMockBroker(t, 1) + + clusterID := "cid" + metadataResponse := &MetadataResponse{ + Version: 3, + ThrottleTimeMs: 0, + ClusterID: &clusterID, + ControllerID: 1, + } + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError) + broker.Returns(metadataResponse) + + initProducerID := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: 1000, + ProducerEpoch: 1, + } + broker.Returns(initProducerID) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 4 + config.Producer.RequiredAcks = WaitForAll + config.Producer.Retry.Backoff = 0 + config.Producer.Idempotent = true + config.Version = V0_11_0_0 + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + prodSuccess := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + broker.Returns(prodSuccess) + expectResults(t, producer, 10, 0) + + broker.Close() + closeProducer(t, producer) +} + +func TestAsyncProducerIdempotentRetry(t *testing.T) { + broker := NewMockBroker(t, 1) + + clusterID := "cid" + metadataResponse := &MetadataResponse{ + Version: 3, + ThrottleTimeMs: 0, + ClusterID: &clusterID, + ControllerID: 1, + } + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError) + broker.Returns(metadataResponse) + + initProducerID := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: 1000, + ProducerEpoch: 1, + } + broker.Returns(initProducerID) + + config := NewConfig() + config.Producer.Flush.Messages = 10 + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 4 + config.Producer.RequiredAcks = WaitForAll + config.Producer.Retry.Backoff = 0 + config.Producer.Idempotent = true + config.Version = V0_11_0_0 + producer, err := NewAsyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + prodNotLeader := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + 
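Editor's note on the dispatch hunk above: a message is given its sequence number only on its first pass through the topic producer (msg.retries == 0), so a retried message re-enters the pipeline carrying the number it was originally assigned. A rough sketch of that rule, with a hypothetical message type rather than Sarama's ProducerMessage:

    package main

    import "fmt"

    // msg stands in for a producer message; only the fields needed here are modeled.
    type msg struct {
        retries  int
        sequence int32
    }

    // stamp assigns a sequence number only on the first attempt; retries keep theirs.
    func stamp(m *msg, next func() int32) {
        if m.retries == 0 {
            m.sequence = next()
        }
    }

    func main() {
        var counter int32
        next := func() int32 { counter++; return counter - 1 }

        m := &msg{}
        stamp(m, next) // first attempt: gets sequence 0
        m.retries++
        stamp(m, next)          // retry: sequence stays 0, the counter is not advanced
        fmt.Println(m.sequence) // 0

        fresh := &msg{}
        stamp(fresh, next)          // a new message gets the next number
        fmt.Println(fresh.sequence) // 1
    }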
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) + broker.Returns(prodNotLeader) + + broker.Returns(metadataResponse) + + prodSuccess := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + broker.Returns(prodSuccess) + expectResults(t, producer, 10, 0) + + broker.Close() + closeProducer(t, producer) +} + +func TestAsyncProducerIdempotentRetryBatch(t *testing.T) { + Logger = log.New(os.Stderr, "", log.LstdFlags) + /*broker := NewMockBroker(t, 1) + + clusterID := "cid" + metadataResponse := &MetadataResponse{ + Version: 3, + ThrottleTimeMs: 0, + ClusterID: &clusterID, + ControllerID: 1, + } + metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError) + broker.Returns(metadataResponse) + + initProducerID := &InitProducerIDResponse{ + ThrottleTime: 0, + ProducerID: 1000, + ProducerEpoch: 1, + } + broker.Returns(initProducerID) + */ + config := NewConfig() + config.Producer.Flush.Messages = 3 + config.Producer.Return.Successes = true + config.Producer.Retry.Max = 4 + config.Producer.RequiredAcks = WaitForAll + config.Producer.Retry.Backoff = 100 * time.Millisecond + config.Producer.Idempotent = true + config.Version = V0_11_0_0 + producer, err := NewAsyncProducer([]string{"localhost:9092"}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 3; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage + strconv.Itoa(i))} + } + /*prodNotLeader := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) + broker.Returns(prodNotLeader) + */ + go func() { + for i := 0; i < 6; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine" + strconv.Itoa(i))} + time.Sleep(100 * time.Millisecond) + } + }() + /* + broker.Returns(metadataResponse) + + prodSuccess := &ProduceResponse{ + Version: 3, + ThrottleTime: 0, + } + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + broker.Returns(prodSuccess)*/ + expectResults(t, producer, 9, 0) + + fmt.Printf("**** Closing Broker \n") + //broker.Close() + fmt.Printf("**** Closing producer \n") + closeProducer(t, producer) + fmt.Printf("**** Closed producer \n") +} + // This example shows how to use the producer while simultaneously // reading the Errors channel to know about any failures. func ExampleAsyncProducer_select() { diff --git a/client.go b/client.go index 811e99a7e..86555e62c 100644 --- a/client.go +++ b/client.go @@ -67,6 +67,9 @@ type Client interface { // in local cache. This function only works on Kafka 0.8.2 and higher. RefreshCoordinator(consumerGroup string) error + // InitProducerID retrieves information required for Idempotent Producer + InitProducerID() (*InitProducerIDResponse, error) + // Close shuts down all broker connections managed by this client. It is required // to call this function before a client object passes out of scope, as it will // otherwise leak memory. 
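Editor's note, for context on why the retried batches in the tests above are safe to resend: the broker keeps, per producer ID and partition, the last sequence it has appended, so a resent batch whose base sequence it has already seen is recognised as a duplicate rather than written twice. A simplified, illustrative model of that check (this is neither Sarama nor broker code):

    package main

    import "fmt"

    type result string

    const (
        appended   result = "appended"
        duplicate  result = "duplicate"
        outOfOrder result = "out of order"
    )

    // partitionLog models the broker-side bookkeeping for one (producerID, partition).
    type partitionLog struct {
        nextSeq int32 // sequence expected for the next batch's first record
    }

    // appendBatch accepts a batch only if its base sequence is exactly the one expected.
    func (p *partitionLog) appendBatch(firstSeq int32, records int32) result {
        switch {
        case firstSeq == p.nextSeq:
            p.nextSeq += records
            return appended
        case firstSeq < p.nextSeq:
            return duplicate // already persisted; a retry can be acknowledged without re-writing
        default:
            return outOfOrder // a gap; the broker rejects this batch
        }
    }

    func main() {
        log := &partitionLog{}
        fmt.Println(log.appendBatch(0, 10)) // appended
        fmt.Println(log.appendBatch(0, 10)) // duplicate (e.g. a retried batch)
        fmt.Println(log.appendBatch(10, 5)) // appended
        fmt.Println(log.appendBatch(20, 5)) // out of order
    }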
You must close any Producers or Consumers using a client @@ -183,6 +186,26 @@ func (client *client) Brokers() []*Broker { return brokers } +func (client *client) InitProducerID() (*InitProducerIDResponse, error) { + var err error + for broker := client.any(); broker != nil; broker = client.any() { + + req := &InitProducerIDRequest{} + + response, err := broker.InitProducerID(req) + switch err.(type) { + case nil: + return response, nil + default: + // some error, remove that broker and try again + Logger.Printf("Error is %v", err) + _ = broker.Close() + client.deregisterBroker(broker) + } + } + return nil, err +} + func (client *client) Close() error { if client.Closed() { // Chances are this is being called from a defer() and the error will go unobserved diff --git a/config.go b/config.go index faf11e838..1e4e08534 100644 --- a/config.go +++ b/config.go @@ -124,6 +124,9 @@ type Config struct { // (defaults to hashing the message key). Similar to the `partitioner.class` // setting for the JVM producer. Partitioner PartitionerConstructor + // If enabled, the producer will ensure that exactly one copy of each message is + // written. + Idempotent bool // Return specifies what channels will be populated. If they are set to true, // you must read from the respective channels to prevent deadlock. If, @@ -511,6 +514,21 @@ func (c *Config) Validate() error { } } + if c.Producer.Idempotent { + if !c.Version.IsAtLeast(V0_11_0_0) { + return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0") + } + if c.Producer.Retry.Max == 0 { + return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1") + } + if c.Producer.RequiredAcks != WaitForAll { + return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll") + } + if c.Net.MaxOpenRequests > 5 { + return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests <= 5") + } + } + // validate the Consumer values switch { case c.Consumer.Fetch.Min <= 0: diff --git a/config_test.go b/config_test.go index d0e0af726..b8ed4eefe 100644 --- a/config_test.go +++ b/config_test.go @@ -207,6 +207,33 @@ func TestProducerConfigValidates(t *testing.T) { cfg.Producer.Retry.Backoff = -1 }, "Producer.Retry.Backoff must be >= 0"}, + {"Idempotent Version", + func(cfg *Config) { + cfg.Producer.Idempotent = true + cfg.Version = V0_10_0_0 + }, + "Idempotent producer requires Version >= V0_11_0_0"}, + {"Idempotent with Producer.Retry.Max", + func(cfg *Config) { + cfg.Version = V0_11_0_0 + cfg.Producer.Idempotent = true + cfg.Producer.Retry.Max = 0 + }, + "Idempotent producer requires Producer.Retry.Max >= 1"}, + {"Idempotent with Producer.RequiredAcks", + func(cfg *Config) { + cfg.Version = V0_11_0_0 + cfg.Producer.Idempotent = true + }, + "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"}, + {"Idempotent with Net.MaxOpenRequests", + func(cfg *Config) { + cfg.Version = V0_11_0_0 + cfg.Producer.Idempotent = true + cfg.Producer.RequiredAcks = WaitForAll + cfg.Net.MaxOpenRequests = 6 + }, + "Idempotent producer requires Net.MaxOpenRequests <= 5"}, } for i, test := range tests { diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 83bec0331..851f5d7c0 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -81,7 +81,7 @@ func TestVersionMatrix(t *testing.T) { // protocol versions and compressions for the except of LZ4. 
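Editor's note: putting the new Validate() rules together, an idempotent producer has to be configured roughly as follows before NewAsyncProducer (or NewSyncProducer) will accept the config. Broker address and topic below are placeholders:

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        config := sarama.NewConfig()
        config.Version = sarama.V0_11_0_0                 // idempotence needs the v0.11 message format
        config.Producer.Idempotent = true                 // the new flag added by this patch
        config.Producer.RequiredAcks = sarama.WaitForAll  // acks=all is required
        config.Producer.Retry.Max = 5                     // must be >= 1 so failed batches are resent
        config.Net.MaxOpenRequests = 1                    // must be <= 5; 1 keeps strict ordering
        config.Producer.Return.Successes = true

        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        producer.Input() <- &sarama.ProducerMessage{
            Topic: "my_topic",
            Value: sarama.StringEncoder("hello"),
        }
        <-producer.Successes()
    }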
testVersions := versionRange(V0_8_2_0) allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy} - producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100) + producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false) // When/Then consumeMsgs(t, testVersions, producedMessages) @@ -98,7 +98,20 @@ func TestVersionMatrixLZ4(t *testing.T) { // and all possible compressions. testVersions := versionRange(V0_10_0_0) allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4} - producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100) + producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false) + + // When/Then + consumeMsgs(t, testVersions, producedMessages) +} + +func TestVersionMatrixIdempotent(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + // Produce lot's of message with all possible combinations of supported + // protocol versions starting with v0.11 (first where idempotent was supported) + testVersions := versionRange(V0_11_0_0) + producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true) // When/Then consumeMsgs(t, testVersions, producedMessages) @@ -133,7 +146,7 @@ func versionRange(lower KafkaVersion) []KafkaVersion { return versions } -func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage { +func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { var wg sync.WaitGroup var producedMessagesMu sync.Mutex var producedMessages []*ProducerMessage @@ -145,6 +158,10 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi prodCfg.Producer.Return.Errors = true prodCfg.Producer.Flush.MaxMessages = flush prodCfg.Producer.Compression = codec + prodCfg.Producer.Idempotent = idempotent + if idempotent { + prodCfg.Producer.RequiredAcks = WaitForAll + } p, err := NewSyncProducer(kafkaBrokers, prodCfg) if err != nil { diff --git a/produce_response.go b/produce_response.go index 667e34c66..4c5cd3569 100644 --- a/produce_response.go +++ b/produce_response.go @@ -179,5 +179,11 @@ func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err K byTopic = make(map[int32]*ProduceResponseBlock) r.Blocks[topic] = byTopic } - byTopic[partition] = &ProduceResponseBlock{Err: err} + block := &ProduceResponseBlock{ + Err: err, + } + if r.Version >= 2 { + block.Timestamp = time.Now() + } + byTopic[partition] = block } diff --git a/produce_set.go b/produce_set.go index 13be2b3c9..dc71e2f1c 100644 --- a/produce_set.go +++ b/produce_set.go @@ -2,6 +2,8 @@ package sarama import ( "encoding/binary" + "errors" + "fmt" "time" ) @@ -58,12 +60,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error { set := partitions[msg.Partition] if set == nil { if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + fmt.Printf("Creating a new batch for partition %s-%d with base sequence %d \n", msg.Topic, msg.Partition, msg.sequenceNumber) batch := &RecordBatch{ FirstTimestamp: timestamp, Version: 2, - ProducerID: -1, /* No producer id */ Codec: ps.parent.conf.Producer.Compression, CompressionLevel: ps.parent.conf.Producer.CompressionLevel, + ProducerID: ps.parent.txnmgr.producerID, + ProducerEpoch: ps.parent.txnmgr.producerEpoch, + } + if ps.parent.conf.Producer.Idempotent { + batch.FirstSequence 
= msg.sequenceNumber } set = &partitionSet{recordsToSend: newDefaultRecords(batch)} size = recordBatchOverhead @@ -72,9 +79,13 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } partitions[msg.Partition] = set } - + fmt.Printf("Adding message with sequence %d to batch for partition %s-%d value: %v\n", msg.sequenceNumber, msg.Topic, msg.Partition, msg.Value) set.msgs = append(set.msgs, msg) + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence { + return errors.New("Assertion failed: Message out of sequence added to a batch") + } // We are being conservative here to avoid having to prep encode the record size += maximumRecordOverhead rec := &Record{ @@ -120,8 +131,8 @@ func (ps *produceSet) buildRequest() *ProduceRequest { req.Version = 3 } - for topic, partitionSet := range ps.msgs { - for partition, set := range partitionSet { + for topic, partitionSets := range ps.msgs { + for partition, set := range partitionSets { if req.Version >= 3 { // If the API version we're hitting is 3 or greater, we need to calculate // offsets for each record in the batch relative to FirstOffset. @@ -137,7 +148,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { record.OffsetDelta = int64(i) } } - + fmt.Printf("Add batch to ProduceRequest for TP %s-%d with firstSeq %d, size: %d\n", topic, partition, rb.FirstSequence, len(rb.Records)) req.AddBatch(topic, partition, rb) continue } diff --git a/produce_set_test.go b/produce_set_test.go index 6663f36f7..5e83bc1c8 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -7,8 +7,11 @@ import ( ) func makeProduceSet() (*asyncProducer, *produceSet) { + conf := NewConfig() + txnmgr, _ := newTransactionManager(conf, nil) parent := &asyncProducer{ - conf: NewConfig(), + conf: conf, + txnmgr: txnmgr, } return parent, newProduceSet(parent) } @@ -253,3 +256,91 @@ func TestProduceSetV3RequestBuilding(t *testing.T) { } } } + +func TestProduceSetIdempotentRequestBuilding(t *testing.T) { + const pID = 1000 + const pEpoch = 1234 + + config := NewConfig() + config.Producer.RequiredAcks = WaitForAll + config.Producer.Idempotent = true + config.Version = V0_11_0_0 + + parent := &asyncProducer{ + conf: config, + txnmgr: &transactionManager{ + producerID: pID, + producerEpoch: pEpoch, + }, + } + ps := newProduceSet(parent) + + now := time.Now() + msg := &ProducerMessage{ + Topic: "t1", + Partition: 0, + Key: StringEncoder(TestMessage), + Value: StringEncoder(TestMessage), + Headers: []RecordHeader{ + RecordHeader{ + Key: []byte("header-1"), + Value: []byte("value-1"), + }, + RecordHeader{ + Key: []byte("header-2"), + Value: []byte("value-2"), + }, + RecordHeader{ + Key: []byte("header-3"), + Value: []byte("value-3"), + }, + }, + Timestamp: now, + sequenceNumber: 123, + } + for i := 0; i < 10; i++ { + safeAddMessage(t, ps, msg) + msg.Timestamp = msg.Timestamp.Add(time.Second) + } + + req := ps.buildRequest() + + if req.Version != 3 { + t.Error("Wrong request version") + } + + batch := req.records["t1"][0].RecordBatch + if batch.FirstTimestamp != now { + t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp) + } + if batch.ProducerID != pID { + t.Errorf("Wrong producerID: %v", batch.ProducerID) + } + if batch.ProducerEpoch != pEpoch { + t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch) + } + if batch.FirstSequence != 123 { + t.Errorf("Wrong first sequence: %v", batch.FirstSequence) + } + for i := 0; i < 10; i++ { + rec := batch.Records[i] + if 
rec.TimestampDelta != time.Duration(i)*time.Second { + t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) + } + + if rec.OffsetDelta != int64(i) { + t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta) + } + + for j, h := range batch.Records[i].Headers { + exp := fmt.Sprintf("header-%d", j+1) + if string(h.Key) != exp { + t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key) + } + exp = fmt.Sprintf("value-%d", j+1) + if string(h.Value) != exp { + t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value) + } + } + } +}
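Editor's closing note on what produce_set.go is expected to guarantee: each RecordBatch is stamped with the producer ID/epoch and a FirstSequence equal to the sequence number of the first message added to it, and consecutive batches for the same partition should carry contiguous sequences (next FirstSequence = previous FirstSequence + previous record count). A small self-contained check of that invariant, using local stand-in types rather than Sarama's RecordBatch:

    package main

    import "fmt"

    // batch carries only the fields relevant to the idempotence bookkeeping.
    type batch struct {
        producerID    int64
        producerEpoch int16
        firstSequence int32
        records       int32
    }

    // contiguous reports whether every batch starts exactly where the previous one ended.
    func contiguous(batches []batch) bool {
        var next int32
        for _, b := range batches {
            if b.firstSequence != next {
                return false
            }
            next += b.records
        }
        return true
    }

    func main() {
        batches := []batch{
            {producerID: 1000, producerEpoch: 1, firstSequence: 0, records: 10},
            {producerID: 1000, producerEpoch: 1, firstSequence: 10, records: 3},
            {producerID: 1000, producerEpoch: 1, firstSequence: 13, records: 7},
        }
        fmt.Println(contiguous(batches)) // true

        // A gap (or an out-of-order batch) breaks the invariant the broker relies on.
        batches[2].firstSequence = 15
        fmt.Println(contiguous(batches)) // false
    }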