From f1a7cf2494292a4fc50e2a3b4d3b18f958a80066 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 16 Nov 2022 14:10:54 +0800 Subject: [PATCH 1/8] [fix] [client] Multi topic consumer receive null even if has backlog --- .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 1853be55f961f..bf6c7a451747d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -712,8 +712,8 @@ public void redeliverUnacknowledgedMessages() { }); clearIncomingMessages(); unAckedMessageTracker.clear(); + resumeReceivingFromPausedConsumersIfNeeded(); }); - resumeReceivingFromPausedConsumersIfNeeded(); } @Override From 5af3256cb4a96954c1da0b7f01597ec611b51abe Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 16 Nov 2022 14:35:29 +0800 Subject: [PATCH 2/8] add test --- .../pulsar/client/impl/NegativeAcksTest.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 71de1ff544ec9..56f2d3f29ec71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -24,7 +24,9 @@ import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -36,6 +38,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -439,4 +443,61 @@ public void run() { Assert.assertEquals(count, 9); Assert.assertEquals(0, datas.size()); } + + /** + * see https://github.com/apache/pulsar/pull/18491 + */ + @Test + public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-topic"); + admin.topics().createPartitionedTopic(topic, 2); + + final int receiverQueueSize = 10; + + @Cleanup + MultiTopicsConsumerImpl consumer = + (MultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(receiverQueueSize) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < receiverQueueSize; i++){ + producer.send(i); + } + + Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); + + consumer.redeliverUnacknowledgedMessages(); + waitMultiTopicConsumerRedeliverFinish(consumer); + + Set receivedMsgs = new HashSet<>(); + for (;;){ + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null){ + break; + } + receivedMsgs.add(msg.getValue()); + } + Assert.assertEquals(receivedMsgs.size(), 10); + } + + /** + * If the task after "redeliver" finish, means task-redeliver finish. + */ + private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){ + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); + CompletableFuture taskAfterRedeliver = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + taskAfterRedeliver.complete(null); + }); + taskAfterRedeliver.join(); + } } From 399d76ad4ca3f767411b4707a39584c669f15537 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 17 Nov 2022 01:48:50 +0800 Subject: [PATCH 3/8] Improved test hit rate --- .../pulsar/client/impl/NegativeAcksTest.java | 68 ++++++++++++++++--- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 56f2d3f29ec71..a8b3431c855f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -22,7 +22,9 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -38,6 +40,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; @@ -451,7 +454,6 @@ public void run() { public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { final String topic = BrokerTestUtil.newUniqueName("my-topic"); admin.topics().createPartitionedTopic(topic, 2); - final int receiverQueueSize = 10; @Cleanup @@ -461,21 +463,32 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti .subscriptionName("sub") .receiverQueueSize(receiverQueueSize) .subscribe(); - @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) - .enableBatching(false) + .batchingMaxMessages(100) + .enableBatching(true) .create(); - + List> sendTask = new ArrayList<>(receiverQueueSize); for (int i = 0; i < receiverQueueSize; i++){ - producer.send(i); + sendTask.add(producer.sendAsync(i)); } + FutureUtil.waitForAll(sendTask).join(); Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); - consumer.redeliverUnacknowledgedMessages(); - waitMultiTopicConsumerRedeliverFinish(consumer); + // For testing the race condition of issue #18491. + // We wait all message-receive-task finish. + tryWaitAllMessageReceiveTaskFinish(consumer); + // Redeliver. + executeTaskWithInternalExecutor(consumer, () -> { + // Run redeliver with internal executor can ensure that the current + // task("resumeReceivingFromPausedConsumersIfNeeded") is executed before + // the task("clearIncomingMessages") triggered by the current task. + consumer.redeliverUnacknowledgedMessages(); + }); + // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery. + waitForAllTasksForInternalThread(consumer); Set receivedMsgs = new HashSet<>(); for (;;){ @@ -488,10 +501,35 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti Assert.assertEquals(receivedMsgs.size(), 10); } - /** - * If the task after "redeliver" finish, means task-redeliver finish. - */ - private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){ + private void tryWaitAllMessageReceiveTaskFinish(MultiTopicsConsumerImpl consumer) { + waitForAllTasksForInternalThread(consumer); + // Try to wait pending receive finish, success is not guaranteed. + long awaitStartTimestamp = System.currentTimeMillis(); + Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> { + // In some cases, the future-receive will not be completed, waiting for 5s at most. + if (System.currentTimeMillis() - awaitStartTimestamp > 3000) { + return true; + } + List consumers = consumer.getConsumers(); + for (ConsumerImpl consumerImpl : consumers) { + waitForAllTasksForInternalThread(consumerImpl); + if (consumerImpl.hasPendingBatchReceive()) { + return false; + } + } + for (ConsumerImpl consumerImpl : consumers) { + waitForAllTasksForInternalThread(consumerImpl); + if (consumerImpl.hasNextPendingReceive()) { + return false; + } + } + return true; + }); + // Wait for all task running at internal thread. + waitForAllTasksForInternalThread(consumer); + } + + private void waitForAllTasksForInternalThread(ConsumerBase consumer){ ExecutorService internalPinnedExecutor = WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); CompletableFuture taskAfterRedeliver = new CompletableFuture<>(); @@ -500,4 +538,12 @@ private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consu }); taskAfterRedeliver.join(); } + + private void executeTaskWithInternalExecutor(ConsumerBase consumer, Runnable task){ + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); + internalPinnedExecutor.execute(() -> { + task.run(); + }); + } } From 9a7a9d6b761811f9affdc96c2733205c3a3e843f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 17 Nov 2022 02:02:29 +0800 Subject: [PATCH 4/8] add a sleep before receive() messages from the consumer. --- .../pulsar/client/impl/NegativeAcksTest.java | 77 ++++++------------- 1 file changed, 23 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index a8b3431c855f9..df6b5cb91a2a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -22,9 +22,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -40,7 +38,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; @@ -110,7 +107,7 @@ public static Object[][] variations() { @Test(dataProvider = "variations") public void testNegativeAcks(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, - int negAcksDelayMillis, int ackTimeout) + int negAcksDelayMillis, int ackTimeout) throws Exception { log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions, subscriptionType, negAcksDelayMillis); @@ -211,10 +208,10 @@ public static Object[][] variationsBackoff() { @Test(dataProvider = "variationsBackoff") public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, - int minNackTimeMs, int maxNackTimeMs) + int minNackTimeMs, int maxNackTimeMs) throws Exception { log.info("Test negative acks with back off batching={} partitions={} subType={} minNackTimeMs={}, " - + "maxNackTimeMs={}", batching, usePartitions, subscriptionType, minNackTimeMs, maxNackTimeMs); + + "maxNackTimeMs={}", batching, usePartitions, subscriptionType, minNackTimeMs, maxNackTimeMs); String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff"); MultiplierRedeliveryBackoff backoff = MultiplierRedeliveryBackoff.builder() @@ -454,6 +451,7 @@ public void run() { public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { final String topic = BrokerTestUtil.newUniqueName("my-topic"); admin.topics().createPartitionedTopic(topic, 2); + final int receiverQueueSize = 10; @Cleanup @@ -463,30 +461,23 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti .subscriptionName("sub") .receiverQueueSize(receiverQueueSize) .subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) .topic(topic) - .batchingMaxMessages(100) - .enableBatching(true) + .enableBatching(false) .create(); - List> sendTask = new ArrayList<>(receiverQueueSize); + for (int i = 0; i < receiverQueueSize; i++){ - sendTask.add(producer.sendAsync(i)); + producer.send(i); } - FutureUtil.waitForAll(sendTask).join(); Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); - // For testing the race condition of issue #18491. - // We wait all message-receive-task finish. - tryWaitAllMessageReceiveTaskFinish(consumer); - // Redeliver. - executeTaskWithInternalExecutor(consumer, () -> { - // Run redeliver with internal executor can ensure that the current - // task("resumeReceivingFromPausedConsumersIfNeeded") is executed before - // the task("clearIncomingMessages") triggered by the current task. - consumer.redeliverUnacknowledgedMessages(); - }); + // For testing the race condition of issue #18491 + // We need to inject a delay for the pinned internal thread + injectDelayToInternalThread(consumer, 1000L); + consumer.redeliverUnacknowledgedMessages(); // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery. waitForAllTasksForInternalThread(consumer); @@ -501,35 +492,21 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti Assert.assertEquals(receivedMsgs.size(), 10); } - private void tryWaitAllMessageReceiveTaskFinish(MultiTopicsConsumerImpl consumer) { - waitForAllTasksForInternalThread(consumer); - // Try to wait pending receive finish, success is not guaranteed. - long awaitStartTimestamp = System.currentTimeMillis(); - Awaitility.await().atMost(6, TimeUnit.SECONDS).until(() -> { - // In some cases, the future-receive will not be completed, waiting for 5s at most. - if (System.currentTimeMillis() - awaitStartTimestamp > 3000) { - return true; - } - List consumers = consumer.getConsumers(); - for (ConsumerImpl consumerImpl : consumers) { - waitForAllTasksForInternalThread(consumerImpl); - if (consumerImpl.hasPendingBatchReceive()) { - return false; - } - } - for (ConsumerImpl consumerImpl : consumers) { - waitForAllTasksForInternalThread(consumerImpl); - if (consumerImpl.hasNextPendingReceive()) { - return false; - } + private void injectDelayToInternalThread(MultiTopicsConsumerImpl consumer, long delayInMillis){ + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); + internalPinnedExecutor.execute(() -> { + try { + Thread.sleep(delayInMillis); + } catch (InterruptedException ignore) { } - return true; }); - // Wait for all task running at internal thread. - waitForAllTasksForInternalThread(consumer); } - private void waitForAllTasksForInternalThread(ConsumerBase consumer){ + /** + * If the task after "redeliver" finish, means task-redeliver finish. + */ + private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl consumer){ ExecutorService internalPinnedExecutor = WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); CompletableFuture taskAfterRedeliver = new CompletableFuture<>(); @@ -538,12 +515,4 @@ private void waitForAllTasksForInternalThread(ConsumerBase consumer){ }); taskAfterRedeliver.join(); } - - private void executeTaskWithInternalExecutor(ConsumerBase consumer, Runnable task){ - ExecutorService internalPinnedExecutor = - WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); - internalPinnedExecutor.execute(() -> { - task.run(); - }); - } } From 89aaba8832c99523f3be2b996fdbe8418016f48e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 17 Nov 2022 02:03:02 +0800 Subject: [PATCH 5/8] code format --- .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index df6b5cb91a2a9..efc01c81b96ae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -475,7 +475,7 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); // For testing the race condition of issue #18491 - // We need to inject a delay for the pinned internal thread + // We need to inject a delay for the pinned internal thread injectDelayToInternalThread(consumer, 1000L); consumer.redeliverUnacknowledgedMessages(); // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery. From 89df38d7507934c2f28fea5f28f6d352c5c45cf4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 17 Nov 2022 02:05:30 +0800 Subject: [PATCH 6/8] code format --- .../org/apache/pulsar/client/impl/NegativeAcksTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index efc01c81b96ae..30b8541216506 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -107,7 +107,7 @@ public static Object[][] variations() { @Test(dataProvider = "variations") public void testNegativeAcks(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, - int negAcksDelayMillis, int ackTimeout) + int negAcksDelayMillis, int ackTimeout) throws Exception { log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions, subscriptionType, negAcksDelayMillis); @@ -208,10 +208,10 @@ public static Object[][] variationsBackoff() { @Test(dataProvider = "variationsBackoff") public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, SubscriptionType subscriptionType, - int minNackTimeMs, int maxNackTimeMs) + int minNackTimeMs, int maxNackTimeMs) throws Exception { log.info("Test negative acks with back off batching={} partitions={} subType={} minNackTimeMs={}, " - + "maxNackTimeMs={}", batching, usePartitions, subscriptionType, minNackTimeMs, maxNackTimeMs); + + "maxNackTimeMs={}", batching, usePartitions, subscriptionType, minNackTimeMs, maxNackTimeMs); String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBackoff"); MultiplierRedeliveryBackoff backoff = MultiplierRedeliveryBackoff.builder() From 78162240bd1551649a7b3d62cab77ead626349bb Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 18 Nov 2022 00:26:15 +0800 Subject: [PATCH 7/8] remove unnecessary code-comment --- .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 30b8541216506..87be875ef3b8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -444,9 +444,6 @@ public void run() { Assert.assertEquals(0, datas.size()); } - /** - * see https://github.com/apache/pulsar/pull/18491 - */ @Test public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { final String topic = BrokerTestUtil.newUniqueName("my-topic"); From 6c391d9e80489b73125719eb2267e475791a3c2d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 18 Nov 2022 00:54:47 +0800 Subject: [PATCH 8/8] improve test --- .../pulsar/client/impl/NegativeAcksTest.java | 37 +++++-------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 87be875ef3b8e..679d6c1a19e51 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -444,7 +443,7 @@ public void run() { Assert.assertEquals(0, datas.size()); } - @Test + @Test(invocationCount = 5) public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { final String topic = BrokerTestUtil.newUniqueName("my-topic"); admin.topics().createPartitionedTopic(topic, 2); @@ -458,6 +457,8 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti .subscriptionName("sub") .receiverQueueSize(receiverQueueSize) .subscribe(); + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); @Cleanup Producer producer = pulsarClient.newProducer(Schema.INT32) @@ -473,10 +474,10 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti // For testing the race condition of issue #18491 // We need to inject a delay for the pinned internal thread - injectDelayToInternalThread(consumer, 1000L); - consumer.redeliverUnacknowledgedMessages(); + Thread.sleep(1000L); + internalPinnedExecutor.submit(() -> consumer.redeliverUnacknowledgedMessages()).get(); // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery. - waitForAllTasksForInternalThread(consumer); + internalPinnedExecutor.submit(() -> {}).get(); Set receivedMsgs = new HashSet<>(); for (;;){ @@ -487,29 +488,9 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti receivedMsgs.add(msg.getValue()); } Assert.assertEquals(receivedMsgs.size(), 10); - } - - private void injectDelayToInternalThread(MultiTopicsConsumerImpl consumer, long delayInMillis){ - ExecutorService internalPinnedExecutor = - WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); - internalPinnedExecutor.execute(() -> { - try { - Thread.sleep(delayInMillis); - } catch (InterruptedException ignore) { - } - }); - } - /** - * If the task after "redeliver" finish, means task-redeliver finish. - */ - private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl consumer){ - ExecutorService internalPinnedExecutor = - WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); - CompletableFuture taskAfterRedeliver = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { - taskAfterRedeliver.complete(null); - }); - taskAfterRedeliver.join(); + producer.close(); + consumer.close(); + admin.topics().deletePartitionedTopic("persistent://public/default/" + topic); } }