Skip to content

Commit

Permalink
Added support for Idempotent Producer
Browse files Browse the repository at this point in the history
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
  • Loading branch information
mimaison and edoardocomar committed Oct 1, 2018
1 parent 7479983 commit 8e2b04b
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 14 deletions.
65 changes: 61 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,50 @@ type AsyncProducer interface {
Errors() <-chan *ProducerError
}

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}

const (
noProducerID = -1
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
txnmgr := &transactionManager{
producerID: noProducerID,
producerEpoch: noProducerEpoch,
}

if conf.Producer.Idempotent {
initProducerIDResponse, err := client.InitProducerID()
if err != nil {
return nil, err
}
txnmgr.producerID = initProducerIDResponse.ProducerID
txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}

Logger.Printf("Obtained a ProducerId: %d epoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
}

return txnmgr, nil
}

type asyncProducer struct {
client Client
conf *Config
Expand All @@ -59,6 +103,8 @@ type asyncProducer struct {
brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokerLock sync.Mutex

txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
Expand All @@ -84,6 +130,11 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
return nil, ErrClosedClient
}

txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}

p := &asyncProducer{
client: client,
conf: client.Config(),
Expand All @@ -93,6 +144,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
txnmgr: txnmgr,
}

// launch our singleton dispatchers
Expand Down Expand Up @@ -145,9 +197,10 @@ type ProducerMessage struct {
// least version 0.10.0.
Timestamp time.Time

retries int
flags flagSet
expectation chan *ProducerError
retries int
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
Expand Down Expand Up @@ -328,6 +381,10 @@ func (tp *topicProducer) dispatch() {
continue
}
}
if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
//Logger.Printf("Message %s for TP %s-%d got sequence number: %d\n", msg.Value, msg.Topic, msg.Partition, msg.sequenceNumber)
}

handler := tp.handlers[msg.Partition]
if handler == nil {
Expand Down Expand Up @@ -752,7 +809,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnErrors(msgs, ErrIncompleteResponse)
return
}

fmt.Printf("response has error %v", block.Err)
switch block.Err {
// Success
case ErrNoError:
Expand Down
180 changes: 180 additions & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package sarama

import (
"errors"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -753,6 +755,184 @@ func TestAsyncProducerNoReturns(t *testing.T) {
leader.Close()
}

func TestAsyncProducerIdempotent(t *testing.T) {
broker := NewMockBroker(t, 1)

clusterID := "cid"
metadataResponse := &MetadataResponse{
Version: 3,
ThrottleTimeMs: 0,
ClusterID: &clusterID,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Version = V0_11_0_0
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodSuccess := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
broker.Returns(prodSuccess)
expectResults(t, producer, 10, 0)

broker.Close()
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentRetry(t *testing.T) {
broker := NewMockBroker(t, 1)

clusterID := "cid"
metadataResponse := &MetadataResponse{
Version: 3,
ThrottleTimeMs: 0,
ClusterID: &clusterID,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)

initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 0
config.Producer.Idempotent = true
config.Version = V0_11_0_0
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

prodNotLeader := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
broker.Returns(prodNotLeader)

broker.Returns(metadataResponse)

prodSuccess := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
broker.Returns(prodSuccess)
expectResults(t, producer, 10, 0)

broker.Close()
closeProducer(t, producer)
}

func TestAsyncProducerIdempotentRetryBatch(t *testing.T) {
Logger = log.New(os.Stderr, "", log.LstdFlags)
/*broker := NewMockBroker(t, 1)
clusterID := "cid"
metadataResponse := &MetadataResponse{
Version: 3,
ThrottleTimeMs: 0,
ClusterID: &clusterID,
ControllerID: 1,
}
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
broker.Returns(metadataResponse)
initProducerID := &InitProducerIDResponse{
ThrottleTime: 0,
ProducerID: 1000,
ProducerEpoch: 1,
}
broker.Returns(initProducerID)
*/
config := NewConfig()
config.Producer.Flush.Messages = 3
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4
config.Producer.RequiredAcks = WaitForAll
config.Producer.Retry.Backoff = 100 * time.Millisecond
config.Producer.Idempotent = true
config.Version = V0_11_0_0
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 3; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage + strconv.Itoa(i))}
}
/*prodNotLeader := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
broker.Returns(prodNotLeader)
*/
go func() {
for i := 0; i < 6; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine" + strconv.Itoa(i))}
time.Sleep(100 * time.Millisecond)
}
}()
/*
broker.Returns(metadataResponse)
prodSuccess := &ProduceResponse{
Version: 3,
ThrottleTime: 0,
}
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
broker.Returns(prodSuccess)*/
expectResults(t, producer, 9, 0)

fmt.Printf("**** Closing Broker \n")
//broker.Close()
fmt.Printf("**** Closing producer \n")
closeProducer(t, producer)
fmt.Printf("**** Closed producer \n")
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {
Expand Down
23 changes: 23 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Client interface {
// in local cache. This function only works on Kafka 0.8.2 and higher.
RefreshCoordinator(consumerGroup string) error

// InitProducerID retrieves information required for Idempotent Producer
InitProducerID() (*InitProducerIDResponse, error)

This comment has been minimized.

Copy link
@varun06

varun06 Dec 20, 2018

Contributor

I forgot to comment earlier, but this is a breaking change and should be in release notes @bai


// Close shuts down all broker connections managed by this client. It is required
// to call this function before a client object passes out of scope, as it will
// otherwise leak memory. You must close any Producers or Consumers using a client
Expand Down Expand Up @@ -183,6 +186,26 @@ func (client *client) Brokers() []*Broker {
return brokers
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
var err error
for broker := client.any(); broker != nil; broker = client.any() {

req := &InitProducerIDRequest{}

response, err := broker.InitProducerID(req)
switch err.(type) {
case nil:
return response, nil
default:
// some error, remove that broker and try again
Logger.Printf("Error is %v", err)
_ = broker.Close()
client.deregisterBroker(broker)
}
}
return nil, err
}

func (client *client) Close() error {
if client.Closed() {
// Chances are this is being called from a defer() and the error will go unobserved
Expand Down
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type Config struct {
// (defaults to hashing the message key). Similar to the `partitioner.class`
// setting for the JVM producer.
Partitioner PartitionerConstructor
// If enabled, the producer will ensure that exactly one copy of each message is
// written.
Idempotent bool

// Return specifies what channels will be populated. If they are set to true,
// you must read from the respective channels to prevent deadlock. If,
Expand Down Expand Up @@ -511,6 +514,21 @@ func (c *Config) Validate() error {
}
}

if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
}
if c.Producer.Retry.Max == 0 {
return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
}
if c.Producer.RequiredAcks != WaitForAll {
return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
}
if c.Net.MaxOpenRequests > 5 {
return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests <= 5")
}
}

// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
Expand Down
Loading

0 comments on commit 8e2b04b

Please sign in to comment.