diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index d1b4b74945f8e..238dc740509b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -56,6 +56,8 @@ public class MessageDeduplication {
     private final ManagedLedger managedLedger;
     private ManagedCursor managedCursor;

+    private static final String IS_LAST_CHUNK = "isLastChunk";
+
     enum Status {
         // Deduplication is initialized

@@ -328,11 +330,12 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
         long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
+        MessageMetadata md = null;
         if (producerName.startsWith(replicatorPrefix)) {
             // Message is coming from replication, we need to use the original producer name and sequence id
             // for the purpose of deduplication and not rely on the "replicator" name.
             int readerIndex = headersAndPayload.readerIndex();
-            MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
+            md = Commands.parseMessageMetadata(headersAndPayload);
             producerName = md.getProducerName();
             sequenceId = md.getSequenceId();
             highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
@@ -341,7 +344,23 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
             publishContext.setOriginalHighestSequenceId(highestSequenceId);
             headersAndPayload.readerIndex(readerIndex);
         }
-
+        long chunkID = -1;
+        long totalChunk = -1;
+        if (publishContext.isChunked()) {
+            if (md == null) {
+                int readerIndex = headersAndPayload.readerIndex();
+                md = Commands.parseMessageMetadata(headersAndPayload);
+                headersAndPayload.readerIndex(readerIndex);
+            }
+            chunkID = md.getChunkId();
+            totalChunk = md.getNumChunksFromMsg();
+        }
+        // All chunks of a message carry the same message metadata and sequence ID, so only the
+        // sequence ID of the last chunk of a chunked message needs to be checked for duplication.
+        if (chunkID != -1 && chunkID != totalChunk - 1) {
+            publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
+            return MessageDupStatus.NotDup;
+        }
         // Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
         // disconnects and re-connects very quickly. At that point the call can be coming from a different thread
         synchronized (highestSequencedPushed) {
@@ -367,6 +386,11 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
             }
             highestSequencedPushed.put(producerName, highestSequenceId);
         }
+        // Only update highestSequencedPushed and highestSequencedPersisted once the last chunk
+        // of a chunked message has been received and persisted.
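+        // Mark the context so recordMessagePersisted() knows whether this entry completes the chunked message.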
+        if (chunkID != -1 && chunkID == totalChunk - 1) {
+            publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
+        }
         return MessageDupStatus.NotDup;
     }

@@ -387,8 +411,10 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p
             sequenceId = publishContext.getOriginalSequenceId();
             highestSequenceId = publishContext.getOriginalHighestSequenceId();
         }
-
-        highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
+        Boolean isLastChunk = (Boolean) publishContext.getProperty(IS_LAST_CHUNK);
+        if (isLastChunk == null || isLastChunk) {
+            highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
+        }
         if (++snapshotCounter >= snapshotInterval) {
             snapshotCounter = 0;
             takeSnapshot(position);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
new file mode 100644
index 0000000000000..5e590414132a5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.client.impl.MessageChunkingSharedTest.sendChunk;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessageChunkingDeduplicationTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setBrokerDeduplicationEnabled(true);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendChunkMessageWithSameSequenceID() throws Exception {
+        String topicName = "persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID";
+        String producerName = "test-producer";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test-sub")
+                .topic(topicName)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING)
+                .producerName(producerName)
+                .topic(topicName)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+        int messageSize = 6000; // payload size in KB
+        String message = "a".repeat(messageSize * 1000);
+        producer.newMessage().value(message).sequenceId(10).send();
+        Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertTrue(msg.getMessageId() instanceof ChunkMessageIdImpl);
+        assertEquals(msg.getValue(), message);
+        producer.newMessage().value(message).sequenceId(10).send(); // resend with the same sequence ID
+        msg = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(msg);
+    }
+
+    @Test
+    public void testDeduplicateChunksInSingleChunkMessages() throws Exception {
+        String topicName = "persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage";
+        String producerName = "test-producer";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test-sub")
+                .topic(topicName)
+                .subscribe();
+        final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(topicName).get().orElse(null);
+        assertNotNull(persistentTopic);
+        sendChunk(persistentTopic, producerName, 1, 0, 2);
+        sendChunk(persistentTopic, producerName, 1, 1, 2);
+        sendChunk(persistentTopic, producerName, 1, 1, 2); // duplicated last chunk
+
+        Message<String> message = consumer.receive(15, TimeUnit.SECONDS);
+        assertEquals(message.getData().length, 2);
+
+        sendChunk(persistentTopic, producerName, 2, 0, 3);
+        sendChunk(persistentTopic, producerName, 2, 1, 3);
+        sendChunk(persistentTopic, producerName, 2, 1, 3); // duplicated middle chunk
+        sendChunk(persistentTopic, producerName, 2, 2, 3);
+        message = consumer.receive(20, TimeUnit.SECONDS);
+        assertEquals(message.getData().length, 3);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 163c42d835b35..3d24d3746d66a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -23,6 +23,7 @@
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +35,7 @@
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -45,7 +47,6 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
@@ -217,7 +218,7 @@ private static void sendNonChunk(final PersistentTopic persistentTopic,
         sendChunk(persistentTopic, producerName, sequenceId, null, null);
     }

-    private static void sendChunk(final PersistentTopic persistentTopic,
+    protected static void sendChunk(final PersistentTopic persistentTopic,
                                   final String producerName,
                                   final long sequenceId,
                                   final Integer chunkId,
@@ -233,16 +234,33 @@ private static void sendChunk(final PersistentTopic persistentTopic,
             metadata.setTotalChunkMsgSize(numChunks);
         }
         final ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
-                PulsarByteBufAllocator.DEFAULT.buffer(1));
-        persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
-            String name = producerName + "-" + sequenceId;
-            if (chunkId != null) {
-                name += "-" + chunkId + "-" + numChunks;
+                Unpooled.wrappedBuffer("a".getBytes()));
+        persistentTopic.publishMessage(buf, new Topic.PublishContext() {
+            @Override
+            public boolean isChunked() {
+                return chunkId != null;
             }
-            if (e == null) {
-                log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
-            } else {
-                log.error("Failed to send {}: {}", name, e.getMessage());
+
+            @Override
+            public String getProducerName() {
+                return producerName;
+            }
+
+            public long getSequenceId() {
+                return sequenceId;
+            }
+
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                String name = producerName + "-" + sequenceId;
+                if (chunkId != null) {
+                    name += "-" + chunkId + "-" + numChunks;
+                }
+                if (e == null) {
+                    log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
+                } else {
+                    log.error("Failed to send {}: {}", name, e.getMessage());
+                }
             }
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6a3b894364a36..be7c094318434 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1459,6 +1460,30 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
+            // Filter out and ack duplicated chunks instead of discarding the whole chunked message context.
+            // For example:
+            // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+            // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+            // Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+            // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+            // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+            // Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+            // Chunk-4 and chunk-5 should be filtered out and acked here.
+            if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+                log.warn("[{}] Received a duplicated chunk message with messageId [{}], last-chunk-Id [{}], "
+                                + "chunkId [{}], sequenceId [{}]",
+                        msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId,
+                        msgMetadata.getChunkId(), msgMetadata.getSequenceId());
+                compressedPayload.release();
+                increaseAvailablePermits(cnx);
+                boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
+                                && messageId1.entryId == messageId.getEntryId());
+                if (!repeatedlyReceived) {
+                    doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
+                }
+                return null;
+            }
             // means we lost the first chunk: should never happen
             log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
                     (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());