diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 71de1ff544ec9..679d6c1a19e51 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -36,6 +37,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; +import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -439,4 +442,55 @@ public void run() { Assert.assertEquals(count, 9); Assert.assertEquals(0, datas.size()); } + + @Test(invocationCount = 5) + public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("my-topic"); + admin.topics().createPartitionedTopic(topic, 2); + + final int receiverQueueSize = 10; + + @Cleanup + MultiTopicsConsumerImpl consumer = + (MultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .receiverQueueSize(receiverQueueSize) + .subscribe(); + ExecutorService internalPinnedExecutor = + WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor"); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < receiverQueueSize; i++){ + producer.send(i); + } + + Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize); + + // For testing the race condition of issue #18491 + // We need to inject a delay for the pinned internal thread + Thread.sleep(1000L); + internalPinnedExecutor.submit(() -> consumer.redeliverUnacknowledgedMessages()).get(); + // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery. + internalPinnedExecutor.submit(() -> {}).get(); + + Set receivedMsgs = new HashSet<>(); + for (;;){ + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null){ + break; + } + receivedMsgs.add(msg.getValue()); + } + Assert.assertEquals(receivedMsgs.size(), 10); + + producer.close(); + consumer.close(); + admin.topics().deletePartitionedTopic("persistent://public/default/" + topic); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 1853be55f961f..bf6c7a451747d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -712,8 +712,8 @@ public void redeliverUnacknowledgedMessages() { }); clearIncomingMessages(); unAckedMessageTracker.clear(); + resumeReceivingFromPausedConsumersIfNeeded(); }); - resumeReceivingFromPausedConsumersIfNeeded(); } @Override