[fix][broker]Fix chunked messages will be filtered by duplicating #20948
### Motivation

Chunked messages use the same metadata, so all the chunks of a single message use the same sequence ID. As a result, they are recorded as duplicated messages.

```
private long updateMessageMetadataSequenceId(final MessageMetadata msgMetadata) {
    final long sequenceId;
    if (!msgMetadata.hasSequenceId()) {
        sequenceId = msgIdGenerator++;
        msgMetadata.setSequenceId(sequenceId);
    } else {
        sequenceId = msgMetadata.getSequenceId();
    }
    return sequenceId;
}
```

### Modification

Use a different sequence ID for each chunk message.
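To make the failure mode concrete, here is a minimal sketch of a sequence-ID based duplicate check. It is a simplified stand-in, not Pulsar's actual `MessageDeduplication` class; the class name `NaiveSequenceIdDedup` and the method `accept` are hypothetical and exist only for this illustration. It assumes the check tracks the highest sequence ID per producer and rejects anything at or below it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a broker-side duplicate check.
// It is NOT Pulsar's MessageDeduplication implementation.
class NaiveSequenceIdDedup {
    // Highest sequence ID accepted so far, keyed by producer name.
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    /** Returns true if the entry is accepted, false if it is treated as a duplicate. */
    boolean accept(String producerName, long sequenceId) {
        Long last = highestSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // this sequence ID (or a higher one) was already recorded
        }
        highestSequenceId.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        NaiveSequenceIdDedup dedup = new NaiveSequenceIdDedup();
        // One large message split into three chunks; every chunk reuses sequence ID 0.
        for (int chunkId = 0; chunkId < 3; chunkId++) {
            boolean accepted = dedup.accept("producer-1", 0L);
            System.out.println("chunk " + chunkId + " accepted=" + accepted);
        }
    }
}
```

Under these assumptions only the first chunk is accepted; the remaining chunks carry the same sequence ID and are dropped as duplicates, which is the behavior this PR sets out to fix.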
@rdhabalia Please review this change. I think we missed discussing compatibility with the deduplication feature when introducing the chunking feature in the PIP (https://github.com/apache/pulsar/wiki/PIP-37:-Large-message-size-handling-in-Pulsar).
Does this PR require review? Have you raised a PIP for another solution?
Please create another test class like `MessageChunkingDeduplicationTest` for it, for the following reasons:

- `MessageChunkingSharedTest` was added to test chunking with Shared subscriptions. It's bad to add unrelated tests into this class.
- You won't need to set `conf.setBrokerDeduplicationEnabled(true)` and restart the broker for each test method.
…0948)

Make the chunk message function work properly when deduplication is enabled.

For example:

```markdown
Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2
Chunk-2 sequence ID: 0, chunk ID: 1
Chunk-3 sequence ID: 1, chunk ID: 0, total chunk: 3
Chunk-4 sequence ID: 1, chunk ID: 1
Chunk-5 sequence ID: 1, chunk ID: 1
Chunk-6 sequence ID: 1, chunk ID: 2
```

Only check and store the sequence ID of Chunk-2 and Chunk-6.

**Add a property in the publishContext to determine whether this chunk is the last chunk when persistence completes.**

```java
publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
```

For example:

```markdown
Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
```

We should filter and ack chunk-4 and chunk-5.

(cherry picked from commit b0b13bc)
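The broker-side rule in this commit message (only check and store the sequence ID of the last chunk) could look roughly like the sketch below. It is an illustration under stated assumptions, not the code merged in this PR: `LastChunkDedup`, `accept`, and `storedSequenceId` are hypothetical names, and a non-chunked message is modeled as a single chunk with `totalChunks == 1`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "check and store the sequence ID only for the last chunk".
// Not the actual MessageDeduplication code from the PR.
class LastChunkDedup {
    // Highest sequence ID recorded so far, keyed by producer name.
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    /** Returns true if the chunk should be persisted, false if it is a duplicate. */
    boolean accept(String producerName, long sequenceId, int chunkId, int totalChunks) {
        boolean isLastChunk = chunkId == totalChunks - 1;
        if (!isLastChunk) {
            // Intermediate chunk: neither checked against nor recorded in the dedup state.
            return true;
        }
        Long last = highestSequenceId.get(producerName);
        if (last != null && sequenceId <= last) {
            return false; // the whole chunked message was already published
        }
        highestSequenceId.put(producerName, sequenceId);
        return true;
    }

    /** Sequence ID currently recorded for the producer, or null if none. */
    Long storedSequenceId(String producerName) {
        return highestSequenceId.get(producerName);
    }

    public static void main(String[] args) {
        LastChunkDedup dedup = new LastChunkDedup();
        // {sequenceId, chunkId, totalChunks} for Chunk-1 .. Chunk-6 of the first example.
        long[][] chunks = {{0, 0, 2}, {0, 1, 2}, {1, 0, 3}, {1, 1, 3}, {1, 1, 3}, {1, 2, 3}};
        for (int i = 0; i < chunks.length; i++) {
            boolean ok = dedup.accept("producer-1", chunks[i][0], (int) chunks[i][1], (int) chunks[i][2]);
            System.out.println("Chunk-" + (i + 1) + " accepted=" + ok
                    + " stored=" + dedup.storedSequenceId("producer-1"));
        }
    }
}
```

Replaying the six chunks from the first example accepts all of them and updates the stored sequence ID only after Chunk-2 and Chunk-6. The repeated Chunk-5 passes through the broker in this sketch, so it still has to be filtered on the consumer side.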
…0948)

## Motivation

Make the chunk message function work properly when deduplication is enabled.

## Modification

### Only check and store the sequence ID of the last chunk in a chunk message.

For example:

```markdown
Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2
Chunk-2 sequence ID: 0, chunk ID: 1
Chunk-3 sequence ID: 1, chunk ID: 0, total chunk: 3
Chunk-4 sequence ID: 1, chunk ID: 1
Chunk-5 sequence ID: 1, chunk ID: 1
Chunk-6 sequence ID: 1, chunk ID: 2
```

Only check and store the sequence ID of Chunk-2 and Chunk-6.

**Add a property in the publishContext to determine whether this chunk is the last chunk when persistence completes.**

```java
publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
```

### Filter and ack duplicated chunks in a chunk message instead of discarding ctx.

For example:

```markdown
Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
```

We should filter and ack chunk-4 and chunk-5.
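The consumer-side behavior (filter and ack duplicated chunks rather than discarding the chunk context) can be sketched as follows. This is a simplified model, not the `ConsumerImpl` change itself: `ChunkAssembler`, `onChunk`, and `ackedDuplicates` are hypothetical names, message IDs are plain strings, payloads are short strings, and a gap in chunk IDs simply throws, whereas a real consumer would need its own recovery path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of consumer-side chunk reassembly that tolerates repeated chunks.
// Not the actual ConsumerImpl logic from the PR.
class ChunkAssembler {
    private final int totalChunks;
    private int lastChunkId = -1; // ID of the last chunk appended to the buffer
    private final StringBuilder payload = new StringBuilder();
    private final List<String> ackedDuplicates = new ArrayList<>();

    ChunkAssembler(int totalChunks) {
        this.totalChunks = totalChunks;
    }

    /** Returns the assembled payload once the last chunk arrives, otherwise null. */
    String onChunk(int chunkId, String msgId, String data) {
        if (chunkId <= lastChunkId) {
            // Duplicated chunk: ack it and keep the partially assembled message.
            ackedDuplicates.add(msgId);
            return null;
        }
        if (chunkId != lastChunkId + 1) {
            // Simplification for this sketch only: a gap is treated as an error.
            throw new IllegalStateException("missing chunk before chunk ID " + chunkId);
        }
        payload.append(data);
        lastChunkId = chunkId;
        return chunkId == totalChunks - 1 ? payload.toString() : null;
    }

    /** Message IDs of the chunks that were filtered out and acknowledged. */
    List<String> ackedDuplicates() {
        return ackedDuplicates;
    }

    public static void main(String[] args) {
        ChunkAssembler assembler = new ChunkAssembler(4);
        int[] chunkIds = {0, 1, 2, 1, 2, 3}; // Chunk-1 .. Chunk-6 of the second example
        String[] msgIds = {"1:1", "1:2", "1:3", "1:4", "1:5", "1:6"};
        String[] data = {"a", "b", "c", "b", "c", "d"};
        String result = null;
        for (int i = 0; i < chunkIds.length; i++) {
            result = assembler.onChunk(chunkIds[i], msgIds[i], data[i]);
        }
        System.out.println("payload=" + result + " acked=" + assembler.ackedDuplicates());
    }
}
```

With the chunk sequence from the second example, the repeated chunks at `1:4` and `1:5` are acknowledged and skipped, and the message is completed when chunk ID 3 arrives.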