Skip to content

Commit

Permalink
[fix][broker]Fix chunked messages will be filtered by duplicating (#2…
Browse files Browse the repository at this point in the history
…0948)

## Motivation
Make the chunk message function work properly when deduplication is enabled.
## Modification
### Only check and store the sequence ID of the last chunk in a chunk message.
 For example:
 ```markdown
     Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2
     Chunk-2 sequence ID: 0, chunk ID: 1
     Chunk-3 sequence ID: 1, chunk ID: 0 total chunk: 3
     Chunk-4 sequence ID: 1, chunk ID: 1
     Chunk-5 sequence ID: 1, chunk ID: 1
     Chunk-6 sequence ID: 1, chunk ID: 2
```   
Only store check and store the sequence ID of Chunk-2 and Chunk-6.
**Add a property in the publishContext to determine whether this chunk is the last chunk when persistent completely.**
```java
publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
```
### Filter and ack duplicated chunks in a chunk message instead of discarding ctx.
 For example:
 ```markdown
     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
```   
We should filter and ack chunk-4 and chunk-5.
  • Loading branch information
liangyepianzhou authored and Technoboy- committed Sep 5, 2023
1 parent 792a98e commit 608f760
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class MessageDeduplication {
private final ManagedLedger managedLedger;
private ManagedCursor managedCursor;

private static final String IS_LAST_CHUNK = "isLastChunk";

enum Status {

// Deduplication is initialized
Expand Down Expand Up @@ -328,11 +330,12 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
MessageMetadata md = null;
if (producerName.startsWith(replicatorPrefix)) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
md = Commands.parseMessageMetadata(headersAndPayload);
producerName = md.getProducerName();
sequenceId = md.getSequenceId();
highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
Expand All @@ -341,7 +344,23 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
publishContext.setOriginalHighestSequenceId(highestSequenceId);
headersAndPayload.readerIndex(readerIndex);
}

long chunkID = -1;
long totalChunk = -1;
if (publishContext.isChunked()) {
if (md == null) {
int readerIndex = headersAndPayload.readerIndex();
md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
}
chunkID = md.getChunkId();
totalChunk = md.getNumChunksFromMsg();
}
// All chunks of a message use the same message metadata and sequence ID,
// so we only need to check the sequence ID for the last chunk in a chunk message.
if (chunkID != -1 && chunkID != totalChunk - 1) {
publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
return MessageDupStatus.NotDup;
}
// Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
// disconnects and re-connects very quickly. At that point the call can be coming from a different thread
synchronized (highestSequencedPushed) {
Expand All @@ -367,6 +386,11 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
}
highestSequencedPushed.put(producerName, highestSequenceId);
}
// Only put sequence ID into highestSequencedPushed and
// highestSequencedPersisted until receive and persistent the last chunk.
if (chunkID != -1 && chunkID == totalChunk - 1) {
publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
}
return MessageDupStatus.NotDup;
}

Expand All @@ -387,8 +411,10 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p
sequenceId = publishContext.getOriginalSequenceId();
highestSequenceId = publishContext.getOriginalHighestSequenceId();
}

highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
Boolean isLastChunk = (Boolean) publishContext.getProperty(IS_LAST_CHUNK);
if (isLastChunk == null || isLastChunk) {
highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
}
if (++snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
takeSnapshot(position);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.client.impl.MessageChunkingSharedTest.sendChunk;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class MessageChunkingDeduplicationTest extends ProducerConsumerBase {

@BeforeClass
@Override
protected void setup() throws Exception {
this.conf.setBrokerDeduplicationEnabled(true);
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSendChunkMessageWithSameSequenceID() throws Exception {
String topicName = "persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID";
String producerName = "test-producer";
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.subscriptionName("test-sub")
.topic(topicName)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient
.newProducer(Schema.STRING)
.producerName(producerName)
.topic(topicName)
.enableChunking(true)
.enableBatching(false)
.create();
int messageSize = 6000; // payload size in KB
String message = "a".repeat(messageSize * 1000);
producer.newMessage().value(message).sequenceId(10).send();
Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
assertNotNull(msg);
assertTrue(msg.getMessageId() instanceof ChunkMessageIdImpl);
assertEquals(msg.getValue(), message);
producer.newMessage().value(message).sequenceId(10).send();
msg = consumer.receive(3, TimeUnit.SECONDS);
assertNull(msg);
}

@Test
public void testDeduplicateChunksInSingleChunkMessages() throws Exception {
String topicName = "persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage";
String producerName = "test-producer";
@Cleanup
Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING)
.subscriptionName("test-sub")
.topic(topicName)
.subscribe();
final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
.getTopicIfExists(topicName).get().orElse(null);
assertNotNull(persistentTopic);
sendChunk(persistentTopic, producerName, 1, 0, 2);
sendChunk(persistentTopic, producerName, 1, 1, 2);
sendChunk(persistentTopic, producerName, 1, 1, 2);

Message<String> message = consumer.receive(15, TimeUnit.SECONDS);
assertEquals(message.getData().length, 2);

sendChunk(persistentTopic, producerName, 2, 0, 3);
sendChunk(persistentTopic, producerName, 2, 1, 3);
sendChunk(persistentTopic, producerName, 2, 1, 3);
sendChunk(persistentTopic, producerName, 2, 2, 3);
message = consumer.receive(20, TimeUnit.SECONDS);
assertEquals(message.getData().length, 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -45,7 +47,6 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -217,7 +218,7 @@ private static void sendNonChunk(final PersistentTopic persistentTopic,
sendChunk(persistentTopic, producerName, sequenceId, null, null);
}

private static void sendChunk(final PersistentTopic persistentTopic,
protected static void sendChunk(final PersistentTopic persistentTopic,
final String producerName,
final long sequenceId,
final Integer chunkId,
Expand All @@ -233,16 +234,33 @@ private static void sendChunk(final PersistentTopic persistentTopic,
metadata.setTotalChunkMsgSize(numChunks);
}
final ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
PulsarByteBufAllocator.DEFAULT.buffer(1));
persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
String name = producerName + "-" + sequenceId;
if (chunkId != null) {
name += "-" + chunkId + "-" + numChunks;
Unpooled.wrappedBuffer("a".getBytes()));
persistentTopic.publishMessage(buf, new Topic.PublishContext() {
@Override
public boolean isChunked() {
return chunkId != null;
}
if (e == null) {
log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
} else {
log.error("Failed to send {}: {}", name, e.getMessage());

@Override
public String getProducerName() {
return producerName;
}

public long getSequenceId() {
return sequenceId;
}

@Override
public void completed(Exception e, long ledgerId, long entryId) {
String name = producerName + "-" + sequenceId;
if (chunkId != null) {
name += "-" + chunkId + "-" + numChunks;
}
if (e == null) {
log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
} else {
log.error("Failed to send {}: {}", name, e.getMessage());
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1452,6 +1453,30 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
// Filter and ack duplicated chunks instead of discard ctx.
// For example:
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
// Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
// Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
// We should filter and ack chunk-4 and chunk-5.
if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
log.warn("[{}] Receive a duplicated chunk message with messageId [{}], last-chunk-Id [{}], "
+ "chunkId [{}], sequenceId [{}]",
msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId,
msgMetadata.getChunkId(), msgMetadata.getSequenceId());
compressedPayload.release();
increaseAvailablePermits(cnx);
boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
.anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
&& messageId1.entryId == messageId.getEntryId());
if (!repeatedlyReceived) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
}
return null;
}
// means we lost the first chunk: should never happen
log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}", topic,
subscription, msgId,
Expand Down

0 comments on commit 608f760

Please sign in to comment.