Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix non partitioned topic consumer call RedeliverUnacknowledgedMessages cause deadlock. #18462

Closed
wants to merge 2 commits into from

Conversation

Technoboy-
Copy link
Contributor

Motivation

#17318 has made internalRedeliverUnacknowledgedMessages executed by internalPinnedExecutor.

If the consumer received msg then call redeliverUnacknowledgedMessages like the below :

consumer.receiveAsync()      // using internalPinnedExecutor thread
                .thenCompose(m -> {
                    return consumer.acknowledgeCumulativeAsync(m);
                })
                .thenAccept(ignore -> {
                    try {
                        consumer.redeliverUnacknowledgedMessages(); // still the same internalPinnedExecutor thread, it will be blocked
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })

public void redeliverUnacknowledgedMessages() {
try {
internalRedeliverUnacknowledgedMessages().get();
} catch (ExecutionException e) {
throw e.getCause();
}
}

public CompletableFuture<Void> internalRedeliverUnacknowledgedMessages() {
return CompletableFuture.runAsync(() -> {
// First : synchronized in order to handle consumer reconnect produce race condition, when broker receive
// redeliverUnacknowledgedMessages and consumer have not be created and then receive reconnect epoch
// change the broker is smaller than the client epoch, this will cause client epoch smaller
// than broker epoch forever. client will not receive message anymore.
// Second : we should synchronized `ClientCnx cnx = cnx()` to prevent use old cnx to
// send redeliverUnacknowledgedMessages to a old broker
synchronized (ConsumerImpl.this) {
ClientCnx cnx = cnx();
// V1 don't support redeliverUnacknowledgedMessages
if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
if ((getState() == State.Connecting)) {
log.warn("[{}] Client Connection needs to be established "
+ "for redelivery of unacknowledged messages", this);
} else {
log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
cnx.ctx().close();
}
return;
}
// we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
// we need to keep both epochs the same
if (conf.getSubscriptionType() == SubscriptionType.Failover
|| conf.getSubscriptionType() == SubscriptionType.Exclusive) {
CONSUMER_EPOCH.incrementAndGet(this);
}
// clear local message
int currentSize = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
// is channel is connected, we should send redeliver command to broker
if (cnx != null && isConnected(cnx)) {
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
if (currentSize > 0) {
increaseAvailablePermits(cnx, currentSize);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
consumerName, currentSize);
}
} else {
log.warn("[{}] Send redeliver messages command but the client is reconnect or close, "
+ "so don't need to send redeliver command to broker", this);
}
}
}, internalPinnedExecutor);
}

Modification

  • Remove using internalPinnedExecutor to execute internalRedeliverUnacknowledgedMessages.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@Technoboy- Technoboy- added this to the 2.12.0 milestone Nov 14, 2022
@Technoboy- Technoboy- self-assigned this Nov 14, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 14, 2022
@codelipenghui
Copy link
Contributor

@Technoboy- I think the right fix should change

public void redeliverUnacknowledgedMessages() { 
     try { 
         internalRedeliverUnacknowledgedMessages().get(); 
     } catch (ExecutionException e) { 
         throw e.getCause(); 
     } 
 } 

to

public void redeliverUnacknowledgedMessages() { 
         internalRedeliverUnacknowledgedMessages();
 } 

I'm afraid we will introduce a potential race condition issue if we remove the internalPinnedExecutor for internalRedeliverUnacknowledgedMessages

@codelipenghui
Copy link
Contributor

Oh sorry. But it will introduce another problem. Users have completed the redeliverUnacknowledgedMessages() and then try to call receive() again. Due to redeliverUnacknowledgedMessages() being executed in another thread. Users can get the message before cleaning up the incoming queue.

@codecov-commenter
Copy link

codecov-commenter commented Nov 15, 2022

Codecov Report

Merging #18462 (43966a1) into master (fdf86d3) will decrease coverage by 9.48%.
The diff coverage is 5.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #18462      +/-   ##
============================================
- Coverage     45.67%   36.18%   -9.49%     
+ Complexity    10075     7734    -2341     
============================================
  Files           693      697       +4     
  Lines         67940    67979      +39     
  Branches       7273     7278       +5     
============================================
- Hits          31030    24600    -6430     
- Misses        33333    40076    +6743     
+ Partials       3577     3303     -274     
Flag Coverage Δ
unittests 36.18% <5.00%> (-9.49%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...adbalance/extensions/channel/ServiceUnitState.java 0.00% <0.00%> (ø)
...lance/extensions/channel/ServiceUnitStateData.java 0.00% <0.00%> (ø)
...ar/broker/loadbalance/extensions/models/Split.java 0.00% <0.00%> (ø)
...r/broker/loadbalance/extensions/models/Unload.java 0.00% <0.00%> (ø)
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 15.05% <0.00%> (-0.05%) ⬇️
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 22.99% <25.00%> (+0.03%) ⬆️
...a/org/apache/pulsar/client/impl/MessageIdImpl.java 80.55% <33.33%> (+18.05%) ⬆️
...apache/pulsar/client/impl/ConsumerBuilderImpl.java 27.80% <50.00%> (+0.29%) ⬆️
...ava/org/apache/pulsar/broker/admin/v1/Brokers.java 0.00% <0.00%> (-100.00%) ⬇️
...va/org/apache/pulsar/broker/admin/v1/Clusters.java 0.00% <0.00%> (-100.00%) ⬇️
... and 154 more

@codelipenghui
Copy link
Contributor

The issue should also happen on a partitioned topic, right?

public void redeliverUnacknowledgedMessages() {
        List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
        internalPinnedExecutor.execute(() -> {
            CONSUMER_EPOCH.incrementAndGet(this);
            consumers.values().stream().forEach(consumer -> {
                futures.add(consumer.internalRedeliverUnacknowledgedMessages());
                consumer.unAckedChunkedMessageIdSequenceMap.clear();
            });
            clearIncomingMessages();
            unAckedMessageTracker.clear();
        });
        try {
            FutureUtil.waitForAll(futures).get();
        } catch (ExecutionException e) {
            throw e.getCause();
        }
        resumeReceivingFromPausedConsumersIfNeeded();
    }

@Technoboy-
Copy link
Contributor Author

Close this patch due to the revert of #18475

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs ready-to-test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants