Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix multi-topic consumer stuck after redeliver messages #18491

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -36,6 +38,8 @@
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -439,4 +443,61 @@ public void run() {
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}

/**
* see https://github.com/apache/pulsar/pull/18491
*/
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried the test, but it seems unable to cover the change or reproduce the issue without this fix.

After changing the test like this

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 56f2d3f29ec..c476cb516aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -474,9 +474,12 @@ public class NegativeAcksTest extends ProducerConsumerBase {
 
         Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);
 
+        // For testing the race condition of issue #18491
+        // We need to inject a delay for the pinned internal thread
+        injectDelayToInternalThread(consumer, 1000L);
         consumer.redeliverUnacknowledgedMessages();
-        waitMultiTopicConsumerRedeliverFinish(consumer);
-
+        // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
+        waitForAllTasksForInternalThread(consumer);
         Set<Integer> receivedMsgs = new HashSet<>();
         for (;;){
             Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
@@ -491,7 +494,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
     /**
      * If the task after "redeliver" finish, means task-redeliver finish.
      */
-    private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
+    private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?> consumer, long delayInMillis){
+        ExecutorService internalPinnedExecutor =
+                WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
+        internalPinnedExecutor.execute(() -> {
+            try {
+                Thread.sleep(delayInMillis);
+            } catch (InterruptedException ignore) {
+            }
+        });
+    }
+
+    private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?> consumer) {
         ExecutorService internalPinnedExecutor =
                 WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
         CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();

The issue can be reproduced without the fix and passed with the fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is not a 100% hit test problem, I tried to fix it

截屏2022-11-16 16 55 42

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, maybe it's better to add a sleep before receive() messages from the consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already add a sleep before receiving messages.

I have tried to increase the probability of test failure, but can't get to 100%. Because that:

  • method receiveMessageFromConsumer will call consumerImpl.batchReceiveAsync and consumerImpl.receiveAsync .
  • When consumerImpl.batchReceiveAsync and consumerImpl.receiveAsync is finished, it will trigger another receiveMessageFromConsumer.
  • consumerImpl.batchReceiveAsync and consumerImpl.receiveAsync calls each other
    • if receive messages is empty call consumerImpl.receiveAsync
    • else call consumerImpl.batchReceiveAsync
  • consumerImpl.batchReceiveAsync and consumerImpl.receiveAsync executed asynchronously.

final String topic = BrokerTestUtil.newUniqueName("my-topic");
admin.topics().createPartitionedTopic(topic, 2);

final int receiverQueueSize = 10;

@Cleanup
MultiTopicsConsumerImpl<Integer> consumer =
(MultiTopicsConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("sub")
.receiverQueueSize(receiverQueueSize)
.subscribe();

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < receiverQueueSize; i++){
producer.send(i);
}

Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);

consumer.redeliverUnacknowledgedMessages();
waitMultiTopicConsumerRedeliverFinish(consumer);

Set<Integer> receivedMsgs = new HashSet<>();
for (;;){
Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null){
break;
}
receivedMsgs.add(msg.getValue());
}
Assert.assertEquals(receivedMsgs.size(), 10);
}

/**
* If the task after "redeliver" finish, means task-redeliver finish.
*/
private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
ExecutorService internalPinnedExecutor =
WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
taskAfterRedeliver.complete(null);
});
taskAfterRedeliver.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ public void redeliverUnacknowledgedMessages() {
});
clearIncomingMessages();
unAckedMessageTracker.clear();
resumeReceivingFromPausedConsumersIfNeeded();
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
});
resumeReceivingFromPausedConsumersIfNeeded();
}

@Override
Expand Down