Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Avoid redelivering duplicated messages when batching is enabled #8

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,6 +68,19 @@ public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

@DataProvider(name = "batchedMessageAck")
public Object[][] batchedMessageAck() {
// When batch index ack is disabled (by default), only after all single messages were sent would the pending
// ACK be added into the ACK tracker.
return new Object[][] {
// numAcked, batchSize, ack type
{ 3, 5, CommandAck.AckType.Individual },
{ 5, 5, CommandAck.AckType.Individual },
{ 3, 5, CommandAck.AckType.Cumulative },
{ 5, 5, CommandAck.AckType.Cumulative }
};
}

/**
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
Expand Down Expand Up @@ -297,4 +313,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
consumer.close();
producer.close();
}

@Test(timeOut = 30000, dataProvider = "batchedMessageAck")
public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+ numAcked + "-" + batchSize + "-" + ackType.getValue();
@Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.enableBatchIndexAcknowledgment(false)
.acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
.subscribe();
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchSize)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
for (int i = 0; i < batchSize; i++) {
String value = "msg-" + i;
producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
}
List<Message<String>> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
messages.add(consumer.receive());
}
if (ackType == CommandAck.AckType.Individual) {
for (int i = 0; i < numAcked; i++) {
consumer.acknowledge(messages.get(i));
}
} else {
consumer.acknowledgeCumulative(messages.get(numAcked - 1));
}

consumer.redeliverUnacknowledgedMessages();

messages.clear();
for (int i = 0; i < batchSize; i++) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
messages.add(msg);
}
List<String> values = messages.stream().map(Message::getValue).collect(Collectors.toList());
// All messages are redelivered because only if the whole batch are acknowledged would the message ID be
// added into the ACK tracker.
if (numAcked < batchSize) {
assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
} else {
assertTrue(values.isEmpty());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,10 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
}
producer.flush();

if (batchingEnabled) {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "hello-" + i);
consumer.acknowledge(msg);
}
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
} else {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
*/
@Override
public boolean isDuplicate(MessageId messageId) {
final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
if (!(messageId instanceof MessageIdImpl)) {
throw new IllegalArgumentException("isDuplicated cannot accept "
+ messageId.getClass().getName() + ": " + messageId);
}
if (lastCumulativeAck.compareTo(messageId) >= 0) {
// Already included in a cumulative ack
return true;
} else {
return pendingIndividualAcks.contains((MessageIdImpl) messageId);
final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
: (MessageIdImpl) messageId;
return pendingIndividualAcks.contains(messageIdImpl);
}
}

Expand Down Expand Up @@ -622,7 +628,7 @@ protected LastCumulativeAck initialValue() {
private boolean flushRequired = false;

public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (messageId.compareTo(this.messageId) > 0) {
if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
Expand Down Expand Up @@ -656,6 +662,24 @@ public synchronized void reset() {
flushRequired = false;
}

public synchronized int compareTo(MessageId messageId) {
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
final MessageIdImpl rhs = (MessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
final MessageIdImpl lhs = this.messageId;
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
} else {
return this.messageId.compareTo(messageId);
}
}

private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,24 @@ public void testFlush() {
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
}

@Test
public void testCompareTo() {
LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);

lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
}
}