Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve connection acquisition timeout error #433

Merged
merged 2 commits into from
Nov 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;

import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.async.BoltServerAddress;
Expand All @@ -38,6 +40,7 @@
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;

public class ConnectionPoolImpl implements ConnectionPool
{
Expand Down Expand Up @@ -73,8 +76,9 @@ public CompletionStage<Connection> acquire( final BoltServerAddress address )
ChannelPool pool = getOrCreatePool( address );
Future<Channel> connectionFuture = pool.acquire();

return Futures.asCompletionStage( connectionFuture ).thenApply( channel ->
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
{
processAcquisitionError( error );
assertNotClosed( address, channel, pool );
return new NettyConnection( channel, pool, clock );
} );
Expand Down Expand Up @@ -160,6 +164,27 @@ private EventLoopGroup eventLoopGroup()
return bootstrap.config().group();
}

private void processAcquisitionError( Throwable error )
{
Throwable cause = Futures.completionErrorCause( error );
if ( cause != null )
{
if ( cause instanceof TimeoutException )
{
// NettyChannelPool returns future failed with TimeoutException if acquire operation takes more than
// configured time, translate this exception to a prettier one and re-throw
throw new ClientException(
"Unable to acquire connection from the pool within configured maximum time of " +
settings.connectionAcquisitionTimeout() + "ms" );
}
else
{
// some unknown error happened during connection acquisition, propagate it
throw new CompletionException( cause );
}
}
}

private void assertNotClosed()
{
if ( closed.get() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.neo4j.driver.internal.InternalDriver;
import org.neo4j.driver.internal.SessionFactory;
import org.neo4j.driver.internal.SessionFactoryImpl;
import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.cluster.AddressSet;
import org.neo4j.driver.internal.cluster.RoutingTable;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.exceptions.ClientException;
Expand Down Expand Up @@ -236,6 +236,30 @@ public void describeTo( Description description )
};
}

public static Matcher<Throwable> connectionAcquisitionTimeoutError( int timeoutMillis )
{
return new TypeSafeMatcher<Throwable>()
{
@Override
protected boolean matchesSafely( Throwable error )
{
if ( error instanceof ClientException )
{
String expectedMessage = "Unable to acquire connection from the pool within " +
"configured maximum time of " + timeoutMillis + "ms";
return expectedMessage.equals( error.getMessage() );
}
return false;
}

@Override
public void describeTo( Description description )
{
description.appendText( "acquisition timeout error with " + timeoutMillis + "ms" );
}
};
}

private static boolean contains( AddressSet set, BoltServerAddress address )
{
BoltServerAddress[] addresses = set.toArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.v1.integration;

import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.TimeoutException;

import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.logging.DevNullLogger;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
import org.neo4j.driver.internal.util.FakeClock;
Expand All @@ -44,8 +44,6 @@
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
Expand All @@ -64,9 +62,11 @@
import org.neo4j.driver.v1.util.cc.ClusterMemberRole;
import org.neo4j.driver.v1.util.cc.ClusterRule;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -76,6 +76,7 @@
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.util.Futures.getBlocking;
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
import static org.neo4j.driver.v1.Values.parameters;

public class CausalClusteringIT
Expand All @@ -85,6 +86,12 @@ public class CausalClusteringIT
@Rule
public final ClusterRule clusterRule = new ClusterRule();

@AfterClass
public static void stopSharedCluster()
{
ClusterRule.stopSharedCluster();
Copy link
Contributor

@zhenlineo zhenlineo Nov 21, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we want to immediately shop the cluster after this test class? Should we actually want to keep the cluster running until the whole tests finish?

If this is the only place that requires a cluster, then maybe we should change the ClusterRule to be a class role?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cluster takes a lot of memory and might influence other tests. There exists one more test that uses this rule. I can make it a @ClassRule and add cleaning of db in some @After method instead.

}

@Test
public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception
{
Expand Down Expand Up @@ -532,6 +539,46 @@ public void shouldNotReuseReadConnectionForWriteTransaction()
}
}

@Test
public void shouldRespectMaxConnectionPoolSizePerClusterMember()
{
Cluster cluster = clusterRule.getCluster();
ClusterMember leader = cluster.leader();

Config config = Config.build()
.withMaxConnectionPoolSize( 2 )
.withConnectionAcquisitionTimeout( 42, MILLISECONDS )
.withLogging( DEV_NULL_LOGGING )
.toConfig();

try ( Driver driver = createDriver( leader.getRoutingUri(), config ) )
{
Session writeSession1 = driver.session( AccessMode.WRITE );
writeSession1.beginTransaction();

Session writeSession2 = driver.session( AccessMode.WRITE );
writeSession2.beginTransaction();

// should not be possible to acquire more connections towards leader because limit is 2
Session writeSession3 = driver.session( AccessMode.WRITE );
try
{
writeSession3.beginTransaction();
fail( "Exception expected" );
}
catch ( ClientException e )
{
assertThat( e, is( connectionAcquisitionTimeoutError( 42 ) ) );
}

// should be possible to acquire new connection towards read server
// it's a different machine, not leader, so different max connection pool size limit applies
Session readSession = driver.session( AccessMode.READ );
Record record = readSession.readTransaction( tx -> tx.run( "RETURN 1" ).single() );
assertEquals( 1, record.get( 0 ).asInt() );
}
}

private CompletionStage<List<RecordAndSummary>> combineCursors( StatementResultCursor cursor1,
StatementResultCursor cursor2 )
{
Expand Down Expand Up @@ -702,19 +749,15 @@ else if ( role == ClusterMemberRole.READ_REPLICA )

private Driver createDriver( URI boltUri )
{
Logging devNullLogging = new Logging()
{
@Override
public Logger getLog( String name )
{
return DevNullLogger.DEV_NULL_LOGGER;
}
};

Config config = Config.build()
.withLogging( devNullLogging )
.withLogging( DEV_NULL_LOGGING )
.toConfig();

return createDriver( boltUri, config );
}

private Driver createDriver( URI boltUri, Config config )
{
return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,26 @@

import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory;
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
import org.neo4j.driver.internal.util.FakeClock;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.util.TestNeo4j;

import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.fail;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT;
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;

public class ConnectionPoolIT
{
Expand Down Expand Up @@ -113,6 +118,28 @@ public void shouldDisposeChannelsBasedOnMaxLifetime() throws Exception
assertTrue( channel2.isActive() );
}

@Test
public void shouldRespectMaxConnectionPoolSize()
{
int maxPoolSize = 3;
Config config = Config.build()
.withMaxConnectionPoolSize( maxPoolSize )
.withConnectionAcquisitionTimeout( 542, TimeUnit.MILLISECONDS )
.toConfig();

driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config );

try
{
startAndCloseTransactions( driver, maxPoolSize + 1 );
fail( "Exception expected" );
}
catch ( ClientException e )
{
assertThat( e, is( connectionAcquisitionTimeoutError( 542 ) ) );
}
}

@After
public void cleanup() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.neo4j.driver.internal.logging.DevNullLogging;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic;
import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.AuthToken;
Expand Down Expand Up @@ -73,6 +74,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -87,6 +89,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError;
import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0;
import static org.neo4j.driver.v1.Values.parameters;
import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon;
Expand Down Expand Up @@ -1296,6 +1299,38 @@ public void shouldConsumePreviousResultBeforeRunningNewQuery()
}
}

@Test
public void shouldNotRetryOnConnectionAcquisitionTimeout()
{
int maxPoolSize = 3;
Config config = Config.build()
.withMaxConnectionPoolSize( maxPoolSize )
.withConnectionAcquisitionTimeout( 0, TimeUnit.SECONDS )
.withMaxTransactionRetryTime( 42, TimeUnit.DAYS ) // retry for a really long time
.toConfig();

driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config );

for ( int i = 0; i < maxPoolSize; i++ )
{
driver.session().beginTransaction();
}

AtomicInteger invocations = new AtomicInteger();
try
{
driver.session().writeTransaction( tx -> invocations.incrementAndGet() );
fail( "Exception expected" );
}
catch ( ClientException e )
{
assertThat( e, is( connectionAcquisitionTimeoutError( 0 ) ) );
}

// work should never be invoked
assertEquals( 0, invocations.get() );
}

private void assumeServerIs31OrLater()
{
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.v1.stress;

import org.junit.AfterClass;
import org.junit.Rule;

import java.net.URI;
Expand All @@ -40,6 +41,7 @@
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.summary.ResultSummary;
import org.neo4j.driver.v1.util.cc.ClusterMemberRole;
import org.neo4j.driver.v1.util.cc.ClusterRule;
import org.neo4j.driver.v1.util.cc.LocalOrRemoteClusterRule;

import static org.hamcrest.Matchers.both;
Expand All @@ -54,6 +56,12 @@ public class CausalClusteringStressIT extends AbstractStressTestBase<CausalClust
@Rule
public final LocalOrRemoteClusterRule clusterRule = new LocalOrRemoteClusterRule();

@AfterClass
public static void stopSharedCluster()
{
ClusterRule.stopSharedCluster();
}

@Override
URI databaseUri()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public static synchronized Neo4jRunner getOrCreateGlobalRunner() throws IOExcept
return globalInstance;
}

public static synchronized boolean globalRunnerExists()
{
return globalInstance != null;
}

private Neo4jRunner() throws IOException
{
try
Expand Down
Loading