From 1de291560524218ad35be6bbbc30e3acc95b53ee Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Tue, 15 Nov 2022 11:37:16 +0800 Subject: [PATCH] [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454) (cherry picked from commit 7712aa396bfca55a79a45761e0a405e90185e0f8) --- .../impl/ConsumerDedupPermitsUpdateTest.java | 21 ++++- .../pulsar/client/impl/NegativeAcksTest.java | 77 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 9 ++- .../client/impl/MultiTopicsConsumerImpl.java | 3 - 4 files changed, 100 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java index 4c9922acbec06..ceb7d7fd4844c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java @@ -116,10 +116,23 @@ public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) th } producer.flush(); - for (int i = 0; i < 30; i++) { - Message msg = consumer.receive(); - assertEquals(msg.getValue(), "new-message-" + i); - consumer.acknowledge(msg); + if (batchingEnabled) { + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "hello-" + i); + consumer.acknowledge(msg); + } + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "new-message-" + i); + consumer.acknowledge(msg); + } + } else { + for (int i = 0; i < 30; i++) { + Message msg = consumer.receive(); + assertEquals(msg.getValue(), "new-message-" + i); + consumer.acknowledge(msg); + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 5eb43af38f771..de130b7827078 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -154,4 +156,79 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti consumer.close(); producer.close(); } + + @Test + public void testFailoverConsumerBatchCumulateAck() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-topic"); + admin.topics().createPartitionedTopic(topic, 2); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS) + .receiverQueueSize(10) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .batchingMaxMessages(10) + .batchingMaxPublishDelay(3, TimeUnit.SECONDS) + .blockIfQueueFull(true) + .create(); + + int count = 0; + Set datas = new HashSet<>(); + CountDownLatch producerLatch = new CountDownLatch(10); + while (count < 10) { + datas.add(count); + producer.sendAsync(count).whenComplete((m, e) -> { + producerLatch.countDown(); + }); + count++; + } + producerLatch.await(); + CountDownLatch consumerLatch = new CountDownLatch(1); + new Thread(new Runnable() { + @Override + public void run() { + consumer.receiveAsync() + .thenCompose(m -> { + log.info("received one msg : {}", m.getMessageId()); + datas.remove(m.getValue()); + return consumer.acknowledgeCumulativeAsync(m); + }) + .thenAccept(ignore -> { + try { + Thread.sleep(500); + consumer.redeliverUnacknowledgedMessages(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .whenComplete((r, e) -> { + consumerLatch.countDown(); + }); + } + }).start(); + consumerLatch.await(); + Thread.sleep(500); + count = 0; + while(true) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + if (msg == null) { + break; + } + consumer.acknowledgeCumulative(msg); + Thread.sleep(200); + datas.remove(msg.getValue()); + log.info("received msg : {}", msg.getMessageId()); + count++; + } + Assert.assertEquals(count, 9); + Assert.assertEquals(0, datas.size()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1a185d4c17d5c..5381b28033085 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1174,9 +1174,9 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac final int numMessages = msgMetadata.getNumMessagesInBatch(); final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0; final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared; - MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); - if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { + if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch() + && acknowledgmentsGroupingTracker.isDuplicate(msgId)) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}", topic, subscription, consumerName, msgId); @@ -1429,7 +1429,10 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, skippedMessages++; continue; } - + } + if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) { + skippedMessages++; + continue; } executeNotifyCallback(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e5cc32af8fe9d..b2de0e3b92c38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -985,7 +985,6 @@ private void doSubscribeTopicPartitions(Schema schema, partitionIndex -> { String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); CompletableFuture> subFuture = new CompletableFuture<>(); - configurationData.setStartPaused(paused); ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, partitionIndex, subFuture, createIfDoesNotExist, schema); synchronized (pauseMutex) { @@ -1012,7 +1011,6 @@ private void doSubscribeTopicPartitions(Schema schema, subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); return existingValue; } else { - internalConfig.setStartPaused(paused); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, -1, subFuture, createIfDoesNotExist, schema); @@ -1328,7 +1326,6 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa int partitionIndex = TopicName.getPartitionIndex(partitionName); CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); - configurationData.setStartPaused(paused); ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, partitionIndex, subFuture, true, schema); synchronized (pauseMutex) {