-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. #18454
Conversation
Codecov Report
@@ Coverage Diff @@
## master #18454 +/- ##
============================================
+ Coverage 46.98% 47.14% +0.15%
- Complexity 10343 10413 +70
============================================
Files 692 697 +5
Lines 67766 67981 +215
Branches 7259 7278 +19
============================================
+ Hits 31842 32048 +206
- Misses 32344 32366 +22
+ Partials 3580 3567 -13
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, 👍🏼
It looks like the problems are not only duplicate messages but also missing messages,
I'm wondering if It's better to add a declaration in the motivation part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch!
### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.
…#18486) ### Motivation #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
…apache#18486) apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e) (cherry picked from commit 870b060)
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
…apache#18486) ### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
… issue. (apache#18454) (cherry picked from commit 7712aa3) (cherry picked from commit 6f69aa4)
…apache#18486) ### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
… issue. (apache#18454) (cherry picked from commit 7712aa3) (cherry picked from commit 6f69aa4)
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8
Motivation
When the consumer
redeliverUnacknowledgedMessages
, the MultiTopicsConsumerImpl doesn't filter the duplicate message. Also, the ConsumerImpl doesn't check the batched msg-id, this will cause missing msg in the below case :Documentation
doc
doc-required
doc-not-needed
doc-complete