Commit
Improve connection pool concurrent access (#1035)
This update introduces a read/write lock to manage access to the internal pool map. For instance, the `retainAll` method now executes under the write lock.
injectives authored Oct 15, 2021
1 parent e7b8c21 commit 3b2b564
Showing 1 changed file with 107 additions and 46 deletions.
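
The diff below guards a plain `HashMap` with a `ReentrantReadWriteLock` instead of relying on a `ConcurrentHashMap`: lookups take the read lock, while compound mutations such as `retainAll` take the write lock so the whole check-and-remove sequence runs atomically. The following is a minimal standalone sketch of that pattern; the `GuardedRegistry` class and its members are illustrative and not part of the driver.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustrative sketch only: a plain HashMap guarded by a ReentrantReadWriteLock,
// with a small helper that runs a task while holding the given lock.
public class GuardedRegistry<K, V>
{
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<K,V> map = new HashMap<>();

    public boolean contains( K key )
    {
        // cheap lookups only need the shared read lock
        return executeWithLock( lock.readLock(), () -> map.containsKey( key ) );
    }

    public void put( K key, V value )
    {
        executeWithLock( lock.writeLock(), () -> map.put( key, value ) );
    }

    public void removeIf( Predicate<K> condition )
    {
        // the compound check-and-remove runs atomically under the exclusive write lock
        executeWithLock( lock.writeLock(), () -> map.keySet().removeIf( condition ) );
    }

    private <T> T executeWithLock( Lock lock, Supplier<T> supplier )
    {
        lock.lock();
        try
        {
            return supplier.get();
        }
        finally
        {
            lock.unlock();
        }
    }
}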
@@ -22,15 +22,20 @@
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
@@ -61,7 +66,8 @@ public class ConnectionPoolImpl implements ConnectionPool
private final MetricsListener metricsListener;
private final boolean ownsEventLoopGroup;

private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
private final ReadWriteLock addressToPoolLock = new ReentrantReadWriteLock();
private final Map<BoltServerAddress,ExtendedChannelPool> addressToPool = new HashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionFactory connectionFactory;
@@ -124,25 +130,32 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
@Override
public void retainAll( Set<BoltServerAddress> addressesToRetain )
{
for ( BoltServerAddress address : pools.keySet() )
executeWithLock( addressToPoolLock.writeLock(), () ->
{
if ( !addressesToRetain.contains( address ) )
Iterator<Map.Entry<BoltServerAddress,ExtendedChannelPool>> entryIterator = addressToPool.entrySet().iterator();
while ( entryIterator.hasNext() )
{
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
if ( activeChannels == 0 )
Map.Entry<BoltServerAddress,ExtendedChannelPool> entry = entryIterator.next();
BoltServerAddress address = entry.getKey();
if ( !addressesToRetain.contains( address ) )
{
// address is not present in updated routing table and has no active connections
// it's now safe to terminate corresponding connection pool and forget about it
ExtendedChannelPool pool = pools.remove( address );
if ( pool != null )
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
if ( activeChannels == 0 )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table registry.", address );
closePoolInBackground( address, pool );
// address is not present in updated routing table and has no active connections
// it's now safe to terminate corresponding connection pool and forget about it
ExtendedChannelPool pool = entry.getValue();
entryIterator.remove();
if ( pool != null )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table registry.", address );
closePoolInBackground( address, pool );
}
}
}
}
}
} );
}

@Override
@@ -163,35 +176,40 @@ public CompletionStage<Void> close()
if ( closed.compareAndSet( false, true ) )
{
nettyChannelTracker.prepareToCloseChannels();
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();

// We can only shut down the event loop group when all netty pools are fully closed,
// otherwise the netty pools might be missing threads (from the event loop group) to execute clean-ups.
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
pools.clear();
if ( !ownsEventLoopGroup )
{
completeWithNullIfNoError( closeFuture, pollCloseError );
}
else
{
shutdownEventLoopGroup( pollCloseError );
}
} );
executeWithLockAsync( addressToPoolLock.writeLock(),
() ->
{
// We can only shut down the event loop group when all netty pools are fully closed,
// otherwise the netty pools might be missing threads (from the event loop group) to execute clean-ups.
return closeAllPools().whenComplete(
( ignored, pollCloseError ) ->
{
addressToPool.clear();
if ( !ownsEventLoopGroup )
{
completeWithNullIfNoError( closeFuture, pollCloseError );
}
else
{
shutdownEventLoopGroup( pollCloseError );
}
} );
} );
}
return closeFuture;
}

@Override
public boolean isOpen( BoltServerAddress address )
{
return pools.containsKey( address );
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.containsKey( address ) );
}

@Override
public String toString()
{
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
return executeWithLock( addressToPoolLock.readLock(), () -> "ConnectionPoolImpl{" + "pools=" + addressToPool + '}' );
}

private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -237,15 +255,15 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend
{
pool.release( channel );
closePoolInBackground( address, pool );
pools.remove( address );
executeWithLock( addressToPoolLock.writeLock(), () -> addressToPool.remove( address ) );
assertNotClosed();
}
}

// for testing only
ExtendedChannelPool getPool( BoltServerAddress address )
{
return pools.get( address );
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
}

ExtendedChannelPool newPool( BoltServerAddress address )
@@ -256,12 +274,22 @@ ExtendedChannelPool newPool( BoltServerAddress address )

private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
return pools.computeIfAbsent( address, ignored -> {
ExtendedChannelPool pool = newPool( address );
// before the connection pool is added, register the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
return pool;
} );
ExtendedChannelPool existingPool = executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
return existingPool != null
? existingPool
: executeWithLock( addressToPoolLock.writeLock(),
() ->
{
ExtendedChannelPool pool = addressToPool.get( address );
if ( pool == null )
{
pool = newPool( address );
// before the connection pool is added, register the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
addressToPool.put( address, pool );
}
return pool;
} );
}

private CompletionStage<Void> closePool( ExtendedChannelPool pool )
@@ -303,12 +331,45 @@ private void shutdownEventLoopGroup( Throwable pollCloseError )
private CompletableFuture<Void> closeAllPools()
{
return CompletableFuture.allOf(
pools.entrySet().stream().map( entry -> {
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} ).toArray( CompletableFuture[]::new ) );
addressToPool.entrySet().stream()
.map( entry ->
{
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} )
.toArray( CompletableFuture[]::new ) );
}

private void executeWithLock( Lock lock, Runnable runnable )
{
executeWithLock( lock, () ->
{
runnable.run();
return null;
} );
}

private <T> T executeWithLock( Lock lock, Supplier<T> supplier )
{
lock.lock();
try
{
return supplier.get();
}
finally
{
lock.unlock();
}
}

private <T> void executeWithLockAsync( Lock lock, Supplier<CompletionStage<T>> stageSupplier )
{
lock.lock();
CompletableFuture.completedFuture( lock )
.thenCompose( ignored -> stageSupplier.get() )
.whenComplete( ( ignored, throwable ) -> lock.unlock() );
}
}
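
Note the design of `executeWithLockAsync` above: the lock is acquired before the supplier builds the close stage and is released only in `whenComplete`, once the asynchronous close has finished, rather than in a `finally` block that would release it as soon as the synchronous call returned. A small self-contained sketch of that idea follows; the `AsyncLockSketch` class and its usage are illustrative, not driver code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class AsyncLockSketch
{
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // The lock stays held until the stage produced by the supplier completes,
    // so the guarded state cannot be mutated while the async work is in flight.
    private <T> void executeWithLockAsync( Lock lock, Supplier<CompletionStage<T>> stageSupplier )
    {
        lock.lock();
        CompletableFuture.completedFuture( lock )
                         .thenCompose( ignored -> stageSupplier.get() )
                         .whenComplete( ( ignored, throwable ) -> lock.unlock() );
    }

    public static void main( String[] args )
    {
        AsyncLockSketch sketch = new AsyncLockSketch();
        sketch.executeWithLockAsync( sketch.lock.writeLock(), () ->
        {
            // hypothetical guarded work; the write lock is held until this stage completes
            System.out.println( "write lock held for the whole async operation" );
            return CompletableFuture.completedFuture( null );
        } );
    }
}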
