Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

producer: ensure that the management message (fin) is never "leaked" #2182

Merged
merged 11 commits into from
Mar 30, 2022

Conversation

niamster
Copy link
Contributor

Since async producer now support multiple inflight messages
thanks to #1686 and #2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.

Steps to reproduce: start async producer and restart Kafka broker (potentially multiple times).
This trigger leader update and retires current broker producer.
There's a chance that newly created broker producer is used to retry an
inflight message so it is also marked as "dying" but it does not contain
any explicit errors.
This results in a "fin" message to be sent to Kafka and reported as successful
operation to the client.

Versions affected: v1.31.0 and above.

Sarama logs of the event (with the patch applied):

[Sarama] 2022/03/16 09:54:31 producer/broker/10001 starting up
[Sarama] 2022/03/16 09:54:31 producer/broker/10001 state change to [open] on my-topic/26
[Sarama] 2022/03/16 09:54:31 producer/leader/my-topic/26 selected broker 10001
[Sarama] 2022/03/16 09:54:31 producer/leader/my-topic/26 state change to [flushing-1]
[Sarama] 2022/03/16 09:54:31 producer/leader/my-topic/26 state change to [normal]
[Sarama] 2022/03/16 09:54:31 producer/leader/my-topic/26 state change to [retrying-1]
[Sarama] 2022/03/16 09:54:31 producer/broker/10001 state change to [dying-1] on my-topic/26
[Sarama] 2022/03/16 09:54:31 producer/leader/my-topic/26 abandoning broker 10001
[Sarama] 2022/03/16 09:54:31 producer/broker/10001 input chan closed
[Sarama] 2022/03/16 09:54:33 producer/broker/10001 state change to [open] on my-topic/26
[Sarama] 2022/03/16 09:54:33 producer/leader/my-topic/26 selected broker 10001
[Sarama] 2022/03/16 09:54:33 producer/leader/my-topic/26 state change to [flushing-1]
[Sarama] 2022/03/16 09:54:33 producer/leader/my-topic/26 state change to [normal]

Kafka version: 2.7.1
Go version: 1.17.7

Kafka producer config:

cfg.Producer.Retry.Backoff = 2 * time.Second
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
cfg.Producer.Flush.Messages = 1000
cfg.Producer.Flush.Frequency = 100 * time.Millisecond
cfg.ChannelBufferSize = 1024

Since async producer now support multiple inflight messages
thanks to IBM#1686 and
IBM#2094, it now may "leak"
the "fin" internal management message to Kafka (and to the client)
when broker producer is reconnecting to Kafka broker and retries
multiple inflight messages at the same time.
@slaunay
Copy link
Contributor

slaunay commented Mar 19, 2022

@niamster Thanks for the fix, any chance you can include a unit test to reproduce that tricky race condition?

I think the linked issue is #2150, if you add fixes #2150 in your commit message somewhere that issue should get closed once the PR is merged.

if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
delete(bp.currentRetries[msg.Topic], msg.Partition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure those 3 lines are necessary because the bp.currentRetries should be "empty" for that particular topic partition because bp.needsRetry(msg) returned nil above:
https://github.com/Shopify/sarama/blob/bad67e5b089437bc73f8034a95017e77be71e8b0/async_producer.go#L903-L909

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true that bp.currentRetries[msg.Topic] is nil in this case (bp.currentRetries[msg.Topic][msg.Partition] is set to nil in the "syn" case above).
I decided to delete the key to ensure that the key does not exist to avoid confusion "key exist but nil" vs. "key does not exist" to mimic the other block of code above related to handling fin message.

TBH I'm not entirely sure about the purpose of the following code in handling syn case:

				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil

I don't see where it might be useful (for now).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I'm not entirely sure about the purpose of the following code in handling syn case:

				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil

I don't see where it might be useful (for now).

I believe when a syn message is seen the brokerProducer should:

  • initialize currentRetries to prevent a panic when accessing bp.currentRetries[topic][partition] (typically on first use of the brokerProducer).
  • clear a previous retries "state" for such topic partition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @slaunay for the reply!
I don't see how this could lead to any panics 🤔
I've given another thought and I think it does not really matter (as far as I can tell) whether the retries "metadata" structure is cleared or not (I was trying to imagine what if the code is changed in the future so this branch is executed even if bp.currentRetries[msg.Topic][msg.Partition] is not nil). So I will update the code to minimise the changes.

@niamster
Copy link
Contributor Author

@niamster Thanks for the fix, any chance you can include a unit test to reproduce that tricky race condition?

I think the linked issue is #2150, if you add fixes #2150 in your commit message somewhere that issue should get closed once the PR is merged.

Thanks @slaunay for the review! I will try to add a test, this is a little bit tricky to reproduce robustly though.

As for #2150 - TBH I'm not 100% sure this is exactly the same issue. I was looking into it before submitting this PR but was not confident that this fix also fixes that one.
Maybe @hxiaodon can confirm that this patch fixes their issue as well? If this patch fixes their problem I'm happy to add fixes #2150 some commit :)

@niamster
Copy link
Contributor Author

@slaunay @hxiaodon I was able to create a test to reproduce the issue.

Without proposed fix the test fails with following log:

    async_producer_test.go:72: Unexpected successes 35 or errors 0
[sarama] 2022/03/20 02:50:31 Producer shutting down.
    async_producer_test.go:26: timeout
    async_producer_test.go:828: 1 empty values
--- FAIL: TestAsyncProducerBrokerRestart (15.85s)

Some messages are lost which locks the async producer shutdown procedure (the number of inflight packets is not 0).
This is not exactly the same behaviour that I observe in my other test environment but the fix helps. I hope this also helps #2150 so I've marked my commit with fixes #2150.

@dnwe
Copy link
Collaborator

dnwe commented Mar 21, 2022

@niamster thanks for working on these changes — I haven't yet looked over the code properly, but I notice there's a data race being flagged on the unittests with the mockbroker:

==================
WARNING: DATA RACE
Read at 0x00c0000ae258 by goroutine 40:
  github.com/Shopify/sarama.TestAsyncProducerBrokerRestart.func1()
      github.com/Shopify/sarama/async_producer_test.go:741 +0x89
  github.com/Shopify/sarama.(*MockBroker).handleRequests()
      github.com/Shopify/sarama/mockbroker.go:266 +0xe16

Previous write at 0x00c0000ae258 by goroutine 115:
  github.com/Shopify/sarama.TestAsyncProducerBrokerRestart()
      github.com/Shopify/sarama/async_producer_test.go:809 +0x7fb
  testing.tRunner()
      testing/testing.go:1203 +0x202

Goroutine 40 (running) created at:
  github.com/Shopify/sarama.(*MockBroker).serverLoop()
      github.com/Shopify/sarama/mockbroker.go:177 +0x22d

Goroutine 115 (running) created at:
  testing.(*T).Run()
      testing/testing.go:1248 +0x5d7
  testing.runTests.func1()
      testing/testing.go:1521 +0xa6
  testing.tRunner()
      testing/testing.go:1203 +0x202
  testing.runTests()
      testing/testing.go:1519 +0x612
  testing.(*M).Run()
      testing/testing.go:1427 +0x3b3
  main.main()
      _testmain.go:1131 +0x356

@niamster
Copy link
Contributor Author

@niamster thanks for working on these changes — I haven't yet looked over the code properly, but I notice there's a data race being flagged on the unittests with the mockbroker:

==================
WARNING: DATA RACE
Read at 0x00c0000ae258 by goroutine 40:
  github.com/Shopify/sarama.TestAsyncProducerBrokerRestart.func1()
      github.com/Shopify/sarama/async_producer_test.go:741 +0x89
  github.com/Shopify/sarama.(*MockBroker).handleRequests()
      github.com/Shopify/sarama/mockbroker.go:266 +0xe16

Previous write at 0x00c0000ae258 by goroutine 115:
  github.com/Shopify/sarama.TestAsyncProducerBrokerRestart()
      github.com/Shopify/sarama/async_producer_test.go:809 +0x7fb
  testing.tRunner()
      testing/testing.go:1203 +0x202

Goroutine 40 (running) created at:
  github.com/Shopify/sarama.(*MockBroker).serverLoop()
      github.com/Shopify/sarama/mockbroker.go:177 +0x22d

Goroutine 115 (running) created at:
  testing.(*T).Run()
      testing/testing.go:1248 +0x5d7
  testing.runTests.func1()
      testing/testing.go:1521 +0xa6
  testing.tRunner()
      testing/testing.go:1203 +0x202
  testing.runTests()
      testing/testing.go:1519 +0x612
  testing.(*M).Run()
      testing/testing.go:1427 +0x3b3
  main.main()
      _testmain.go:1131 +0x356

Thanks @dnwe, this race condition should be fixed with the latest commit to this branch ba24670.

}()
wg.Wait()
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
func closeProducer(t *testing.T, p AsyncProducer) {
closeProducerWithTimeout(t, p, time.Hour)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make these timeout defaults time.Minute * 10 (or perhaps time.Minute * 5 even)? As the tests are run with -timeout 10m anyway, I think a timeout from these test helpers will probably be cleaner in the CI runs than a test timeout panic() anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put 1h to preserve current behaviour. I think 5mn makes more sense than 10mn if we want the test fail before the whole test suite times out. I will change it to 5mn in next commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 I forgot to commit the change, done!

Copy link
Collaborator

@dnwe dnwe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @niamster — these changes look good to me.

@slaunay did you have any further questions/suggestions before I merge?

Copy link
Contributor

@slaunay slaunay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @niamster for coming up with a test case that consistently reproduce the issue.

The fix looks good to me, I added a few comments.

We should probably add a retract entry to go.mod as this is a major bug like:

v1.32.0 // producer hangs on retry https://github.com/Shopify/sarama/issues/2150

seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the leaderLock is used to synchronize access to leader.BrokerID().
You can probably create a variable instead:

leaderID := leader.BrokerID()

as the same ID is used when the leader is closed and recreated.
You can even make it simpler by caching the metadataLeader response and just return it in the function:

metadataLeader := new(MetadataResponse)
...
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
  return metadataLeader
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used to guard leader instance when accessing .Addr().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, I thought like some other unit tests the same address was reused with leader = NewMockBrokerAddr(t, 2, leaderAddr) to stop/start but moving to a different port works too.

if batch := preq.records["my_topic"][0].RecordBatch; batch != nil {
for _, record := range batch.Records {
if len(record.Value) == 0 {
emptyValues++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As produceRequestTest is invoked from different goroutines incrementing emptyValues here and reading the value in the main test function goroutine is probably not "thread" safe.

Accessing emptyValues with atomic.AddInt32/atomic.LoadInt32 would be "thread" safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I will update this.

if msg.flags&fin == fin {
// New broker producer that was caught up by the retry loop
bp.parent.retryMessage(msg, ErrShuttingDown)
Logger.Printf("producer/broker/%d state change to [dying-%d] on %s/%d\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dying state would make sense for the old "unhealthy" brokerProducer but might be confusing on a "healthy" brokerProducer.
When looking at the applicative logs, the user might see state change to [dying-1] and think that there is/was an issue.

The fix preserves the original retry logic of bubbling up the fin message so it would lean towards removing that log statement but if we want to know when we hit this (internal) condition we can use DebugLogger instead.

}

leader.setHandler(func(req *request) (res encoderWithHeader) {
produceRequestTest(req)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more suitable name for that function might be:

Suggested change
produceRequestTest(req)
countRecordsWithEmptyValue(req)

go pushMsg()

for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can reproduce the failure even if the other time.Sleep in pushMsg and leader.setHandler are commented out but not when I comment this particular call.

👍 So this seems to be the crux of reproducing the race condition by sending records in bursts while restarting the (mock) broker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the whole point is to have different "generations" of the retries.


wg.Wait()

expectResultsWithTimeout(t, producer, 40, 00, 10*time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
expectResultsWithTimeout(t, producer, 40, 00, 10*time.Second)
expectResultsWithTimeout(t, producer, 40, 0, 10*time.Second)

})

config := NewTestConfig()
config.Producer.Retry.Backoff = time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being a unit test, we can probably speed the runtime by using the default 250ms backoff.
It still fails consistently on main but succeeds after ~600ms vs ~2.2s (with 1s backoff).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't play much with this setting, I just set this value reasonably high to ensure that the test is not flaky.

@niamster
Copy link
Contributor Author

Thanks @slaunay for the review 🙇 I addressed your comments. Please let me know if I missed anything.

})

config := NewTestConfig()
config.Producer.Retry.Backoff = 250 * time.Millisecond
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is the same as the default and could be removed but nothing wrong in being explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's now set to 100ms.
Also I prefer to have an explicit value to avoid any surprises in the future.

@slaunay
Copy link
Contributor

slaunay commented Mar 28, 2022

Thanks a lot @niamster!

@dnwe The fix looks good to me.

Do you think a retract is beneficial to users and if so should we add it in this PR to reduce the work for the next release?

@niamster
Copy link
Contributor Author

Do you think a retract is beneficial to users and if so should we add it in this PR to reduce the work for the next release?

Yes, I think so. Because this regression causes data corruption (unexpected data in kafka). I believe it should be a different PR thought.

@dnwe dnwe added the fix label Mar 30, 2022
Copy link
Collaborator

@dnwe dnwe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niamster thanks again! and thanks @slaunay for the detailed review and guidance on the fix

@dnwe dnwe merged commit d5f076b into IBM:main Mar 30, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants