kafka_producer: fix deadlock when an error occurs in asyncClient #3003

Merged: 3 commits, Oct 12, 2021

51 changes: 34 additions & 17 deletions cdc/sink/producer/kafka/kafka.go
@@ -68,11 +68,13 @@ type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
// sender routine, data race or send on closed chan could happen.
clientLock sync.RWMutex
asyncClient sarama.AsyncProducer
syncClient sarama.SyncProducer
topic string
partitionNum int32
clientLock sync.RWMutex
asyncClient sarama.AsyncProducer
syncClient sarama.SyncProducer
// producersReleased records whether asyncClient and syncClient have been closed properly
producersReleased bool
topic string
partitionNum int32

partitionOffset []struct {
flushed uint64
@@ -84,12 +84,27 @@ type kafkaSaramaProducer struct {
failpointCh chan error

closeCh chan struct{}
closed int32
// atomic flag indicating whether the producer is closing
closing kafkaProducerClosingFlag
}

type kafkaProducerClosingFlag = int32

const (
kafkaProducerRunning = 0
kafkaProducerClosing = 1
)

func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error {
k.clientLock.RLock()
defer k.clientLock.RUnlock()

// Checks whether the producer is closing.
// The atomic flag must be checked under `clientLock.RLock()`
if atomic.LoadInt32(&k.closing) == kafkaProducerClosing {
return nil
}

msg := &sarama.ProducerMessage{
Topic: k.topic,
Key: sarama.ByteEncoder(message.Key),
@@ -116,8 +133,7 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ
return ctx.Err()
case <-k.closeCh:
return nil
default:
k.asyncClient.Input() <- msg
case k.asyncClient.Input() <- msg:
Review comment (Contributor): SyncBroadcastMessage has a similar default branch; please check whether we need to update SyncBroadcastMessage as well.

Reply (Contributor, author): SyncBroadcastMessage has a different problem. I would like to fix it in a separate PR, what do you think?

}
return nil
}
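
The hunk above is the core of the deadlock fix: the blocking k.asyncClient.Input() <- msg that used to sit in a default: branch is now itself a select case, and the send is preceded by a closing-flag check taken under clientLock.RLock(). If the async client stops draining its input channel because an error occurred, SendMessage no longer blocks forever; it unblocks on context cancellation or on closeCh. A minimal standalone sketch of the pattern follows; the names (producer, input, send) and the plain []byte message type are illustrative assumptions, not the actual TiCDC types.

// Sketch only, not part of the diff: the interruptible-send pattern used above.
package sketch

import (
	"context"
	"sync"
	"sync/atomic"
)

type producer struct {
	// mu protects the underlying clients; Close() holds the write lock
	// while it releases them.
	mu      sync.RWMutex
	closing int32 // 0 = running, 1 = closing
	closeCh chan struct{}
	input   chan []byte // stands in for sarama's AsyncProducer.Input()
}

func (p *producer) send(ctx context.Context, msg []byte) error {
	p.mu.RLock()
	defer p.mu.RUnlock()

	// Check the flag under the read lock: if it still reads "running",
	// Close() cannot have released the clients yet, because releasing
	// them requires the write lock.
	if atomic.LoadInt32(&p.closing) == 1 {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.closeCh:
		// Shutting down: drop the message instead of blocking forever.
		return nil
	case p.input <- msg:
		// The send itself is a select case, so it unblocks as soon as
		// the context is cancelled or closeCh is closed.
		return nil
	}
}
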
@@ -197,26 +213,27 @@ func (k *kafkaSaramaProducer) GetPartitionNum() int32 {
}

// stop closes the closeCh to signal other routines to exit
// It SHOULD NOT be called under `clientLock`.
func (k *kafkaSaramaProducer) stop() {
k.clientLock.Lock()
defer k.clientLock.Unlock()
select {
case <-k.closeCh:
if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing {
return
default:
close(k.closeCh)
}
close(k.closeCh)
}

// Close closes the sync and async clients.
func (k *kafkaSaramaProducer) Close() error {
k.stop()

k.clientLock.Lock()
defer k.clientLock.Unlock()
// close sarama client multiple times will cause panic
if atomic.LoadInt32(&k.closed) == 1 {

if k.producersReleased {
// We need to guard against double closing the clients,
// which could lead to panic.
return nil
}
k.producersReleased = true
// In fact close sarama sync client doesn't return any error.
// But close async client returns error if error channel is not empty, we
// don't populate this error to the upper caller, just add a log here.
@@ -228,7 +245,6 @@ func (k *kafkaSaramaProducer) Close() error {
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
}
atomic.StoreInt32(&k.closed, 1)
return nil
}
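
The shutdown path above is split into two steps: stop() flips the atomic closing flag exactly once via atomic.SwapInt32 and closes closeCh without holding clientLock, while Close() then takes the write lock and releases the clients at most once, guarded by producersReleased. A rough sketch of that ordering, with simplified fields and the actual client-closing calls elided, is given below as an illustration under those assumptions rather than the exact code.

// Sketch only, not part of the diff: idempotent stop/Close coordination.
package sketch

import (
	"sync"
	"sync/atomic"
)

type shutdownProducer struct {
	mu       sync.RWMutex
	closing  int32 // 0 = running, 1 = closing
	closeCh  chan struct{}
	released bool // whether the underlying clients were already closed
}

// stop signals other goroutines to exit. It must not be called while
// holding mu, because a sender goroutine may currently be blocked
// under mu.RLock().
func (p *shutdownProducer) stop() {
	if atomic.SwapInt32(&p.closing, 1) == 1 {
		// Someone else already started the shutdown; close(closeCh)
		// must run only once, so return here.
		return
	}
	close(p.closeCh)
}

// Close releases the underlying clients at most once; closing a sarama
// client twice would panic, hence the released guard.
func (p *shutdownProducer) Close() error {
	p.stop()

	p.mu.Lock()
	defer p.mu.Unlock()
	if p.released {
		return nil
	}
	p.released = true
	// ... close the sync and async clients here, logging any error ...
	return nil
}
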

@@ -362,6 +378,7 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
114 changes: 114 additions & 0 deletions cdc/sink/producer/kafka/kafka_test.go

@@ -317,3 +317,117 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
_, err = NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
c.Assert(cerror.ErrKafkaInvalidPartitionNum.Equal(err), check.IsTrue)
}

func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
defer testleak.AfterTest(c)()
topic := "unit_test_4"
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

leader := sarama.NewMockBroker(c, 2)
defer leader.Close()
metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)
leader.Returns(metadataResponse)

config := NewKafkaConfig()
// Because the sarama mock broker is not compatible with version larger than 1.0.0
// We use a smaller version in the following producer tests.
// Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.TopicPreProcess = false

newSaramaConfigImplBak := newSaramaConfigImpl
newSaramaConfigImpl = func(ctx context.Context, config Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
cfg.Producer.Retry.Max = 2
cfg.Producer.MaxMessageBytes = 8
return cfg, err
}
defer func() {
newSaramaConfigImpl = newSaramaConfigImplBak
}()

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
}()

c.Assert(err, check.IsNil)
c.Assert(producer, check.NotNil)

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
err = producer.SendMessage(ctx, &codec.MQMessage{
Key: []byte("test-key-1"),
Value: []byte("test-value"),
}, int32(0))
c.Assert(err, check.IsNil)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
select {
case <-ctx.Done():
c.Fatal("TestProducerSendMessageFailed timed out")
case err := <-errCh:
c.Assert(err, check.ErrorMatches, ".*too large.*")
}
}()

wg.Wait()
}

func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
defer testleak.AfterTest(c)()
topic := "unit_test_4"
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

leader := sarama.NewMockBroker(c, 2)
defer leader.Close()
metadataResponse := new(sarama.MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
metadataResponse.AddTopicPartition(topic, 1, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
leader.Returns(metadataResponse)
leader.Returns(metadataResponse)

config := NewKafkaConfig()
// Because the sarama mock broker is not compatible with version larger than 1.0.0
// We use a smaller version in the following producer tests.
// Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
config.Version = "0.9.0.0"
config.PartitionNum = int32(2)
config.TopicPreProcess = false

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, leader.Addr(), topic, config, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)
}()

c.Assert(err, check.IsNil)
c.Assert(producer, check.NotNil)

err = producer.Close()
c.Assert(err, check.IsNil)

err = producer.Close()
c.Assert(err, check.IsNil)
}