From 51215a1a9fe9b694693254a73abe51c480d10b07 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 30 Nov 2021 12:10:19 +0000 Subject: [PATCH] Improve connection release handling and improve flaky test This update ensures connection release stages are linked. In addition, it improves stability of flaky `RoutingTableAndConnectionPoolTest.shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool` test. --- .../internal/async/NetworkConnection.java | 13 ++++++--- .../ChannelReleasingResetResponseHandler.java | 12 +++++--- .../RoutingTableAndConnectionPoolTest.java | 29 ++++++++++++------- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java index 84e1c1cc2a..c9e607f85b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java @@ -48,6 +48,7 @@ import static java.util.Collections.emptyMap; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason; +import static org.neo4j.driver.internal.util.Futures.asCompletionStage; /** * This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this @@ -189,10 +190,14 @@ public void terminateAndRelease( String reason ) if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) ) { setTerminationReason( channel, reason ); - channel.close(); - channelPool.release( channel ); - releaseFuture.complete( null ); - metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent ); + asCompletionStage( channel.close() ) + .exceptionally( throwable -> null ) + .thenCompose( ignored -> channelPool.release( channel ) ) + .whenComplete( ( ignored, throwable ) -> + { + releaseFuture.complete( null ); + metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent ); + } ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java index 0b956e5915..39aeb32330 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java @@ -28,6 +28,8 @@ import org.neo4j.driver.internal.util.Clock; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp; +import static org.neo4j.driver.internal.util.Futures.asCompletionStage; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class ChannelReleasingResetResponseHandler extends ResetResponseHandler { @@ -47,18 +49,20 @@ public ChannelReleasingResetResponseHandler( Channel channel, ExtendedChannelPoo @Override protected void resetCompleted( CompletableFuture completionFuture, boolean success ) { + CompletionStage closureStage; if ( success ) { // update the last-used timestamp before returning the channel back to the pool setLastUsedTimestamp( channel, clock.millis() ); + closureStage = completedWithNull(); } else { // close the channel before returning it back to the pool if RESET failed - channel.close(); + closureStage = asCompletionStage( channel.close() ); } - - CompletionStage released = pool.release( channel ); - released.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) ); + closureStage.exceptionally( throwable -> null ) + .thenCompose( ignored -> pool.release( channel ) ) + .whenComplete( ( ignore, error ) -> completionFuture.complete( null ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java index bd7eb9a2ae..f11b5f487b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java @@ -25,9 +25,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -43,6 +45,7 @@ import org.neo4j.driver.exceptions.FatalDiscoveryException; import org.neo4j.driver.exceptions.ProtocolException; import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.async.connection.BootstrapFactory; import org.neo4j.driver.internal.async.pool.NettyChannelHealthChecker; import org.neo4j.driver.internal.async.pool.NettyChannelTracker; @@ -85,7 +88,7 @@ class RoutingTableAndConnectionPoolTest private static final BoltServerAddress D = new BoltServerAddress( "localhost:30003" ); private static final BoltServerAddress E = new BoltServerAddress( "localhost:30004" ); private static final BoltServerAddress F = new BoltServerAddress( "localhost:30005" ); - private static final List SERVERS = new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ); + private static final List SERVERS = Collections.synchronizedList( new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ) ); private static final String[] DATABASES = new String[]{"", SYSTEM_DATABASE_NAME, "my database"}; @@ -94,7 +97,7 @@ class RoutingTableAndConnectionPoolTest private final Logging logging = none(); @Test - void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable + void shouldAddServerToRoutingTableAndConnectionPool() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -114,7 +117,7 @@ void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithRoutingError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -134,7 +137,7 @@ void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithProtocolError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -154,7 +157,7 @@ void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithSecurityError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -174,7 +177,7 @@ void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable } @Test - void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable + void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -197,7 +200,7 @@ void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable } @Test - void shouldRemoveExpiredRoutingTableAndServers() throws Throwable + void shouldRemoveExpiredRoutingTableAndServers() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -224,7 +227,7 @@ void shouldRemoveExpiredRoutingTableAndServers() throws Throwable } @Test - void shouldRemoveExpiredRoutingTableButNotServer() throws Throwable + void shouldRemoveExpiredRoutingTableButNotServer() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -263,7 +266,7 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl acquireAndReleaseConnections( loadBalancer ); Set servers = routingTables.allServers(); BoltServerAddress openServer = null; - for( BoltServerAddress server: servers ) + for ( BoltServerAddress server : servers ) { if ( connectionPool.isOpen( server ) ) { @@ -275,6 +278,8 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl // if we remove the open server from servers, then the connection pool should remove the server from the pool. SERVERS.remove( openServer ); + // ensure rediscovery is necessary on subsequent interaction + Arrays.stream( DATABASES ).map( DatabaseNameUtil::database ).forEach( routingTables::remove ); acquireAndReleaseConnections( loadBalancer ); assertFalse( connectionPool.isOpen( openServer ) ); @@ -375,7 +380,11 @@ public CompletionStage lookupClusterComposition( } if ( servers.size() == 0 ) { - servers.add( A ); + BoltServerAddress address = SERVERS.stream() + .filter( Objects::nonNull ) + .findFirst() + .orElseThrow( () -> new RuntimeException( "No non null server addresses are available" ) ); + servers.add( address ); } ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers, null ); return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) );