-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix multi-topic consumer stuck after redeliver messages #18491
[fix][client] Fix multi-topic consumer stuck after redeliver messages #18491
Conversation
75581fe
to
5af3256
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
The PR title should change to
[fix][client] Fix multi-topic consumer stuck after redeliver messages
* see https://github.com/apache/pulsar/pull/18491 | ||
*/ | ||
@Test | ||
public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have tried the test, but it seems unable to cover the change or reproduce the issue without this fix.
After changing the test like this
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 56f2d3f29ec..c476cb516aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -474,9 +474,12 @@ public class NegativeAcksTest extends ProducerConsumerBase {
Awaitility.await().until(() -> consumer.incomingMessages.size() == receiverQueueSize);
+ // For testing the race condition of issue #18491
+ // We need to inject a delay for the pinned internal thread
+ injectDelayToInternalThread(consumer, 1000L);
consumer.redeliverUnacknowledgedMessages();
- waitMultiTopicConsumerRedeliverFinish(consumer);
-
+ // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
+ waitForAllTasksForInternalThread(consumer);
Set<Integer> receivedMsgs = new HashSet<>();
for (;;){
Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
@@ -491,7 +494,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
/**
* If the task after "redeliver" finish, means task-redeliver finish.
*/
- private void waitMultiTopicConsumerRedeliverFinish(MultiTopicsConsumerImpl consumer){
+ private void injectDelayToInternalThread(MultiTopicsConsumerImpl<?> consumer, long delayInMillis){
+ ExecutorService internalPinnedExecutor =
+ WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
+ internalPinnedExecutor.execute(() -> {
+ try {
+ Thread.sleep(delayInMillis);
+ } catch (InterruptedException ignore) {
+ }
+ });
+ }
+
+ private void waitForAllTasksForInternalThread(MultiTopicsConsumerImpl<?> consumer) {
ExecutorService internalPinnedExecutor =
WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
CompletableFuture<Void> taskAfterRedeliver = new CompletableFuture<>();
The issue can be reproduced without the fix and passed with the fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, maybe it's better to add a sleep before receive()
messages from the consumer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already add a sleep before receiving messages.
I have tried to increase the probability of test failure, but can't get to 100%. Because that:
- method
receiveMessageFromConsumer
will callconsumerImpl.batchReceiveAsync
andconsumerImpl.receiveAsync
. - When
consumerImpl.batchReceiveAsync
andconsumerImpl.receiveAsync
is finished, it will trigger anotherreceiveMessageFromConsumer
. consumerImpl.batchReceiveAsync
andconsumerImpl.receiveAsync
calls each other- if receive messages is empty call
consumerImpl.receiveAsync
- else call
consumerImpl.batchReceiveAsync
- if receive messages is empty call
consumerImpl.batchReceiveAsync
andconsumerImpl.receiveAsync
executed asynchronously.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Show resolved
Hide resolved
Could you also explain why the flaky test is caused by this reason? I added some logs: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/im
pl/MultiTopicsConsumerImpl.java
index 1853be55f9..809a590451 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -705,6 +705,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
@Override
public void redeliverUnacknowledgedMessages() {
internalPinnedExecutor.execute(() -> {
+ log.info("XYZ rededeliverUnacked");
CONSUMER_EPOCH.incrementAndGet(this);
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
@@ -713,6 +714,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
clearIncomingMessages();
unAckedMessageTracker.clear();
});
+ log.info("XYZ resumeReceiving");
resumeReceivingFromPausedConsumersIfNeeded();
} And in my local env,
It's because |
if |
Could you explain why? Or is there a way to make |
The reason in Menu-Motivation
I guess that in CI the incoming queue can be filled quickly enough before the receive has not started. We can't make it runs faster locally, but we can make receive execution slower, you can change the test like this which makes the test fail in the local env easily: |
Got it. Thanks. The motivation part is a little confusing. The root cause is that the consumer won't be removed from BTW, it's not a race condition in public void redeliverUnacknowledgedMessages() {
internalPinnedExecutor.execute(() -> {
/* ... */
clearIncomingMessages();
unAckedMessageTracker.clear();
});
resumeReceivingFromPausedConsumersIfNeeded();
}
|
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can simplify the tests like
ExecutorService internalPinnedExecutor =
WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
internalPinnedExecutor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
consumer.redeliverUnacknowledgedMessages();
// Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
internalPinnedExecutor.submit(() -> {}).get();
Because we reused the internal executor. Then we can remove injectDelayToInternalThread
and waitForAllTasksForInternalThread
and the code will be more clear.
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can avoid sleeping by calling redeliverUnacknowledgedMessages
in the internal pinned executor.
ExecutorService internalPinnedExecutor =
WhiteboxImpl.getInternalState(consumer, "internalPinnedExecutor");
CountDownLatch redeliverDone = new CountDownLatch(1);
internalPinnedExecutor.execute(() -> {
consumer.redeliverUnacknowledgedMessages();
redeliverDone.countDown();
});
redeliverDone.await();
internalPinnedExecutor.submit(() -> {}).get();
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Show resolved
Hide resolved
Yes, you are right. This is definitely a good idea, and I tried it in the previous commit, but it didn't make more present of failure than |
The issue mentioned is not related to this PR
Codecov Report
@@ Coverage Diff @@
## master #18491 +/- ##
============================================
+ Coverage 45.62% 46.96% +1.34%
- Complexity 10075 10392 +317
============================================
Files 697 697
Lines 68024 68015 -9
Branches 7293 7285 -8
============================================
+ Hits 31033 31946 +913
+ Misses 33413 32498 -915
+ Partials 3578 3571 -7
Flags with carried forward coverage won't be shown. Click here to find out more.
|
…apache#18491) (cherry picked from commit 7a93ff9) (cherry picked from commit ce4172d)
Fixes #18480
Motivation
The multi-topic consumer's mechanism for pulling messages looks like this:
receiveMessageFromConsumer
.resumeReceivingFromPausedConsumersIfNeeded
when the incoming queue is full.receiveMessageFromConsumer
redeliver
after the loop task stopped, will trigger a newresumeReceivingFromPausedConsumersIfNeeded
.But there has a race condition when doing
redeliver
: andtask-redeliver
runs asynchrony.resumeReceivingFromPausedConsumersIfNeeded
runs in another thread.receiveMessageFromConsumer
Modifications
Make this two task above runs in the same thread.
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: