-
Notifications
You must be signed in to change notification settings - Fork 286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kafka_producer: fix deadlock when an error occurs in asyncClient #3003
kafka_producer: fix deadlock when an error occurs in asyncClient #3003
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-kafka-tests |
d580f78
to
e7ed165
Compare
/run-kafka-tests |
1 similar comment
/run-kafka-tests |
/run-leak-tests |
/run-kafka-tests |
1 similar comment
/run-kafka-tests |
@@ -116,8 +133,7 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, message *codec.MQ | |||
return ctx.Err() | |||
case <-k.closeCh: | |||
return nil | |||
default: | |||
k.asyncClient.Input() <- msg | |||
case k.asyncClient.Input() <- msg: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SyncBroadcastMessage
has a similar default branch, please check whether we need to update in SyncBroadcastMessage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SyncBroadcastMessage
has a different problem. I would like to fix it in a separate PR, what do you think?
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 17378b5
|
@liuzix: Your PR was out of date, I have automatically updated it for you. At the same time I will also trigger all tests for you: /run-all-tests If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
/run-integration-tests |
/run-kafka-tests |
In response to a cherrypick label: new pull request created: #3015. |
In response to a cherrypick label: new pull request created: #3016. |
In response to a cherrypick label: new pull request created: #3017. |
In response to a cherrypick label: new pull request created: #3018. |
What problem does this PR solve?
kafkaSaramaProducer
can deadlock if an error is reported by the underlying producer. kafka_producer: deadlock when closing after failure #2978What is changed and how it works?
k.asyncClient.Input() <- msg
cancellable inSendMessage
clientLock
, so that checking whether the producer is closing will no longer be blocked by other goroutines.Check List
Tests
Related changes
Release note