From 6a156730d1f46c4aff7c338c36adb1d0bfa4c649 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Mon, 6 Dec 2021 16:53:36 +0000 Subject: [PATCH] Migrate Java IT tests to Testkit Migrated tests: - `CausalClusteringIT.shouldNotReuseReadConnectionForWriteTransaction` -> `TestBookmarks.test_does_not_use_read_connection_for_write` - `CausalClusteringIT.shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader` -> `Routing.test_should_get_rt_from_leader_w_and_r_via_leader_using_session_run` - `CausalClusteringIT.shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfFollower` -> `Routing.test_should_get_rt_from_follower_w_and_r_via_leader_using_session_run` - `CausalClusteringIT.shouldDropBrokenOldConnections` -> `Routing.test_should_drop_connections_failing_liveness_check` - `CausalClusteringIT.shouldRespectMaxConnectionPoolSizePerClusterMember` -> `Routing.test_should_enforce_pool_size_per_cluster_member` - `CausalClusteringIT.shouldKeepOperatingWhenConnectionsBreak` -> unexpected interruptions are covered by routing tests `Routing.test_should_retry_read_tx_until_success_on_run_error`, `Routing.test_should_retry_read_tx_until_success_on_pull_error`, `Routing.test_should_retry_read_tx_until_success_on_error`, `Routing.test_should_retry_read_tx_and_rediscovery_until_success_on_run_failure`, `Routing.test_should_retry_read_tx_and_rediscovery_until_success_on_pull_failure`, `Routing.test_should_retry_read_tx_and_rediscovery_until_success`, `Routing.test_should_retry_write_tx_and_rediscovery_until_success_on_run_failure`, `Routing.test_should_retry_write_until_success_with_leader_change_on_run_using_tx_function`, `Routing.test_should_retry_write_tx_until_success_on_run_error`, `Routing.test_should_retry_write_tx_until_success_on_pull_error`, `Routing.test_should_retry_write_tx_and_rediscovery_until_success_on_pull_failure`, `Routing.test_should_retry_write_until_success_with_leader_change_using_tx_function`, `Routing.test_should_retry_write_tx_until_success_on_error`, `Routing.test_should_retry_write_tx_and_rediscovery_until_success` Converted tests: - `CausalClusteringIT.shouldExecuteReadAndWritesWhenRouterIsDiscovered` -> `GraphDatabaseTest.shouldNotFailRoutingDriverWhenThereIsWorkingUri` New Testkit features: - `Feature:API:Liveness.Check` - `Temporary:DriverMaxConnectionPoolSize` - `Temporary:ConnectionAcquisitionTimeout` - `Temporary:GetConnectionPoolMetrics` The `address` field has been added to `Summary` Testkit response. --- .../InternalConnectionPoolMetrics.java | 17 +- .../org/neo4j/driver/GraphDatabaseTest.java | 41 +- .../driver/stress/CausalClusteringIT.java | 478 ------------------ .../requests/GetConnectionPoolMetrics.java | 95 ++++ .../messages/requests/GetFeatures.java | 6 +- .../backend/messages/requests/NewDriver.java | 9 + .../messages/requests/ResultConsume.java | 1 + .../messages/requests/TestkitRequest.java | 2 +- .../responses/ConnectionPoolMetrics.java | 43 ++ .../backend/messages/responses/Summary.java | 2 + 10 files changed, 201 insertions(+), 493 deletions(-) create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ConnectionPoolMetrics.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java index 26a6ae4abe..073a4a1952 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java +++ b/driver/src/main/java/org/neo4j/driver/internal/metrics/InternalConnectionPoolMetrics.java @@ -219,15 +219,20 @@ public long acquired() return this.acquired.get(); } - @Override public String toString() { return format( "%s=[created=%s, closed=%s, creating=%s, failedToCreate=%s, acquiring=%s, acquired=%s, " + - "timedOutToAcquire=%s, inUse=%s, idle=%s, " + - "totalAcquisitionTime=%s, totalConnectionTime=%s, totalInUseTime=%s, totalInUseCount=%s]", - id(), created(), closed(), creating(), failedToCreate(), acquiring(), acquired(), - timedOutToAcquire(), inUse(), idle(), - totalAcquisitionTime(), totalConnectionTime(), totalInUseTime(), totalInUseCount() ); + "timedOutToAcquire=%s, inUse=%s, idle=%s, " + + "totalAcquisitionTime=%s, totalConnectionTime=%s, totalInUseTime=%s, totalInUseCount=%s]", + id(), created(), closed(), creating(), failedToCreate(), acquiring(), acquired(), + timedOutToAcquire(), inUse(), idle(), + totalAcquisitionTime(), totalConnectionTime(), totalInUseTime(), totalInUseCount() ); + } + + // This method is for purposes testing only + public BoltServerAddress getAddress() + { + return address; } } diff --git a/driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java index 5c6ef0ea4a..8c6a928730 100644 --- a/driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.URI; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import org.neo4j.driver.exceptions.ServiceUnavailableException; @@ -73,7 +75,7 @@ void shouldLogWhenUnableToCreateRoutingDriver() when( logging.getLog( any( Class.class ) ) ).thenReturn( logger ); InternalDriver driver = mock( InternalDriver.class ); doThrow( ServiceUnavailableException.class ).when( driver ).verifyConnectivity(); - DriverFactory driverFactory = new MockSupplyingDriverFactory( driver ); + DriverFactory driverFactory = new MockSupplyingDriverFactory( Arrays.asList( driver, driver ) ); Config config = Config.builder() .withLogging( logging ) .build(); @@ -85,10 +87,35 @@ void shouldLogWhenUnableToCreateRoutingDriver() assertThrows( ServiceUnavailableException.class, () -> GraphDatabase.routingDriver( routingUris, AuthTokens.none(), config, driverFactory ) ); verify( logger ).warn( eq( "Unable to create routing driver for URI: neo4j://localhost:9001" ), - any( Throwable.class ) ); + any( Throwable.class ) ); verify( logger ).warn( eq( "Unable to create routing driver for URI: neo4j://localhost:9002" ), - any( Throwable.class ) ); + any( Throwable.class ) ); + } + + @Test + void shouldNotFailRoutingDriverWhenThereIsWorkingUri() + { + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( any( Class.class ) ) ).thenReturn( logger ); + InternalDriver failingDriver = mock( InternalDriver.class ); + doThrow( ServiceUnavailableException.class ).when( failingDriver ).verifyConnectivity(); + InternalDriver workingDriver = mock( InternalDriver.class ); + DriverFactory driverFactory = new MockSupplyingDriverFactory( Arrays.asList( failingDriver, workingDriver ) ); + Config config = Config.builder() + .withLogging( logging ) + .build(); + + List routingUris = asList( + URI.create( "neo4j://localhost:9001" ), + URI.create( "neo4j://localhost:9002" ) ); + + Driver driver = GraphDatabase.routingDriver( routingUris, AuthTokens.none(), config, driverFactory ); + + verify( logger ).warn( eq( "Unable to create routing driver for URI: neo4j://localhost:9001" ), + any( Throwable.class ) ); + assertEquals( driver, workingDriver ); } @Test @@ -184,11 +211,11 @@ private static Config createConfig( boolean encrypted, int timeoutMillis ) private static class MockSupplyingDriverFactory extends DriverFactory { - private final InternalDriver driver; + private final Iterator driverIterator; - private MockSupplyingDriverFactory( InternalDriver driver ) + private MockSupplyingDriverFactory( List drivers ) { - this.driver = driver; + driverIterator = drivers.iterator(); } @Override @@ -196,7 +223,7 @@ protected InternalDriver createRoutingDriver( SecurityPlan securityPlan, BoltSer EventExecutorGroup eventExecutorGroup, RoutingSettings routingSettings, RetryLogic retryLogic, MetricsProvider metricsProvider, Config config ) { - return driver; + return driverIterator.next(); } } } diff --git a/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java index f464655d95..50edaa90f2 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringIT.java @@ -18,83 +18,27 @@ */ package org.neo4j.driver.stress; -import io.netty.channel.Channel; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; import org.neo4j.driver.AccessMode; -import org.neo4j.driver.AuthToken; -import org.neo4j.driver.Bookmark; import org.neo4j.driver.Config; import org.neo4j.driver.Driver; import org.neo4j.driver.GraphDatabase; -import org.neo4j.driver.QueryRunner; -import org.neo4j.driver.Record; -import org.neo4j.driver.Result; import org.neo4j.driver.Session; -import org.neo4j.driver.async.AsyncSession; -import org.neo4j.driver.async.ResultCursor; -import org.neo4j.driver.exceptions.ClientException; -import org.neo4j.driver.exceptions.ServiceUnavailableException; -import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.integration.NestedQueries; -import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.retry.RetrySettings; -import org.neo4j.driver.internal.security.SecurityPlanImpl; -import org.neo4j.driver.internal.util.FakeClock; -import org.neo4j.driver.internal.util.ThrowingMessageEncoder; -import org.neo4j.driver.internal.util.io.ChannelTrackingDriverFactory; -import org.neo4j.driver.summary.ResultSummary; -import org.neo4j.driver.util.cc.Cluster; import org.neo4j.driver.util.cc.ClusterExtension; -import org.neo4j.driver.util.cc.ClusterMember; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.junit.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeFalse; import static org.neo4j.driver.Logging.none; import static org.neo4j.driver.SessionConfig.builder; -import static org.neo4j.driver.Values.parameters; -import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; -import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; -import static org.neo4j.driver.util.DaemonThreadFactory.daemon; -import static org.neo4j.driver.util.TestUtil.await; -import static org.neo4j.driver.util.TestUtil.awaitAllFutures; public class CausalClusteringIT implements NestedQueries { - private static final long DEFAULT_TIMEOUT_MS = 120_000; - @RegisterExtension static final ClusterExtension clusterRule = new ClusterExtension(); - private ExecutorService executor; private Driver driver; @Override @@ -115,302 +59,6 @@ void tearDown() { driver.close(); } - - if ( executor != null ) - { - executor.shutdownNow(); - } - } - - @Test - void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception - { - Cluster cluster = clusterRule.getCluster(); - - int count = executeWriteAndReadThroughBolt( cluster.leader() ); - - assertEquals( 1, count ); - } - - @Test - void shouldExecuteReadAndWritesWhenRouterIsDiscovered() throws Exception - { - assertRoutingNotAvailableOnReadReplica(); - - Cluster cluster = clusterRule.getCluster(); - - int count = executeWriteAndReadThroughBoltOnFirstAvailableAddress( cluster.anyReadReplica(), cluster.leader() ); - - assertEquals( 1, count ); - } - - @Test - void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfFollower() throws Exception - { - Cluster cluster = clusterRule.getCluster(); - - int count = executeWriteAndReadThroughBolt( cluster.anyFollower() ); - - assertEquals( 1, count ); - } - - @Test - void shouldDropBrokenOldConnections() throws Exception - { - Cluster cluster = clusterRule.getCluster(); - - int concurrentSessionsCount = 9; - int livenessCheckTimeoutMinutes = 2; - - Config config = Config.builder() - .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, MINUTES ) - .withLogging( DEV_NULL_LOGGING ) - .build(); - - FakeClock clock = new FakeClock(); - ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory( clock ); - - URI routingUri = cluster.getRoutingUri(); - AuthToken auth = clusterRule.getDefaultAuthToken(); - - try ( Driver driver = driverFactory.newInstance( routingUri, auth, RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config, SecurityPlanImpl.insecure() ) ) - { - // create nodes in different threads using different sessions and connections - createNodesInDifferentThreads( concurrentSessionsCount, driver ); - - // now pool contains many channels, make them all invalid - List oldChannels = driverFactory.channels(); - for ( Channel oldChannel : oldChannels ) - { - RuntimeException error = new ServiceUnavailableException( "Unable to reset" ); - oldChannel.pipeline().addLast( ThrowingMessageEncoder.forResetMessage( error ) ); - } - - // move clock forward more than configured liveness check timeout - clock.progress( MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); - - // now all idle channels should be considered too old and will be verified during acquisition - // they will appear broken because they were closed and new valid connection will be created - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ) ) - { - List records = session.run( "MATCH (n) RETURN count(n)" ).list(); - assertEquals( 1, records.size() ); - assertEquals( concurrentSessionsCount, records.get( 0 ).get( 0 ).asInt() ); - } - - // all old channels failed to reset and should be closed - for ( Channel oldChannel : oldChannels ) - { - assertFalse( oldChannel.isActive() ); - } - } - } - - @Test - void shouldNotReuseReadConnectionForWriteTransaction() - { - Cluster cluster = clusterRule.getCluster(); - - try ( Driver driver = createDriver( cluster.getRoutingUri() ) ) - { - AsyncSession session = driver.asyncSession( builder().withDefaultAccessMode( AccessMode.READ ).build() ); - - CompletionStage> resultsStage = session.runAsync( "RETURN 42" ) - .thenCompose( cursor1 -> - session.writeTransactionAsync( - tx -> tx.runAsync( "CREATE (:Node1) RETURN 42" ) - .thenCompose( - cursor2 -> combineCursors( cursor2, - cursor1 ) ) ) ); - - List results = await( resultsStage ); - assertEquals( 2, results.size() ); - - RecordAndSummary first = results.get( 0 ); - RecordAndSummary second = results.get( 1 ); - - // both auto-commit query and write tx should return 42 - assertEquals( 42, first.record.get( 0 ).asInt() ); - assertEquals( first.record, second.record ); - // they should not use same server - assertNotEquals( first.summary.server().address(), second.summary.server().address() ); - - CompletionStage countStage = - session.readTransactionAsync( tx -> tx.runAsync( "MATCH (n:Node1) RETURN count(n)" ) - .thenCompose( ResultCursor::singleAsync ) ) - .thenApply( record -> record.get( 0 ).asInt() ); - - assertEquals( 1, await( countStage ).intValue() ); - - await( session.closeAsync() ); - } - } - - @Test - void shouldRespectMaxConnectionPoolSizePerClusterMember() - { - Cluster cluster = clusterRule.getCluster(); - - Config config = Config.builder() - .withMaxConnectionPoolSize( 2 ) - .withConnectionAcquisitionTimeout( 42, MILLISECONDS ) - .withLogging( DEV_NULL_LOGGING ) - .build(); - - try ( Driver driver = createDriver( cluster.getRoutingUri(), config ) ) - { - String database = "neo4j"; - Session writeSession1 = - driver.session( builder().withDatabase( database ).withDefaultAccessMode( AccessMode.WRITE ).build() ); - writeSession1.beginTransaction(); - - Session writeSession2 = - driver.session( builder().withDatabase( database ).withDefaultAccessMode( AccessMode.WRITE ).build() ); - writeSession2.beginTransaction(); - - // should not be possible to acquire more connections towards leader because limit is 2 - Session writeSession3 = - driver.session( builder().withDatabase( database ).withDefaultAccessMode( AccessMode.WRITE ).build() ); - ClientException e = assertThrows( ClientException.class, writeSession3::beginTransaction ); - assertThat( e, is( connectionAcquisitionTimeoutError( 42 ) ) ); - - // should be possible to acquire new connection towards read server - // it's a different machine, not leader, so different max connection pool size limit applies - Session readSession = - driver.session( builder().withDatabase( database ).withDefaultAccessMode( AccessMode.READ ).build() ); - Record record = readSession.readTransaction( tx -> tx.run( "RETURN 1" ).single() ); - assertEquals( 1, record.get( 0 ).asInt() ); - } - } - - @Test - void shouldKeepOperatingWhenConnectionsBreak() throws Exception - { - long testRunTimeMs = MINUTES.toMillis( 1 ); - String label = "Person"; - String property = "name"; - String value = "Tony Stark"; - Cluster cluster = clusterRule.getCluster(); - - ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory(); - AtomicBoolean stop = new AtomicBoolean(); - executor = newExecutor(); - - Config config = Config.builder() - .withLogging( DEV_NULL_LOGGING ) - .withMaxTransactionRetryTime( testRunTimeMs, MILLISECONDS ) - .build(); - - try ( Driver driver = driverFactory.newInstance( cluster.getRoutingUri(), clusterRule.getDefaultAuthToken(), - RoutingSettings.DEFAULT, RetrySettings.DEFAULT, config, SecurityPlanImpl.insecure() ) ) - { - List> results = new ArrayList<>(); - - // launch writers and readers that use transaction functions and thus should never fail - for ( int i = 0; i < 3; i++ ) - { - results.add( executor.submit( readNodesCallable( driver, label, property, value, stop ) ) ); - } - for ( int i = 0; i < 2; i++ ) - { - results.add( executor.submit( createNodesCallable( driver, label, property, value, stop ) ) ); - } - - // make connections throw while reads and writes are in progress - long deadline = System.currentTimeMillis() + MINUTES.toMillis( 1 ); - while ( System.currentTimeMillis() < deadline && !stop.get() ) - { - List channels = driverFactory.pollChannels(); - for ( Channel channel : channels ) - { - RuntimeException error = new ServiceUnavailableException( "Unable to execute query" ); - channel.pipeline().addLast( ThrowingMessageEncoder.forRunMessage( error ) ); - } - SECONDS.sleep( 10 ); // sleep a bit to allow readers and writers to progress - } - stop.set( true ); - - awaitAllFutures( results ); // readers and writers should stop - assertThat( countNodes( driver.session(), label, property, value ), greaterThan( 0 ) ); // some nodes should be created - } - } - - private void assertRoutingNotAvailableOnReadReplica() - { - driver = createDriver( clusterRule.getCluster().leader().getRoutingUri() ); - assumeFalse( driver.supportsMultiDb() ); - } - - private CompletionStage> combineCursors( ResultCursor cursor1, - ResultCursor cursor2 ) - { - return buildRecordAndSummary( cursor1 ).thenCombine( buildRecordAndSummary( cursor2 ), - ( rs1, rs2 ) -> Arrays.asList( rs1, rs2 ) ); - } - - private CompletionStage buildRecordAndSummary( ResultCursor cursor ) - { - return cursor.singleAsync().thenCompose( record -> - cursor.consumeAsync().thenApply( summary -> new RecordAndSummary( record, summary ) ) ); - } - - private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException - { - try ( Driver driver = createDriver( member.getRoutingUri() ) ) - { - return inExpirableSession( driver, createWritableSession( null ), executeWriteAndRead() ); - } - } - - private int executeWriteAndReadThroughBoltOnFirstAvailableAddress( ClusterMember... members ) throws TimeoutException, InterruptedException - { - List addresses = new ArrayList<>( members.length ); - for ( ClusterMember member : members ) - { - addresses.add( member.getRoutingUri() ); - } - try ( Driver driver = discoverDriver( addresses ) ) - { - return inExpirableSession( driver, createWritableSession( null ), executeWriteAndRead() ); - } - } - - private Function createWritableSession( final Bookmark bookmark ) - { - return driver -> driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).withBookmarks( bookmark ).build() ); - } - - private Function executeWriteAndRead() - { - return session -> - { - session.run( "MERGE (n:Person {name: 'Jim'})" ).consume(); - Record record = session.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next(); - return record.get( "count" ).asInt(); - }; - } - - private T inExpirableSession( Driver driver, Function acquirer, Function op ) - throws TimeoutException, InterruptedException - { - long endTime = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS; - - do - { - try ( Session session = acquirer.apply( driver ) ) - { - return op.apply( session ); - } - catch ( SessionExpiredException | ServiceUnavailableException e ) - { - // role might have changed; try again; - } - - Thread.sleep( 500 ); - } - while ( System.currentTimeMillis() < endTime ); - - throw new TimeoutException( "Transaction did not succeed in time" ); } private Driver createDriver( URI boltUri ) @@ -423,134 +71,8 @@ private Driver createDriver( URI boltUri, Config config ) return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config ); } - private Driver discoverDriver( List routingUris ) - { - return GraphDatabase.routingDriver( routingUris, clusterRule.getDefaultAuthToken(), configWithoutLogging() ); - } - - private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception - { - final CountDownLatch beforeRunLatch = new CountDownLatch( count ); - final CountDownLatch runQueryLatch = new CountDownLatch( 1 ); - final ExecutorService executor = newExecutor(); - - for ( int i = 0; i < count; i++ ) - { - executor.submit( () -> - { - beforeRunLatch.countDown(); - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ) ) - { - runQueryLatch.await(); - session.run( "CREATE ()" ); - } - return null; - } ); - } - - beforeRunLatch.await(); - runQueryLatch.countDown(); - - executor.shutdown(); - assertTrue( executor.awaitTermination( 1, TimeUnit.MINUTES ) ); - } - - private static Callable createNodesCallable( Driver driver, String label, String property, String value, AtomicBoolean stop ) - { - return () -> - { - while ( !stop.get() ) - { - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ) ) - { - createNode( session, label, property, value ); - } - catch ( Throwable t ) - { - stop.set( true ); - throw t; - } - } - return null; - }; - } - - private static Callable readNodesCallable( Driver driver, String label, String property, String value, AtomicBoolean stop ) - { - return () -> - { - while ( !stop.get() ) - { - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) - { - List ids = readNodeIds( session, label, property, value ); - assertNotNull( ids ); - } - catch ( Throwable t ) - { - stop.set( true ); - throw t; - } - } - return null; - }; - } - - private static List readNodeIds( final Session session, final String label, final String property, final String value ) - { - return session.readTransaction( tx -> - { - Result result = tx.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN n LIMIT 10", - parameters( "value", value ) ); - - return result.list( record -> record.get( 0 ).asNode().id() ); - } ); - } - - private static void createNode( Session session, String label, String property, String value ) - { - session.writeTransaction( tx -> - { - runCreateNode( tx, label, property, value ); - return null; - } ); - } - - private static int countNodes( Session session, String label, String property, String value ) - { - return session.readTransaction( tx -> runCountNodes( tx, label, property, value ) ); - } - - private static Result runCreateNode(QueryRunner queryRunner, String label, String property, String value ) - { - return queryRunner.run( "CREATE (n:" + label + ") SET n." + property + " = $value", parameters( "value", value ) ); - } - - private static int runCountNodes(QueryRunner queryRunner, String label, String property, String value ) - { - Result result = queryRunner.run( "MATCH (n:" + label + " {" + property + ": $value}) RETURN count(n)", parameters( "value", value ) ); - return result.single().get( 0 ).asInt(); - } - private static Config configWithoutLogging() { return Config.builder().withLogging( none() ).build(); } - - private static ExecutorService newExecutor() - { - return Executors.newCachedThreadPool( daemon( CausalClusteringIT.class.getSimpleName() + "-thread-" ) ); - } - - private static class RecordAndSummary - { - final Record record; - final ResultSummary summary; - - RecordAndSummary( Record record, ResultSummary summary ) - { - this.record = record; - this.summary = summary; - } - } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java new file mode 100644 index 0000000000..bc69c22ea2 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetConnectionPoolMetrics.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import lombok.Getter; +import lombok.Setter; +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.DriverHolder; +import neo4j.org.testkit.backend.messages.responses.ConnectionPoolMetrics; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import reactor.core.publisher.Mono; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Metrics; +import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.metrics.InternalConnectionPoolMetrics; + +@Getter +@Setter +public class GetConnectionPoolMetrics implements TestkitRequest +{ + private GetConnectionPoolMetricsBody data; + + @Override + public TestkitResponse process( TestkitState testkitState ) + { + return getConnectionPoolMetrics( testkitState ); + } + + @Override + public CompletionStage processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( getConnectionPoolMetrics( testkitState ) ); + } + + @Override + public Mono processRx( TestkitState testkitState ) + { + return Mono.just( getConnectionPoolMetrics( testkitState ) ); + } + + private ConnectionPoolMetrics getConnectionPoolMetrics( TestkitState testkitState ) + { + DriverHolder driverHolder = testkitState.getDriverHolder( data.getDriverId() ); + Metrics metrics = driverHolder.getDriver().metrics(); + org.neo4j.driver.ConnectionPoolMetrics poolMetrics = + metrics.connectionPoolMetrics().stream() + .map( InternalConnectionPoolMetrics.class::cast ) + .filter( pm -> + { + BoltServerAddress address = new BoltServerAddress( data.getAddress() ); + BoltServerAddress poolAddress = pm.getAddress(); + return address.host().equals( poolAddress.host() ) && address.port() == poolAddress.port(); + } ) + .findFirst() + .orElseThrow( () -> new IllegalArgumentException( String.format( "Pool metrics for %s are not available", data.getAddress() ) ) ); + return createResponse( poolMetrics ); + } + + private ConnectionPoolMetrics createResponse( org.neo4j.driver.ConnectionPoolMetrics poolMetrics ) + { + return ConnectionPoolMetrics.builder() + .data( ConnectionPoolMetrics.ConnectionPoolMetricsBody.builder() + .inUse( poolMetrics.inUse() ) + .idle( poolMetrics.idle() ) + .build() ) + .build(); + } + + @Setter + @Getter + public static class GetConnectionPoolMetricsBody + { + private String driverId; + private String address; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 83ae45ed19..06609ca817 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -52,7 +52,11 @@ public class GetFeatures implements TestkitRequest "Temporary:FastFailingDiscovery", "Feature:TLS:1.1", "Feature:TLS:1.2", - "Feature:API:SSLSchemes" + "Feature:API:SSLSchemes", + "Feature:API:Liveness.Check", + "Temporary:DriverMaxConnectionPoolSize", + "Temporary:ConnectionAcquisitionTimeout", + "Temporary:GetConnectionPoolMetrics" ) ); private static final Set SYNC_FEATURES = new HashSet<>( Arrays.asList( diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index cbe1e5208b..96a19ae7af 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -107,6 +107,12 @@ public TestkitResponse process( TestkitState testkitState ) RetrySettings retrySettings = Optional.ofNullable( data.maxTxRetryTimeMs ) .map( RetrySettings::new ) .orElse( RetrySettings.DEFAULT ); + Optional.ofNullable( data.livenessCheckTimeoutMs ) + .ifPresent( timeout -> configBuilder.withConnectionLivenessCheckTimeout( timeout, TimeUnit.MILLISECONDS ) ); + Optional.ofNullable( data.maxConnectionPoolSize ).ifPresent( configBuilder::withMaxConnectionPoolSize ); + Optional.ofNullable( data.connectionAcquisitionTimeoutMs ) + .ifPresent( timeout -> configBuilder.withConnectionAcquisitionTimeout( timeout, TimeUnit.MILLISECONDS ) ); + configBuilder.withDriverMetrics(); org.neo4j.driver.Driver driver; Config config = configBuilder.build(); try @@ -254,6 +260,9 @@ public static class NewDriverBody private Long connectionTimeoutMs; private Integer fetchSize; private Long maxTxRetryTimeMs; + private Long livenessCheckTimeoutMs; + private Integer maxConnectionPoolSize; + private Long connectionAcquisitionTimeoutMs; } @RequiredArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index dbf65f3871..101c6c902e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -70,6 +70,7 @@ public Mono processRx( TestkitState testkitState ) private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) { Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() + .address( summary.server().address() ) .protocolVersion( summary.server().protocolVersion() ) .agent( summary.server().agent() ) .build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index 96ba563352..64a3558560 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -40,7 +40,7 @@ @JsonSubTypes.Type( DomainNameResolutionCompleted.class ), @JsonSubTypes.Type( StartTest.class ), @JsonSubTypes.Type( TransactionRollback.class ), @JsonSubTypes.Type( GetFeatures.class ), @JsonSubTypes.Type( GetRoutingTable.class ), @JsonSubTypes.Type( TransactionClose.class ), - @JsonSubTypes.Type( ResultList.class ) + @JsonSubTypes.Type( ResultList.class ), @JsonSubTypes.Type( GetConnectionPoolMetrics.class ) } ) public interface TestkitRequest { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ConnectionPoolMetrics.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ConnectionPoolMetrics.java new file mode 100644 index 0000000000..fbbee3989c --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ConnectionPoolMetrics.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +public class ConnectionPoolMetrics implements TestkitResponse +{ + private final ConnectionPoolMetricsBody data; + + @Override + public String testkitName() + { + return "ConnectionPoolMetrics"; + } + + @Getter + @Builder + public static class ConnectionPoolMetricsBody + { + private int inUse; + private int idle; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/Summary.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/Summary.java index ac5e0421b0..a8d14aa1e2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/Summary.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/Summary.java @@ -44,6 +44,8 @@ public static class SummaryBody @Builder public static class ServerInfo { + private String address; + private String protocolVersion; private String agent;