Skip to content

Commit

Permalink
Fix seek at batchIndex level receive duplicated messages (#11826)
Browse files Browse the repository at this point in the history
Fixes #11825 

### Motivation
If AcknowledgmentAtBatchIndexLevelEnabled, consumer may receive duplicated messages after seek to batchIndex.
This pull request tries to resolve this problem.

### Modifications
Set the `duringSeek` before reveived seek response.

### Verifying this change

This change added tests and can be verified as follows:
SubscriptionSeekTest.testSeekForBatchMessageAndSpecifiedBatchIndex
  • Loading branch information
aloyszhang authored Sep 2, 2021
1 parent 1d5403e commit fe4cd09
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -183,6 +184,88 @@ public void testSeekForBatch() throws Exception {
assertEquals(receiveId, messageId);
}
}

@Test
public void testSeekForBatchMessageAndSpecifiedBatchIndex() throws Exception {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatch";
String subscriptionName = "my-subscription-batch";

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true)
.batchingMaxMessages(3)
// set batch max publish delay big enough to make sure entry has 3 messages
.batchingMaxPublishDelay(10, TimeUnit.SECONDS)
.topic(topicName).create();


List<MessageId> messageIds = new ArrayList<>();
List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();

List<String> messages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String message = "my-message-" + i;
messages.add(message);
CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
futureMessageIds.add(messageIdCompletableFuture);
}

for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
MessageId messageId = futureMessageId.get();
messageIds.add(messageId);
}

producer.close();

assertTrue(messageIds.get(0) instanceof BatchMessageIdImpl);
assertTrue(messageIds.get(1) instanceof BatchMessageIdImpl);
assertTrue(messageIds.get(2) instanceof BatchMessageIdImpl);

BatchMessageIdImpl batchMsgId0 = (BatchMessageIdImpl) messageIds.get(0);
BatchMessageIdImpl batchMsgId1 = (BatchMessageIdImpl) messageIds.get(1);
BatchMessageIdImpl msgIdToSeekFirst = (BatchMessageIdImpl) messageIds.get(2);

assertEquals(batchMsgId0.getEntryId(), batchMsgId1.getEntryId());
assertEquals(batchMsgId1.getEntryId(), msgIdToSeekFirst.getEntryId());

PulsarClient newPulsarClient = PulsarClient.builder()
// set start backoff interval short enough to make sure client will re-connect quickly
.startingBackoffInterval(1, TimeUnit.MICROSECONDS)
.serviceUrl(lookupUrl.toString())
.build();

org.apache.pulsar.client.api.Consumer<String> consumer = newPulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
.startMessageIdInclusive()
.subscribe();

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getSubscriptions().size(), 1);

consumer.seek(msgIdToSeekFirst);
MessageId msgId = consumer.receive().getMessageId();
assertTrue(msgId instanceof BatchMessageIdImpl);
BatchMessageIdImpl batchMsgId = (BatchMessageIdImpl) msgId;
assertEquals(batchMsgId, msgIdToSeekFirst);


consumer.seek(MessageId.earliest);
Message<String> receiveBeforEarliest = consumer.receive();
assertEquals(receiveBeforEarliest.getValue(), messages.get(0));
consumer.seek(MessageId.latest);
Message<String> receiveAfterLatest = consumer.receive(1, TimeUnit.SECONDS);
assertNull(receiveAfterLatest);

for (MessageId messageId : messageIds) {
consumer.seek(messageId);
MessageId receiveId = consumer.receive().getMessageId();
assertEquals(receiveId, messageId);
}

newPulsarClient.close();
}

@Test
public void testSeekForBatchByAdmin() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException {
final String topicName = "persistent://prop/use/ns-abcd/testSeekForBatchByAdmin-" + UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1826,20 +1826,25 @@ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek,
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();

log.info("[{}][{}] Seek subscription to {}", topic, subscription, seekBy);
BatchMessageIdImpl originSeekMessageId = seekMessageId;
seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
duringSeek.set(true);
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);

cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();

seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
duringSeek.set(true);
lastDequeuedMessageId = MessageId.earliest;

clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
// re-set duringSeek and seekMessageId if seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());

seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
Expand Down

0 comments on commit fe4cd09

Please sign in to comment.