From 33b54fcfe882c2b5b842d8780fbdac0519d66c0d Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Fri, 22 Nov 2019 08:19:07 -0800 Subject: [PATCH] Fix: Netty threads may still linger after client close (#5628) ### Motivation Even after the synchronous close() method is called on a PulsarClient, Netty thread(s) from the EventLoopGroup used in the ConnectionPool may still linger around for another 2 seconds afterwards, because close() does not wait for the graceful shutdown to complete. While this usually doesn't cause any problems for a normal use case, it does cause problems in the Pulsar Flink sources and sinks. Exceptions like the following can happen when the Flink job with a Pulsar source or sink gets closed: ``` Exception in thread "pulsar-client-io-1-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1 at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270) at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241) at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450) at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67) at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 13 more ``` This is because threads from the EventLoopGroup linger after the Pulsar client is closed and the Flink job is unloaded. ### Modifications Shorten the graceful shutdown period (quiet period of 0, timeout of 10 seconds) and block in close() until the EventLoopGroup shutdown has completed, so that no event loop thread outlives the client. --- .../java/org/apache/pulsar/client/impl/ConnectionPool.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 2b2f822243253..fc41e4c45efa7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.pulsar.client.api.PulsarClientException; @@ -281,8 +282,8 @@ private CompletableFuture connectToAddress(InetAddress ipAddress, int p @Override public void close() throws IOException { - eventLoopGroup.shutdownGracefully(); + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).awaitUninterruptibly(); dnsResolver.close(); } private void cleanupConnection(InetSocketAddress address, int connectionKey,