Skip to content

Commit

Permalink
[fix][client] Fix failover/exclusive consumer with batch cumulate ack…
Browse files Browse the repository at this point in the history
… issue. (apache#18454)

(cherry picked from commit 7712aa3)
  • Loading branch information
Technoboy- authored and liangyepianzhou committed Dec 14, 2022
1 parent 73b30a3 commit 6f69aa4
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,23 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th
}
producer.flush();

for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
if (batchingEnabled) {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "hello-" + i);
consumer.acknowledge(msg);
}
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
} else {
for (int i = 0; i < 30; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "new-message-" + i);
consumer.acknowledge(msg);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -309,4 +310,79 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti
consumer.close();
admin.topics().deletePartitionedTopic("persistent://public/default/" + topic);
}

@Test
public void testFailoverConsumerBatchCumulateAck() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("my-topic");
admin.topics().createPartitionedTopic(topic, 2);

@Cleanup
Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
.receiverQueueSize(10)
.subscribe();

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.batchingMaxMessages(10)
.batchingMaxPublishDelay(3, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();

int count = 0;
Set<Integer> datas = new HashSet<>();
CountDownLatch producerLatch = new CountDownLatch(10);
while (count < 10) {
datas.add(count);
producer.sendAsync(count).whenComplete((m, e) -> {
producerLatch.countDown();
});
count++;
}
producerLatch.await();
CountDownLatch consumerLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
consumer.receiveAsync()
.thenCompose(m -> {
log.info("received one msg : {}", m.getMessageId());
datas.remove(m.getValue());
return consumer.acknowledgeCumulativeAsync(m);
})
.thenAccept(ignore -> {
try {
Thread.sleep(500);
consumer.redeliverUnacknowledgedMessages();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.whenComplete((r, e) -> {
consumerLatch.countDown();
});
}
}).start();
consumerLatch.await();
Thread.sleep(500);
count = 0;
while(true) {
Message<Integer> msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg == null) {
break;
}
consumer.acknowledgeCumulative(msg);
Thread.sleep(200);
datas.remove(msg.getValue());
log.info("received msg : {}", msg.getMessageId());
count++;
}
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,8 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared;

MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
&& acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, consumerName, msgId);
Expand Down Expand Up @@ -1531,7 +1532,10 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
skippedMessages++;
continue;
}

}
if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
skippedMessages++;
continue;
}
executeNotifyCallback(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
return;
}
// Process the message, add to the queue and trigger listener or async callback
messages.forEach(msg -> messageReceived(consumer, msg));
messages.forEach(msg -> {
if (isValidConsumerEpoch((MessageImpl<T>) msg)) {
messageReceived(consumer, msg);
}
});

int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
Expand Down

0 comments on commit 6f69aa4

Please sign in to comment.