fix potential deadlock in pulsar client close (apache#5731)
# Motivation

A deadlock in PulsarClient.close() can happen if there are producers/consumers that were not closed before PulsarClient.close() is called, so the close routine has to close them itself. The deadlock happens because we piggyback on a "pulsar-io" thread, which is used to shut down the connection in producer/consumer close, to also shut down the EventLoopGroup. The "pulsar-io" thread is part of the EventLoopGroup, so it ends up trying to shut itself down and waiting on its own termination, which causes the deadlock.

Below is a stack trace of what it looks like:

```
"pulsar-client-io-1-1" #20 prio=5 os_prio=31 tid=0x00007fc312a78800 nid=0x9a03 in Object.wait() [0x000070000384e000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000007b8532640> (a io.netty.util.concurrent.DefaultPromise)
    at java.lang.Object.wait(Object.java:502)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
    - locked <0x00000007b8532640> (a io.netty.util.concurrent.DefaultPromise)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:403)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:35)
    at org.apache.pulsar.client.impl.ConnectionPool.close(ConnectionPool.java:286)
    at org.apache.pulsar.client.impl.PulsarClientImpl.shutdown(PulsarClientImpl.java:578)
    at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$closeAsync$18(PulsarClientImpl.java:560)
    at org.apache.pulsar.client.impl.PulsarClientImpl$$Lambda$82/878861517.run(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
    at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.pulsar.client.impl.ProducerImpl.lambda$closeAsync$9(ProducerImpl.java:735)
    at org.apache.pulsar.client.impl.ProducerImpl$$Lambda$80/1123226989.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    at org.apache.pulsar.client.impl.ClientCnx.handleSuccess(ClientCnx.java:406)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
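
The self-wait in the trace above can be reproduced without Netty or Pulsar. The following is a minimal sketch, assuming a plain `java.util.concurrent` single-thread executor as a stand-in for the EventLoopGroup; the class and method names are hypothetical and are not part of the Pulsar client. A task running on the executor asks it to shut down and then waits for termination. The pool cannot terminate while that task is still running, so the wait can only end by timing out (an untimed wait, like the `sync()` in the trace, would block forever).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; a stand-in for the real client, not Pulsar code.
final class SelfShutdownDemo {

    /** Shuts the executor down and waits for termination from one of its own tasks. */
    static boolean shutdownFromInside(long timeoutMillis) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        CompletableFuture<Boolean> terminated = new CompletableFuture<>();
        io.execute(() -> {
            io.shutdown();
            try {
                // The pool cannot terminate until this task returns, and this
                // task does not return until the wait ends: a self-wait.
                terminated.complete(io.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                terminated.completeExceptionally(e);
            }
        });
        return terminated.get();
    }

    /** Shuts the executor down and waits for termination from the calling thread. */
    static boolean shutdownFromOutside(long timeoutMillis) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        io.execute(() -> { }); // some trivial work on the executor thread
        io.shutdown();
        return io.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated (waiting from inside):  " + shutdownFromInside(200));
        System.out.println("terminated (waiting from outside): " + shutdownFromOutside(2000));
    }
}
```

In this sketch the inside case reports `false` (the wait timed out) and the outside case reports `true`. The timeout is only there to keep the demo from hanging.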

This deadlock went unnoticed for a long time because in the past we never waited for the shutdown of the EventLoopGroup to complete.

Also, apache#5628 does not solve the issue. This PR should supersede that one.
jerrypeng authored and Jerry Peng committed Dec 13, 2019
1 parent 33b54fc commit a790940
Showing 2 changed files with 12 additions and 3 deletions.
@@ -282,8 +282,13 @@ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int p

     @Override
     public void close() throws IOException {
+        try {
+            eventLoopGroup.shutdownGracefully(0, 1, TimeUnit.SECONDS).await();
+        } catch (InterruptedException e) {
+            log.warn("EventLoopGroup shutdown was interrupted", e);
+        }
+
         dnsResolver.close();
-        eventLoopGroup.shutdown();
     }

     private void cleanupConnection(InetSocketAddress address, int connectionKey,
@@ -550,7 +550,11 @@ public CompletableFuture<Void> closeAsync() {
             consumersToClose.forEach(c -> futures.add(c.closeAsync()));
         }

-        FutureUtil.waitForAll(futures).thenRun(() -> {
+        // Need to run the shutdown sequence in a separate thread to prevent deadlocks
+        // If there are consumers or producers that need to be shutdown we cannot use the same thread
+        // to shutdown the EventLoopGroup as well as that would be trying to shutdown itself thus a deadlock
+        // would happen
+        FutureUtil.waitForAll(futures).thenRun(() -> new Thread(() -> {
             // All producers & consumers are now closed, we can stop the client safely
             try {
                 shutdown();
@@ -559,7 +563,7 @@ public CompletableFuture<Void> closeAsync() {
             } catch (PulsarClientException e) {
                 closeFuture.completeExceptionally(e);
             }
-        }).exceptionally(exception -> {
+        }, "pulsar-client-shutdown-thread").start()).exceptionally(exception -> {
             closeFuture.completeExceptionally(exception);
             return null;
         });
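
The hand-off in the `closeAsync()` change can be illustrated in isolation. This is a rough sketch, not the Pulsar implementation: it assumes a plain `java.util.concurrent` single-thread executor in place of the EventLoopGroup, and `ShutdownHandoffDemo`, `resourcesClosed`, and `demo-shutdown-thread` are hypothetical names. The continuation that fires once the resources are closed only starts a dedicated thread, and that thread performs the blocking shutdown, so no executor thread ever waits for its own termination.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it mirrors the shape of the change, not the real client.
final class ShutdownHandoffDemo {

    /** Completes with true if the executor terminated within the timeout. */
    static CompletableFuture<Boolean> closeAsync(ExecutorService io, long timeoutMillis) {
        CompletableFuture<Boolean> closeFuture = new CompletableFuture<>();

        // Stand-in for producer/consumer closes that complete on the I/O thread.
        CompletableFuture<Void> resourcesClosed = CompletableFuture.runAsync(() -> { }, io);

        // The continuation may run on the I/O thread, so it must not block there.
        // It only starts a separate thread, which does the blocking shutdown.
        resourcesClosed.thenRun(() -> new Thread(() -> {
            io.shutdown();
            try {
                closeFuture.complete(io.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                closeFuture.completeExceptionally(e);
            }
        }, "demo-shutdown-thread").start()).exceptionally(exception -> {
            closeFuture.completeExceptionally(exception);
            return null;
        });
        return closeFuture;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        System.out.println("terminated: " + closeAsync(io, 2000).get());
    }
}
```

Starting a one-off thread costs little here because the shutdown runs once per client. An alternative would be a separate shared executor, at the price of another resource that must itself be shut down.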