diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java index 4c9922acbec06..ceb7d7fd4844c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java @@ -116,10 +116,23 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th } producer.flush(); - for (int i = 0; i < 30; i++) { - Message msg = consumer.receive(); - assertEquals(msg.getValue(), "new-message-" + i); - consumer.acknowledge(msg); + if (batchingEnabled) { + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "hello-" + i); + consumer.acknowledge(msg); + } + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "new-message-" + i); + consumer.acknowledge(msg); + } + } else { + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "new-message-" + i); + consumer.acknowledge(msg); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 3b3b46a044fe4..f7acdf9baa6a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -309,4 +310,79 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti consumer.close(); admin.topics().deletePartitionedTopic("persistent://public/default/" + topic); } + + @Test + public void testFailoverConsumerBatchCumulateAck() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-topic"); + admin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) + .receiverQueueSize(10) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(10) + .batchingMaxPublishDelay(3, TimeUnit.SECONDS) + .blockIfQueueFull(true) + .create(); + + int count = 0; + Set datas = new HashSet<>(); + CountDownLatch producerLatch = new CountDownLatch(10); + while (count < 10) { + datas.add(count); + producer.sendAsync(count).whenComplete((m, e) -> { + producerLatch.countDown(); + }); + count++; + } + producerLatch.await(); + CountDownLatch consumerLatch = new CountDownLatch(1); + new Thread(new Runnable() { + @Override + public void run() { + consumer.receiveAsync() + .thenCompose(m -> { + log.info("received one msg : {}", m.getMessageId()); + datas.remove(m.getValue()); + return consumer.acknowledgeCumulativeAsync(m); + }) + .thenAccept(ignore -> { + try { + Thread.sleep(500); + consumer.redeliverUnacknowledgedMessages(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .whenComplete((r, e) -> { + consumerLatch.countDown(); + }); + } + }).start(); + consumerLatch.await(); + Thread.sleep(500); + count = 0; + while(true) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + if (msg == null) { + break; + } + consumer.acknowledgeCumulative(msg); + Thread.sleep(200); + datas.remove(msg.getValue()); + log.info("received msg : {}", msg.getMessageId()); + count++; + } + Assert.assertEquals(count, 9); + Assert.assertEquals(0, datas.size()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1b30bdbdc77d1..190ddd8b435cb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1267,7 +1267,8 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared; MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { + if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch() + && acknowledgmentsGroupingTracker.isDuplicate(msgId)) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", topic, subscription, consumerName, msgId); @@ -1531,7 +1532,10 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, skippedMessages++; continue; } - + } + if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) { + skippedMessages++; + continue; } executeNotifyCallback(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 06d8a087ba916..49a66b55fa6c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -265,7 +265,11 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchR return; } // Process the message, add to the queue and trigger listener or async callback - messages.forEach(msg -> messageReceived(consumer, msg)); + messages.forEach(msg -> { + if (isValidConsumerEpoch((MessageImpl) msg)) { + messageReceived(consumer, msg); + } + }); int size = incomingMessages.size(); if (size >= maxReceiverQueueSize