Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Avoid redelivering duplicated messages when batching is enabled #18486

Conversation

BewareMyPower
Copy link
Contributor

Motivation

#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the testConsumerDedup test modified by #18454.

The root cause is that single messages will still be passed into the isDuplicated method in receiveIndividualMessagesFromBatch. However, in this case, the MessageId is a BatchedMessageIdImpl, while the MessageId in lastCumulativeAck or pendingIndividualAcks are MessageIdImpl implementations.

Modifications

Validate the class type in isDuplicated and convert a BatchedMessageIdImpl to MessageIdImpl. Then revert the unnecessary changes in #18454.

ConsumerRedeliveryTest#testAckNotSent is added to verify it works.

TODO

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#8

### Motivation

apache#18454 fixed the potential message
loss when a batched message is redelivered and one single message of the
batch is added to the ACK tracker. However, it also leads to a
potential message duplication, see the `testConsumerDedup` test modified
by apache#18454.

The root cause is that single messages will still be passed into the
`isDuplicated` method in `receiveIndividualMessagesFromBatch`. However,
in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the
`MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are
`MessageIdImpl` implementations.

### Modifications

Validate the class type in `isDuplicated` and convert a
`BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary
changes in apache#18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

### TODO

The duplication could still happen when batch index ACK is enabled.
Because even after the ACK tracker is flushed, if only parts of a
batched message are not acknowledged, the whole batched message would
still be redelivered. I will open another PR to fix it.
codelipenghui
codelipenghui previously approved these changes Nov 16, 2022
@codelipenghui codelipenghui added this to the 2.12.0 milestone Nov 16, 2022
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@codecov-commenter
Copy link

codecov-commenter commented Nov 16, 2022

Codecov Report

Merging #18486 (c48ab86) into master (aeb4503) will increase coverage by 16.86%.
The diff coverage is 62.26%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #18486       +/-   ##
=============================================
+ Coverage     31.39%   48.26%   +16.86%     
- Complexity     6651     9447     +2796     
=============================================
  Files           697      618       -79     
  Lines         68015    58558     -9457     
  Branches       7285     6091     -1194     
=============================================
+ Hits          21353    28262     +6909     
+ Misses        43667    27278    -16389     
- Partials       2995     3018       +23     
Flag Coverage Δ
unittests 48.26% <62.26%> (+16.86%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ersistentStreamingDispatcherMultipleConsumers.java 0.00% <0.00%> (ø)
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 22.98% <0.00%> (+0.71%) ⬆️
...ersistentStickyKeyDispatcherMultipleConsumers.java 58.91% <50.00%> (+20.44%) ⬆️
...sistent/PersistentDispatcherMultipleConsumers.java 58.13% <75.00%> (+18.90%) ⬆️
...impl/PersistentAcknowledgmentsGroupingTracker.java 58.89% <77.27%> (+1.16%) ⬆️
...pulsar/broker/admin/impl/PersistentTopicsBase.java 59.22% <100.00%> (+43.45%) ⬆️
...lsar/client/impl/conf/ClientConfigurationData.java 96.66% <100.00%> (+0.11%) ⬆️
...apache/pulsar/client/impl/AutoClusterFailover.java 75.55% <0.00%> (-0.56%) ⬇️
...va/org/apache/pulsar/client/impl/ConsumerBase.java 21.93% <0.00%> (-0.02%) ⬇️
.../main/java/org/apache/pulsar/PulsarStandalone.java 0.00% <0.00%> (ø)
... and 252 more

@BewareMyPower BewareMyPower dismissed codelipenghui’s stale review November 16, 2022 03:06

there are new changes

Copy link
Contributor

@congbobo184 congbobo184 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! should we add a test
5 messages in one batch and ackIndividual the 3 messages, redeliver, only can receive the last 2 messages in this batch and enableBatchIndex

{ 3, 5, CommandAck.AckType.Individual },
{ 5, 5, CommandAck.AckType.Individual },
{ 3, 5, CommandAck.AckType.Cumulative },
{ 5, 5, CommandAck.AckType.Cumulative }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5, 5 test case may not stable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I will take a look soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fixed now. The root cause is that after b0fad40, a BatchedMessageIdImpl will be compared with a MessageIdImpl, which should be forbidden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@congbobo184 PTAL again.

Copy link
Contributor

@liangyepianzhou liangyepianzhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good work, and the root cause of the flaky test should be this.

@BewareMyPower
Copy link
Contributor Author

5 messages in one batch and ackIndividual the 3 messages, redeliver, only can receive the last 2 messages in this batch and enableBatchIndex

@congbobo184 See the TODO in my PR description. I think it's a bug for batch index ACK and it needs some extra work to fix.

@congbobo184 congbobo184 merged commit be1d07e into apache:master Nov 21, 2022
@congbobo184
Copy link
Contributor

5 messages in one batch and ackIndividual the 3 messages, redeliver, only can receive the last 2 messages in this batch and enableBatchIndex

@congbobo184 See the TODO in my PR description. I think it's a bug for batch index ACK and it needs some extra work to fix.

sorry, I miss your description. doesn't the batch index ack fix should include in release/2.9.4?

congbobo184 pushed a commit that referenced this pull request Nov 21, 2022
…#18486)

#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8

(cherry picked from commit be1d07e)
@congbobo184 congbobo184 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Nov 21, 2022
@BewareMyPower
Copy link
Contributor Author

doesn't the batch index ack fix should include in release/2.9.4?

I think not. It's not a regression. It's a bug from the beginning when batch index ACK was introduced.

liangyepianzhou added a commit that referenced this pull request Nov 24, 2022
## Motivation
1. fix flaky test #18466 caused by txn async send method
2. decrease run time by optimizing receive method 
## Modification
1. fix flaky test
   * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` 
This also can be resolved by #17836 and #18486 later.
2. decrease run time by optimizing receive method 
    * modify
 `    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                   Assert.assertNull(message);` to
                   `  Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
                            Assert.assertNull(message);`
   * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);`
   * keep other `consumer.receive(x, y)` no change.
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-duplicated-individual-ack branch November 24, 2022 07:02
@Technoboy- Technoboy- changed the title [fix] Avoid redelivering duplicated messages when batching is enabled [fix][client] Avoid redelivering duplicated messages when batching is enabled Nov 28, 2022
liangyepianzhou pushed a commit that referenced this pull request Dec 5, 2022
…#18486)

#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8

(cherry picked from commit be1d07e)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Dec 6, 2022
…apache#18486)

apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8

(cherry picked from commit be1d07e)
(cherry picked from commit 870b060)
congbobo184 pushed a commit that referenced this pull request Dec 8, 2022
…#18486)

#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8

(cherry picked from commit be1d07e)
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Dec 9, 2022
…apache#18486)

### Motivation

apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

### Modifications

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

### TODO

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: BewareMyPower#8
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Dec 9, 2022
## Motivation
1. fix flaky test apache#18466 caused by txn async send method
2. decrease run time by optimizing receive method 
## Modification
1. fix flaky test
   * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` 
This also can be resolved by apache#17836 and apache#18486 later.
2. decrease run time by optimizing receive method 
    * modify
 `    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                   Assert.assertNull(message);` to
                   `  Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
                            Assert.assertNull(message);`
   * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);`
   * keep other `consumer.receive(x, y)` no change.
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Jan 10, 2023
…apache#18486)

### Motivation

apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

### Modifications

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

### TODO

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: BewareMyPower#8
lifepuzzlefun pushed a commit to lifepuzzlefun/pulsar that referenced this pull request Jan 10, 2023
## Motivation
1. fix flaky test apache#18466 caused by txn async send method
2. decrease run time by optimizing receive method 
## Modification
1. fix flaky test
   * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` 
This also can be resolved by apache#17836 and apache#18486 later.
2. decrease run time by optimizing receive method 
    * modify
 `    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                   Assert.assertNull(message);` to
                   `  Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
                            Assert.assertNull(message);`
   * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);`
   * keep other `consumer.receive(x, y)` no change.
Technoboy- pushed a commit that referenced this pull request Feb 8, 2023
…#18486)

#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants