
[fix][broker]Fix chunked messages will be filtered by duplicating #20948

Merged on Aug 31, 2023 (25 commits)

Commits:
52ae60f
[fix][broker]Fix chunked messages will be filtered by duplicating
liangyepianzhou Aug 7, 2023
30c9430
optimize
liangyepianzhou Aug 10, 2023
02d7071
optimize
liangyepianzhou Aug 10, 2023
d99ed6d
handle in broker side
liangyepianzhou Aug 11, 2023
be88b7b
broker reload and message resend
liangyepianzhou Aug 11, 2023
1c8eec3
Only check sequence ID
liangyepianzhou Aug 12, 2023
ab69604
filter duplicated message at consumer side
liangyepianzhou Aug 18, 2023
3be7bf0
add test
liangyepianzhou Aug 22, 2023
2b05a5c
address some comment
liangyepianzhou Aug 23, 2023
87a2513
address some comment
liangyepianzhou Aug 23, 2023
7af1053
address some comment
liangyepianzhou Aug 23, 2023
c4ef26f
address some comment
liangyepianzhou Aug 23, 2023
fb1a55c
Revert "address some comment"
liangyepianzhou Aug 24, 2023
d9af22f
ack duplicated chunk
liangyepianzhou Aug 24, 2023
8655723
rollback the logic of `chunkedMsgCtx == null` and add note.
liangyepianzhou Aug 25, 2023
d7f9290
fix test
liangyepianzhou Aug 25, 2023
f282ac0
optimize uuid
liangyepianzhou Aug 26, 2023
1f8263d
optimize uuid and checkstyle
liangyepianzhou Aug 28, 2023
4f789ee
check the last chunk ID
liangyepianzhou Aug 28, 2023
86c3f77
Merge remote-tracking branch 'apache/master' into deplicate_chunk
liangyepianzhou Aug 30, 2023
2aaac45
only store the sequence ID of the last chunk ID
liangyepianzhou Aug 30, 2023
8579c37
Avoid flaky testing
liangyepianzhou Aug 30, 2023
3e2267c
Make comments indent aligned.
BewareMyPower Aug 30, 2023
7c813e9
Add a new test class
liangyepianzhou Aug 30, 2023
233c5cc
address some comments
liangyepianzhou Aug 30, 2023
@@ -56,6 +56,8 @@ public class MessageDeduplication {
private final ManagedLedger managedLedger;
private ManagedCursor managedCursor;

private static final String IS_LAST_CHUNK = "isLastChunk";

enum Status {

// Deduplication is initialized
@@ -328,11 +330,12 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
MessageMetadata md = null;
if (producerName.startsWith(replicatorPrefix)) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
md = Commands.parseMessageMetadata(headersAndPayload);
producerName = md.getProducerName();
sequenceId = md.getSequenceId();
highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
@@ -341,7 +344,23 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
publishContext.setOriginalHighestSequenceId(highestSequenceId);
headersAndPayload.readerIndex(readerIndex);
}

long chunkID = -1;
long totalChunk = -1;
if (publishContext.isChunked()) {
if (md == null) {
int readerIndex = headersAndPayload.readerIndex();
md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
}
chunkID = md.getChunkId();
totalChunk = md.getNumChunksFromMsg();
}
// All chunks of a message use the same message metadata and sequence ID,
// so we only need to check the sequence ID for the last chunk in a chunk message.
if (chunkID != -1 && chunkID != totalChunk - 1) {
publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
return MessageDupStatus.NotDup;
}
// Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
// disconnects and re-connects very quickly. At that point the call can be coming from a different thread
synchronized (highestSequencedPushed) {
@@ -367,6 +386,11 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
}
highestSequencedPushed.put(producerName, highestSequenceId);
}
// Only put the sequence ID into highestSequencedPushed and
// highestSequencedPersisted after the last chunk is received and persisted.
if (chunkID != -1 && chunkID == totalChunk - 1) {
publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
}
return MessageDupStatus.NotDup;
}

@@ -387,8 +411,10 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p
sequenceId = publishContext.getOriginalSequenceId();
highestSequenceId = publishContext.getOriginalHighestSequenceId();
}

highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
Boolean isLastChunk = (Boolean) publishContext.getProperty(IS_LAST_CHUNK);
if (isLastChunk == null || isLastChunk) {
highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
}
if (++snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
takeSnapshot(position);
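The broker-side change above can be summarized by a small standalone model. This is a sketch only: the class, enum, and method names below are invented for illustration, and it omits the `IS_LAST_CHUNK` publish-context property. The key idea is that only the last chunk of a chunked message participates in the per-producer sequence-ID check, so a resent message is caught when its final chunk arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of last-chunk-only deduplication: intermediate chunks bypass the
// sequence-ID check; only the last chunk is checked against (and recorded in)
// the per-producer highest-pushed sequence ID.
public class ChunkDedupSketch {
    enum Decision { NOT_DUP, DUP }

    private final Map<String, Long> highestSequencedPushed = new HashMap<>();

    Decision isDuplicate(String producerName, long sequenceId, int chunkId, int numChunks) {
        boolean isChunked = numChunks > 1;
        if (isChunked && chunkId != numChunks - 1) {
            // Intermediate chunk: all chunks share one sequence ID, so defer
            // the duplicate check until the last chunk.
            return Decision.NOT_DUP;
        }
        Long highest = highestSequencedPushed.get(producerName);
        if (highest != null && sequenceId <= highest) {
            return Decision.DUP;
        }
        highestSequencedPushed.put(producerName, sequenceId);
        return Decision.NOT_DUP;
    }

    public static void main(String[] args) {
        ChunkDedupSketch d = new ChunkDedupSketch();
        // Message with sequence ID 10 split into 3 chunks.
        assert d.isDuplicate("p1", 10, 0, 3) == Decision.NOT_DUP;
        assert d.isDuplicate("p1", 10, 1, 3) == Decision.NOT_DUP;
        assert d.isDuplicate("p1", 10, 2, 3) == Decision.NOT_DUP; // last chunk records seq 10
        // Resend of the same message: intermediate chunks still pass through,
        // but the last chunk is rejected as a duplicate.
        assert d.isDuplicate("p1", 10, 0, 3) == Decision.NOT_DUP;
        assert d.isDuplicate("p1", 10, 2, 3) == Decision.DUP;
        System.out.println("ok");
    }
}
```

Note that under this scheme the intermediate chunks of a resent message are still persisted; the consumer-side filtering in the later diff is what discards them.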
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.client.impl.MessageChunkingSharedTest.sendChunk;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class MessageChunkingDeduplicationTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
this.conf.setBrokerDeduplicationEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSendChunkMessageWithSameSequenceID() throws Exception {
String topicName = "persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID";
String producerName = "test-producer";
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.subscriptionName("test-sub")
.topic(topicName)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.producerName(producerName)
.topic(topicName)
.enableChunking(true)
.enableBatching(false)
.create();
int messageSize = 6000; // payload size in KB (6 MB total, large enough to force chunking)
String message = "a".repeat(messageSize * 1000);
producer.newMessage().value(message).sequenceId(10).send();
Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(msg);
assertTrue(msg.getMessageId() instanceof ChunkMessageIdImpl);
assertEquals(msg.getValue(), message);
producer.newMessage().value(message).sequenceId(10).send();
msg = consumer.receive(3, TimeUnit.SECONDS);
assertNull(msg);
}

@Test
public void testDeduplicateChunksInSingleChunkMessages() throws Exception {
String topicName = "persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage";
String producerName = "test-producer";
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.subscriptionName("test-sub")
.topic(topicName)
.subscribe();
final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicIfExists(topicName).get().orElse(null);
assertNotNull(persistentTopic);
sendChunk(persistentTopic, producerName, 1, 0, 2);
sendChunk(persistentTopic, producerName, 1, 1, 2);
sendChunk(persistentTopic, producerName, 1, 1, 2);

Message<String> message = consumer.receive(15, TimeUnit.SECONDS);
assertEquals(message.getData().length, 2);

sendChunk(persistentTopic, producerName, 2, 0, 3);
sendChunk(persistentTopic, producerName, 2, 1, 3);
sendChunk(persistentTopic, producerName, 2, 1, 3);
sendChunk(persistentTopic, producerName, 2, 2, 3);
message = consumer.receive(20, TimeUnit.SECONDS);
assertEquals(message.getData().length, 3);
}
}
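For context on why the 6 MB payload in the test above gets chunked: the chunk count is roughly the ceiling of the payload size over the broker's `maxMessageSize`. The helper below is illustrative, not Pulsar API, and assumes the 5 MB default `maxMessageSize` (brokers can override it, and real chunking also accounts for metadata overhead).

```java
// Rough sketch: number of chunks = ceil(payloadBytes / maxMessageBytes).
public class ChunkCountSketch {
    static int numChunks(long payloadBytes, long maxMessageBytes) {
        // Ceiling division without floating point.
        return (int) ((payloadBytes + maxMessageBytes - 1) / maxMessageBytes);
    }

    public static void main(String[] args) {
        long maxMessageBytes = 5L * 1024 * 1024; // assumed broker default
        long payloadBytes = 6000L * 1000;        // the 6 MB payload from the test above
        System.out.println(numChunks(payloadBytes, maxMessageBytes)); // 2 chunks
    }
}
```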
@@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -34,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -45,7 +47,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
@@ -217,7 +218,7 @@ private static void sendNonChunk(final PersistentTopic persistentTopic,
sendChunk(persistentTopic, producerName, sequenceId, null, null);
}

private static void sendChunk(final PersistentTopic persistentTopic,
protected static void sendChunk(final PersistentTopic persistentTopic,
final String producerName,
final long sequenceId,
final Integer chunkId,
@@ -233,16 +234,33 @@ private static void sendChunk(final PersistentTopic persistentTopic,
metadata.setTotalChunkMsgSize(numChunks);
}
final ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
PulsarByteBufAllocator.DEFAULT.buffer(1));
persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
String name = producerName + "-" + sequenceId;
if (chunkId != null) {
name += "-" + chunkId + "-" + numChunks;
Unpooled.wrappedBuffer("a".getBytes()));
persistentTopic.publishMessage(buf, new Topic.PublishContext() {
@Override
public boolean isChunked() {
return chunkId != null;
}
if (e == null) {
log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
} else {
log.error("Failed to send {}: {}", name, e.getMessage());

@Override
public String getProducerName() {
return producerName;
}

public long getSequenceId() {
return sequenceId;
}

@Override
public void completed(Exception e, long ledgerId, long entryId) {
String name = producerName + "-" + sequenceId;
if (chunkId != null) {
name += "-" + chunkId + "-" + numChunks;
}
if (e == null) {
log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
} else {
log.error("Failed to send {}: {}", name, e.getMessage());
}
}
});
}
@@ -35,6 +35,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
@@ -1459,6 +1460,30 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
Contributor:
Should we handle chunkedMsgCtx == null in a separated block?

There are actually two cases (assuming the message is split into 2 chunks):

  1. Consumer received chunk-1 before chunk-0. In this case, chunkedMsgCtx is null.
  2. Consumer has received chunk-0, then it received chunk-0 again. In this case, chunkedMsgCtx is not null and msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId.

The 1st case should be an unexpected case because message disordering happens. The 2nd case might be expected (I'm not sure) so that consumer should filter duplicated messages.

/cc @liangyepianzhou @poorbarcode

Contributor (author):
There is another case:

  • The consumer received chunk-0 and chunk-1, then received chunk-1 again.

liangyepianzhou (author), Aug 25, 2023:
The case of chunkedMsgCtx == null:
Case 1:

chunk-0 sequence ID: 1, chunk ID: 1
chunk-1 sequence ID: 1, chunk ID: 0
chunk-2 sequence ID: 1, chunk ID: 1

Case 2:

chunk-0 sequence ID: 1, chunk ID: 1
chunk-1 sequence ID: 1, chunk ID: 0

Case 3:

chunk-0 sequence ID: 1, chunk ID: 0
chunk-1 sequence ID: 1, chunk ID: 1
chunk-2 sequence ID: 1, chunk ID: 1

I originally caught chunkedMsgCtx == null together with msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId when I encountered Case 3.
However, after reminders and deliberation, I think all three cases can be handled by the original logic.

/cc @BewareMyPower @poorbarcode

Contributor (author):
Consumer has received chunk-0, then it received chunk-0 again. In this case, chunkedMsgCtx is not null and msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId.
The 2nd case might be expected (I'm not sure) so that consumer should filter duplicated messages.

This PR does not deduplicate the chunks within a single chunked message on the broker side, so the 2nd case is expected. Proposal 295 adds a patch to deduplicate chunks on the broker side; after that, the 2nd case will become unexpected.

|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
// Filter and ack duplicated chunks instead of discard ctx.
// For example:
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
// Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
// Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
// We should filter and ack chunk-4 and chunk-5.
if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
log.warn("[{}] Receive a duplicated chunk message with messageId [{}], last-chunk-Id [{}], "
+ "chunkId [{}], sequenceId [{}]",
msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId,
msgMetadata.getChunkId(), msgMetadata.getSequenceId());
compressedPayload.release();
increaseAvailablePermits(cnx);
boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
.anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
&& messageId1.entryId == messageId.getEntryId());
if (!repeatedlyReceived) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
}
return null;
}
// means we lost the first chunk: should never happen
log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
(chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
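The consumer-side filtering discussed in this thread can be modeled as a tiny classifier over incoming chunk IDs for one sequence ID. This is a simplified sketch: the names are invented, and the real code also checks the chunk's (ledgerId, entryId) against already-recorded message IDs before acking a duplicate.

```java
// Sketch of the out-of-order / duplicate chunk handling: each incoming chunk
// is either appended to the context, acked as a duplicate, or causes the
// whole context to be discarded (lost first chunk or a gap in chunk IDs).
public class ChunkFilterSketch {
    enum Action { APPEND, ACK_DUPLICATE, DISCARD_CONTEXT }

    private Integer lastChunkedMessageId = null; // null means no context yet

    Action onChunk(int chunkId) {
        if (lastChunkedMessageId == null) {
            if (chunkId != 0) {
                // Lost the first chunk: message disordering, should never happen.
                return Action.DISCARD_CONTEXT;
            }
            lastChunkedMessageId = 0;
            return Action.APPEND;
        }
        if (chunkId <= lastChunkedMessageId) {
            // Already-received chunk: filter and ack instead of discarding the ctx.
            return Action.ACK_DUPLICATE;
        }
        if (chunkId != lastChunkedMessageId + 1) {
            // Gap in the chunk sequence: drop the context and start over.
            lastChunkedMessageId = null;
            return Action.DISCARD_CONTEXT;
        }
        lastChunkedMessageId = chunkId;
        return Action.APPEND;
    }

    public static void main(String[] args) {
        ChunkFilterSketch f = new ChunkFilterSketch();
        // The chunk-4/chunk-5 example from the code comment above:
        // chunk IDs arrive as 0, 1, 2, 1, 2, 3 for the same sequence ID.
        assert f.onChunk(0) == Action.APPEND;
        assert f.onChunk(1) == Action.APPEND;
        assert f.onChunk(2) == Action.APPEND;
        assert f.onChunk(1) == Action.ACK_DUPLICATE; // chunk-4
        assert f.onChunk(2) == Action.ACK_DUPLICATE; // chunk-5
        assert f.onChunk(3) == Action.APPEND;        // chunk-6
        System.out.println("ok");
    }
}
```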