Skip to content

Commit

Permalink
Fix: Netty threads may still linger after client close (apache#5628)
Browse files Browse the repository at this point in the history
### Motivation

Even after the synchronous close() method is called on a PulsarClient, netty thread(s) from the EventLoopGroup used in the ConnectionPool may still linger around for another 2 seconds afterwards due to not wait for the the graceful shutdown to complete.

While this usually doesn't cause any problems for a normal use case, it does cause problems in the Pulsar Flink sources and sinks.  Exceptions like the following can happen when the Flink job with a Pulsar source or sink gets closed:

```
Exception in thread "pulsar-client-io-1-1" java.lang.NoClassDefFoundError: org/apache/pulsar/shade/io/netty/buffer/PoolArena$1
	at org.apache.pulsar.shade.io.netty.buffer.PoolArena.freeChunk(PoolArena.java:293)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.freeEntry(PoolThreadCache.java:460)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:430)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache$MemoryRegionCache.free(PoolThreadCache.java:422)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:279)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:270)
	at org.apache.pulsar.shade.io.netty.buffer.PoolThreadCache.free(PoolThreadCache.java:241)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:450)
	at org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache.onRemoval(PooledByteBufAllocator.java:426)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.remove(FastThreadLocal.java:271)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal.removeAll(FastThreadLocal.java:67)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:32)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.shade.io.netty.buffer.PoolArena$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 13 more
```

This is because threads from the EventLoopGroup lingers after the pulsar client is closed and the Flink job is unloaded.

### Modifications

Shorten the graceful shutdown period and wait for it to complete
  • Loading branch information
jerrypeng authored and Jerry Peng committed Dec 13, 2019
1 parent 3b2879d commit 33b54fc
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -281,8 +282,8 @@ private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int p

@Override
public void close() throws IOException {
eventLoopGroup.shutdownGracefully();
dnsResolver.close();
eventLoopGroup.shutdown();
}

private void cleanupConnection(InetSocketAddress address, int connectionKey,
Expand Down

0 comments on commit 33b54fc

Please sign in to comment.