Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Avoid redelivering duplicated messages when batching is enabled #18486

Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,6 +68,19 @@ public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

@DataProvider(name = "batchedMessageAck")
public Object[][] batchedMessageAck() {
// When batch index ack is disabled (by default), only after all single messages were sent would the pending
// ACK be added into the ACK tracker.
return new Object[][] {
// numAcked, batchSize, ack type
{ 3, 5, CommandAck.AckType.Individual },
{ 5, 5, CommandAck.AckType.Individual },
{ 3, 5, CommandAck.AckType.Cumulative },
{ 5, 5, CommandAck.AckType.Cumulative }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5, 5 test case may not stable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I will take a look soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fixed now. The root cause is that after b0fad40, a BatchedMessageIdImpl will be compared with a MessageIdImpl, which should be forbidden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@congbobo184 PTAL again.

};
}

/**
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
Expand Down Expand Up @@ -297,4 +313,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
consumer.close();
producer.close();
}

@Test(timeOut = 30000, dataProvider = "batchedMessageAck")
public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+ numAcked + "-" + batchSize + "-" + ackType.getValue();
@Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.enableBatchIndexAcknowledgment(false)
.acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
.subscribe();
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchSize)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
for (int i = 0; i < batchSize; i++) {
String value = "msg-" + i;
producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
}
List<Message<String>> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
messages.add(consumer.receive());
}
if (ackType == CommandAck.AckType.Individual) {
for (int i = 0; i < numAcked; i++) {
consumer.acknowledge(messages.get(i));
}
} else {
consumer.acknowledgeCumulative(messages.get(numAcked - 1));
}

consumer.redeliverUnacknowledgedMessages();

messages.clear();
for (int i = 0; i < batchSize; i++) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
messages.add(msg);
}
List<String> values = messages.stream().map(Message::getValue).collect(Collectors.toList());
// All messages are redelivered because only if the whole batch are acknowledged would the message ID be
// added into the ACK tracker.
if (numAcked < batchSize) {
assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
} else {
assertTrue(values.isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,10 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
}
producer.flush();

if (batchingEnabled) {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "hello-" + i);
consumer.acknowledge(msg);
}
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
} else {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
*/
@Override
public boolean isDuplicate(MessageId messageId) {
final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
if (!(messageId instanceof MessageIdImpl)) {
throw new IllegalArgumentException("isDuplicated cannot accept "
+ messageId.getClass().getName() + ": " + messageId);
}
if (lastCumulativeAck.compareTo(messageId) >= 0) {
// Already included in a cumulative ack
return true;
} else {
return pendingIndividualAcks.contains((MessageIdImpl) messageId);
final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
: (MessageIdImpl) messageId;
return pendingIndividualAcks.contains(messageIdImpl);
}
}

Expand Down Expand Up @@ -622,7 +628,7 @@ protected LastCumulativeAck initialValue() {
private boolean flushRequired = false;

public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (messageId.compareTo(this.messageId) > 0) {
if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
Expand Down Expand Up @@ -656,6 +662,24 @@ public synchronized void reset() {
flushRequired = false;
}

public synchronized int compareTo(MessageId messageId) {
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
final MessageIdImpl rhs = (MessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
final MessageIdImpl lhs = this.messageId;
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
} else {
return this.messageId.compareTo(messageId);
}
}

private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,24 @@ public void testFlush() {
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
}

@Test
public void testCompareTo() {
LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);

lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
}
}