Skip to content

Commit

Permalink
Merge pull request #436 from lutovich/1.5-no-purge
Browse files Browse the repository at this point in the history
Relax connection termination policy in routing driver
  • Loading branch information
zhenlineo authored Nov 29, 2017
2 parents e705db1 + bf6b57b commit 0c57215
Show file tree
Hide file tree
Showing 20 changed files with 649 additions and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Promise;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -39,16 +39,14 @@
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Value;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;

public class NettyConnection implements Connection
{
private final Channel channel;
private final InboundMessageDispatcher messageDispatcher;
private final BoltServerAddress serverAddress;
private final ServerVersion serverVersion;
private final ChannelPool channelPool;
private final CompletableFuture<Void> releaseFuture;
private final Clock clock;

private final AtomicBoolean open = new AtomicBoolean( true );
Expand All @@ -61,6 +59,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
this.serverAddress = ChannelAttributes.serverAddress( channel );
this.serverVersion = ChannelAttributes.serverVersion( channel );
this.channelPool = channelPool;
this.releaseFuture = new CompletableFuture<>();
this.clock = clock;
}

Expand Down Expand Up @@ -111,14 +110,9 @@ public CompletionStage<Void> release()
{
if ( open.compareAndSet( true, false ) )
{
Promise<Void> releasePromise = channel.eventLoop().newPromise();
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
return asCompletionStage( releasePromise );
}
else
{
return completedFuture( null );
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
}
return releaseFuture;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.concurrent.Future;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -58,10 +59,16 @@ public class ConnectionPoolImpl implements ConnectionPool

public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
Logging logging, Clock clock )
{
this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock );
}

ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker,
PoolSettings settings, Logging logging, Clock clock )
{
this.connector = connector;
this.bootstrap = bootstrap;
this.activeChannelTracker = new ActiveChannelTracker( logging );
this.activeChannelTracker = activeChannelTracker;
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
this.settings = settings;
this.clock = clock;
Expand All @@ -86,27 +93,30 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
}

@Override
public void purge( BoltServerAddress address )
public void retainAll( Set<BoltServerAddress> addressesToRetain )
{
log.info( "Purging connections towards %s", address );

// purge active connections
activeChannelTracker.purge( address );

// purge idle connections in the pool and pool itself
ChannelPool pool = pools.remove( address );
if ( pool != null )
for ( BoltServerAddress address : pools.keySet() )
{
pool.close();
if ( !addressesToRetain.contains( address ) )
{
int activeChannels = activeChannelTracker.activeChannelCount( address );
if ( activeChannels == 0 )
{
// address is not present in updated routing table and has no active connections
// it's now safe to terminate corresponding connection pool and forget about it

ChannelPool pool = pools.remove( address );
if ( pool != null )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table", address );
pool.close();
}
}
}
}
}

@Override
public boolean hasAddress( BoltServerAddress address )
{
return pools.containsKey( address );
}

@Override
public int activeConnections( BoltServerAddress address )
{
Expand Down Expand Up @@ -157,7 +167,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
return pool;
}

private NettyChannelPool newPool( BoltServerAddress address )
ChannelPool newPool( BoltServerAddress address )
{
return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker,
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,54 +39,9 @@ public int size()
return addresses.length;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
public synchronized void update( Set<BoltServerAddress> addresses )
{
BoltServerAddress[] prev = this.addresses;
if ( addresses.isEmpty() )
{
this.addresses = NONE;
return;
}
if ( prev.length == 0 )
{
this.addresses = addresses.toArray( NONE );
return;
}
BoltServerAddress[] copy = null;
if ( addresses.size() != prev.length )
{
copy = new BoltServerAddress[addresses.size()];
}
int j = 0;
for ( int i = 0; i < prev.length; i++ )
{
if ( addresses.remove( prev[i] ) )
{
if ( copy != null )
{
copy[j++] = prev[i];
}
}
else
{
removed.add( prev[i] );
if ( copy == null )
{
copy = new BoltServerAddress[prev.length];
System.arraycopy( prev, 0, copy, 0, i );
j = i;
}
}
}
if ( copy == null )
{
return;
}
for ( BoltServerAddress address : addresses )
{
copy[j++] = address;
}
this.addresses = copy;
this.addresses = addresses.toArray( NONE );
}

public synchronized void remove( BoltServerAddress address )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.neo4j.driver.internal.cluster;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
Expand All @@ -43,7 +44,7 @@ public class ClusterRoutingTable implements RoutingTable
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
this( clock );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ) );
}

private ClusterRoutingTable( Clock clock )
Expand All @@ -66,14 +67,12 @@ public boolean isStaleFor( AccessMode mode )
}

@Override
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
public synchronized void update( ClusterComposition cluster )
{
expirationTimeout = cluster.expirationTimestamp();
Set<BoltServerAddress> removed = new HashSet<>();
readers.update( cluster.readers(), removed );
writers.update( cluster.writers(), removed );
routers.update( cluster.routers(), removed );
return removed;
readers.update( cluster.readers() );
writers.update( cluster.writers() );
routers.update( cluster.routers() );
}

@Override
Expand Down Expand Up @@ -102,6 +101,16 @@ public AddressSet routers()
return routers;
}

@Override
public Set<BoltServerAddress> servers()
{
Set<BoltServerAddress> servers = new HashSet<>();
Collections.addAll( servers, readers.toArray() );
Collections.addAll( servers, writers.toArray() );
Collections.addAll( servers, routers.toArray() );
return servers;
}

@Override
public void removeWriter( BoltServerAddress toRemove )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,9 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
return connectionStage.thenCompose( connection ->
{
Statement procedure = procedureStatement( connection.serverVersion() );
return runProcedure( connection, procedure ).handle( ( records, error ) ->
{
Throwable cause = Futures.completionErrorCause( error );
if ( cause != null )
{
return handleError( procedure, cause );
}
else
{
return new RoutingProcedureResponse( procedure, records );
}
} );
return runProcedure( connection, procedure )
.thenCompose( records -> releaseConnection( connection, records ) )
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
} );
}

Expand All @@ -87,6 +78,30 @@ private Statement procedureStatement( ServerVersion serverVersion )
}
}

private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
{
// It is not strictly required to release connection after routing procedure invocation because it'll
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
// in background. However, releasing it early as part of whole chain makes it easier to reason about
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
// routing table will be closed immediately.
return connection.release().thenApply( ignore -> records );
}

private RoutingProcedureResponse processProcedureResponse( Statement procedure, List<Record> records,
Throwable error )
{
Throwable cause = Futures.completionErrorCause( error );
if ( cause != null )
{
return handleError( procedure, cause );
}
else
{
return new RoutingProcedureResponse( procedure, records );
}
}

private RoutingProcedureResponse handleError( Statement procedure, Throwable error )
{
if ( error instanceof ClientException )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface RoutingTable
{
boolean isStaleFor( AccessMode mode );

Set<BoltServerAddress> update( ClusterComposition cluster );
void update( ClusterComposition cluster );

void forget( BoltServerAddress address );

Expand All @@ -37,5 +37,7 @@ public interface RoutingTable

AddressSet routers();

Set<BoltServerAddress> servers();

void removeWriter( BoltServerAddress toRemove );
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.netty.util.concurrent.EventExecutorGroup;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand Down Expand Up @@ -125,10 +124,8 @@ public CompletionStage<Void> close()

private synchronized void forget( BoltServerAddress address )
{
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
// remove from the routing table, to prevent concurrent threads from making connections to this address
routingTable.forget( address );
// drop all current connections to the address
connectionPool.purge( address );
}

private synchronized CompletionStage<RoutingTable> freshRoutingTable( AccessMode mode )
Expand Down Expand Up @@ -171,18 +168,21 @@ else if ( routingTable.isStaleFor( mode ) )

private synchronized void freshClusterCompositionFetched( ClusterComposition composition )
{
Set<BoltServerAddress> removed = routingTable.update( composition );

for ( BoltServerAddress address : removed )
try
{
connectionPool.purge( address );
}
routingTable.update( composition );
connectionPool.retainAll( routingTable.servers() );

log.info( "Refreshed routing information. %s", routingTable );
log.info( "Refreshed routing information. %s", routingTable );

CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
refreshRoutingTableFuture = null;
routingTableFuture.complete( routingTable );
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
refreshRoutingTableFuture = null;
routingTableFuture.complete( routingTable );
}
catch ( Throwable error )
{
clusterCompositionLookupFailed( error );
}
}

private synchronized void clusterCompositionLookupFailed( Throwable error )
Expand Down
Loading

0 comments on commit 0c57215

Please sign in to comment.