From 028f89c75c01ba1901561b7989615d0b367aaaa1 Mon Sep 17 00:00:00 2001 From: injectives <11927660+injectives@users.noreply.github.com> Date: Fri, 4 Feb 2022 13:12:48 +0200 Subject: [PATCH] Improve connection release handling and improve flaky test (#1092) (#1140) (#1145) This update ensures connection release stages are linked. In addition, it improves stability of flaky `RoutingTableAndConnectionPoolTest.shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool` test. --- .../internal/async/NetworkConnection.java | 13 ++++++--- .../ChannelReleasingResetResponseHandler.java | 12 +++++--- .../RoutingTableAndConnectionPoolTest.java | 29 ++++++++++++------- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java index dda50e6bda..a5f1028a90 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java @@ -43,6 +43,7 @@ import static java.util.Collections.emptyMap; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason; +import static org.neo4j.driver.internal.util.Futures.asCompletionStage; /** * This connection represents a simple network connection to a remote server. @@ -178,10 +179,14 @@ public void terminateAndRelease( String reason ) if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) ) { setTerminationReason( channel, reason ); - channel.close(); - channelPool.release( channel ); - releaseFuture.complete( null ); - metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent ); + asCompletionStage( channel.close() ) + .exceptionally( throwable -> null ) + .thenCompose( ignored -> channelPool.release( channel ) ) + .whenComplete( ( ignored, throwable ) -> + { + releaseFuture.complete( null ); + metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent ); + } ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java index 0b956e5915..39aeb32330 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java @@ -28,6 +28,8 @@ import org.neo4j.driver.internal.util.Clock; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp; +import static org.neo4j.driver.internal.util.Futures.asCompletionStage; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class ChannelReleasingResetResponseHandler extends ResetResponseHandler { @@ -47,18 +49,20 @@ public ChannelReleasingResetResponseHandler( Channel channel, ExtendedChannelPoo @Override protected void resetCompleted( CompletableFuture completionFuture, boolean success ) { + CompletionStage closureStage; if ( success ) { // update the last-used timestamp before returning the channel back to the pool setLastUsedTimestamp( channel, clock.millis() ); + closureStage = completedWithNull(); } else { // close the channel before returning it back to the pool if RESET failed - channel.close(); + closureStage = asCompletionStage( channel.close() ); } - - CompletionStage released = pool.release( channel ); - released.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) ); + closureStage.exceptionally( throwable -> null ) + .thenCompose( ignored -> pool.release( channel ) ) + .whenComplete( ( ignore, error ) -> completionFuture.complete( null ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java index 4f235b5535..d732fb594e 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java @@ -25,9 +25,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -43,6 +45,7 @@ import org.neo4j.driver.exceptions.FatalDiscoveryException; import org.neo4j.driver.exceptions.ProtocolException; import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.async.connection.BootstrapFactory; import org.neo4j.driver.internal.async.pool.NettyChannelTracker; import org.neo4j.driver.internal.async.pool.PoolSettings; @@ -84,7 +87,7 @@ class RoutingTableAndConnectionPoolTest private static final BoltServerAddress D = new BoltServerAddress( "localhost:30003" ); private static final BoltServerAddress E = new BoltServerAddress( "localhost:30004" ); private static final BoltServerAddress F = new BoltServerAddress( "localhost:30005" ); - private static final List SERVERS = new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ); + private static final List SERVERS = Collections.synchronizedList( new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ) ); private static final String[] DATABASES = new String[]{"", SYSTEM_DATABASE_NAME, "my database"}; @@ -93,7 +96,7 @@ class RoutingTableAndConnectionPoolTest private final Logging logging = none(); @Test - void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable + void shouldAddServerToRoutingTableAndConnectionPool() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -113,7 +116,7 @@ void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithRoutingError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -132,7 +135,7 @@ void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithProtocolError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -151,7 +154,7 @@ void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable } @Test - void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable + void shouldNotAddToRoutingTableWhenFailedWithSecurityError() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -170,7 +173,7 @@ void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable } @Test - void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable + void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -193,7 +196,7 @@ void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable } @Test - void shouldRemoveExpiredRoutingTableAndServers() throws Throwable + void shouldRemoveExpiredRoutingTableAndServers() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -218,7 +221,7 @@ void shouldRemoveExpiredRoutingTableAndServers() throws Throwable } @Test - void shouldRemoveExpiredRoutingTableButNotServer() throws Throwable + void shouldRemoveExpiredRoutingTableButNotServer() { // Given ConnectionPool connectionPool = newConnectionPool(); @@ -255,7 +258,7 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl acquireAndReleaseConnections( loadBalancer ); Set servers = routingTables.allServers(); BoltServerAddress openServer = null; - for( BoltServerAddress server: servers ) + for ( BoltServerAddress server : servers ) { if ( connectionPool.isOpen( server ) ) { @@ -267,6 +270,8 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl // if we remove the open server from servers, then the connection pool should remove the server from the pool. SERVERS.remove( openServer ); + // ensure rediscovery is necessary on subsequent interaction + Arrays.stream( DATABASES ).map( DatabaseNameUtil::database ).forEach( routingTables::remove ); acquireAndReleaseConnections( loadBalancer ); assertFalse( connectionPool.isOpen( openServer ) ); @@ -366,7 +371,11 @@ public CompletionStage lookupClusterComposition( } if ( servers.size() == 0 ) { - servers.add( A ); + BoltServerAddress address = SERVERS.stream() + .filter( Objects::nonNull ) + .findFirst() + .orElseThrow( () -> new RuntimeException( "No non null server addresses are available" ) ); + servers.add( address ); } ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers ); return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) );