diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 71de1ff544ec9..56f2d3f29ec71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -24,7 +24,9 @@ import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -36,6 +38,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -439,4 +443,61 @@ public void run() { Assert.assertEquals(count, 9); Assert.assertEquals(0, datas.size()); } + + /** + * see https://github.com/apache/pulsar/pull/18491 + */ + @Test + public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-topic"); + admin.topics().createPartitionedTopic(topic, 2); + + final int receiverQueueSize = 10; + + @Cleanup + MultiTopicsConsumerImpl consumer = + (MultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(receiverQueueSize) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < receiverQueueSize; i++){ + producer.send(i); + } + + Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); + + consumer.redeliverUnacknowledgedMessages(); + waitMultiTopicConsumerRedeliverFinish(consumer); + + Set receivedMsgs = new HashSet<>(); + for (;;){ + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null){ + break; + } + receivedMsgs.add(msg.getValue()); + } + Assert.assertEquals(receivedMsgs.size(), 10); + } + + /** + * If the task after "redeliver" finish, means task-redeliver finish. + */ + private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){ + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); + CompletableFuture taskAfterRedeliver = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + taskAfterRedeliver.complete(null); + }); + taskAfterRedeliver.join(); + } }