From c1f2f84feb8d44aa3801ded1a0f544aabdd5bccc Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 18 Sep 2017 22:41:31 +0200 Subject: [PATCH 1/8] Simplify FixedRetryLogic This is a test-only implementation of `RetryLogic` interface that retries up to a specified amount of times. It previously had a separate implementation which was basically a simplified version of `ExponentialBackoffRetryLogic`. It is problematic for following `retryAsync` functionality because implementation will not be that simple. This commit makes `FixedRetryLogic` extend `ExponentialBackoffRetryLogic` and override just a single method. So that they can share complicated retrying code. --- .../retry/ExponentialBackoffRetryLogic.java | 14 +++++----- .../internal/retry/FixedRetryLogic.java | 26 ++++++------------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index a30290e42b..519197308c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -109,6 +109,13 @@ public T retry( Supplier work ) } } + protected boolean canRetryOn( Throwable error ) + { + return error instanceof SessionExpiredException || + error instanceof ServiceUnavailableException || + isTransientError( error ); + } + private long computeDelayWithJitter( long delayMs ) { long jitter = (long) (delayMs * jitterFactor); @@ -154,13 +161,6 @@ private void verifyAfterConstruction() } } - private static boolean canRetryOn( Throwable error ) - { - return error instanceof SessionExpiredException || - error instanceof ServiceUnavailableException || - isTransientError( error ); - } - private static boolean isTransientError( Throwable error ) { if ( error instanceof TransientException ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java index 2b6235bf43..fe3bc713e9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java @@ -18,34 +18,24 @@ */ package org.neo4j.driver.internal.retry; -import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.internal.util.SleeplessClock; -public class FixedRetryLogic implements RetryLogic +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; + +public class FixedRetryLogic extends ExponentialBackoffRetryLogic { private final int retryCount; - private int invokedWork; + private int invocationCount; public FixedRetryLogic( int retryCount ) { + super( new RetrySettings( Long.MAX_VALUE ), new SleeplessClock(), DEV_NULL_LOGGING ); this.retryCount = retryCount; } @Override - public T retry( Supplier work ) + protected boolean canRetryOn( Throwable error ) { - while ( true ) - { - try - { - return work.get(); - } - catch ( Throwable error ) - { - if ( invokedWork++ >= retryCount ) - { - throw error; - } - } - } + return invocationCount++ < retryCount; } } From af7c2a661170a803e1905b5c2b9a55275ff613c8 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 19 Sep 2017 13:52:00 +0200 Subject: [PATCH 2/8] Added async retries to RetryLogic This commit makes retry logic able to retry work that executes async operations and return a `Future`. It's a first step towards supporting async transaction functions. Also renamed future combinator methods to better represent their intent. --- .../neo4j/driver/internal/DriverFactory.java | 16 +- .../driver/internal/ExplicitTransaction.java | 9 +- .../neo4j/driver/internal/NetworkSession.java | 17 +- .../neo4j/driver/internal/async/Futures.java | 19 +- .../driver/internal/async/InternalFuture.java | 5 +- .../internal/async/InternalPromise.java | 7 +- .../retry/ExponentialBackoffRetryLogic.java | 129 +++++- .../driver/internal/retry/RetryLogic.java | 3 + .../driver/internal/util/BiConsumer.java | 24 ++ .../loadbalancing/LoadBalancerTest.java | 4 +- .../ExponentialBackoffRetryLogicTest.java | 394 +++++++++++++++++- .../internal/retry/FixedRetryLogic.java | 11 +- .../DriverFactoryWithFixedRetryLogic.java | 7 +- .../internal/util/TrackingEventExecutor.java | 250 +++++++++++ 14 files changed, 840 insertions(+), 55 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 45ad5e0b8a..04ded634f4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal; import io.netty.bootstrap.Bootstrap; +import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import java.net.URI; @@ -72,9 +73,12 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) ); SecurityPlan securityPlan = createSecurityPlan( address, config ); ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config ); - RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() ); - AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config ); + Bootstrap bootstrap = BootstrapFactory.newBootstrap(); + RetryLogic retryLogic = createRetryLogic( retrySettings, bootstrap.config().group(), config.logging() ); + + AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap, + config ); try { @@ -98,14 +102,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r } private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Config config ) + Bootstrap bootstrap, Config config ) { Clock clock = createClock(); ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() ); ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() ); AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan, activeChannelTracker, config.logging(), clock ); - Bootstrap bootstrap = BootstrapFactory.newBootstrap(); PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(), config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(), config.maxConnectionPoolSize(), @@ -250,9 +253,10 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv *

* This method is protected only for testing */ - protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging ) + protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, + Logging logging ) { - return new ExponentialBackoffRetryLogic( settings, createClock(), logging ); + return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging ); } private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 8d0ff5a364..7551c88373 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -33,6 +33,7 @@ import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.types.InternalTypeSystem; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.Statement; @@ -219,7 +220,7 @@ public Response commitAsync() return internalCommitAsync(); } - private InternalFuture internalCommitAsync() + InternalFuture internalCommitAsync() { if ( state == State.COMMITTED ) { @@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK ) } } - private Runnable releaseConnectionAndNotifySession() + private BiConsumer releaseConnectionAndNotifySession() { - return new Runnable() + return new BiConsumer() { @Override - public void run() + public void accept( Void result, Throwable error ) { asyncConnection.release(); session.asyncTransactionClosed( ExplicitTransaction.this ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 51881a2c2b..ed7b4b584e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -153,7 +153,7 @@ public Response runAsync( final Statement statement ) InternalFuture connectionFuture = acquireAsyncConnection( mode ); - return connectionFuture.thenCombine( new Function>() + return connectionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( AsyncConnection connection ) @@ -240,7 +240,7 @@ public Response closeAsync() { if ( asyncConnectionFuture != null ) { - return asyncConnectionFuture.thenCombine( new Function>() + return asyncConnectionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( AsyncConnection connection ) @@ -251,7 +251,7 @@ public InternalFuture apply( AsyncConnection connection ) } else if ( currentAsyncTransactionFuture != null ) { - return currentAsyncTransactionFuture.thenCombine( new Function>() + return currentAsyncTransactionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( ExplicitTransaction tx ) @@ -283,7 +283,8 @@ public synchronized Transaction beginTransaction( String bookmark ) @Override public Response beginTransactionAsync() { - return beginTransactionAsync( mode ); + //noinspection unchecked + return (Response) beginTransactionAsync( mode ); } @Override @@ -412,14 +413,14 @@ private synchronized Transaction beginTransaction( AccessMode mode ) return currentTransaction; } - private synchronized Response beginTransactionAsync( AccessMode mode ) + private synchronized InternalFuture beginTransactionAsync( AccessMode mode ) { ensureSessionIsOpen(); ensureNoOpenTransactionBeforeOpeningTransaction(); InternalFuture connectionFuture = acquireAsyncConnection( mode ); - currentAsyncTransactionFuture = connectionFuture.thenCombine( + currentAsyncTransactionFuture = connectionFuture.thenCompose( new Function>() { @Override @@ -431,7 +432,7 @@ public InternalFuture apply( AsyncConnection connection ) } ); //noinspection unchecked - return (Response) currentAsyncTransactionFuture; + return currentAsyncTransactionFuture; } private void ensureNoUnrecoverableError() @@ -495,7 +496,7 @@ private InternalFuture acquireAsyncConnection( final AccessMode // memorize in local so same instance is transformed and used in callbacks final InternalFuture currentAsyncConnectionFuture = asyncConnectionFuture; - asyncConnectionFuture = currentAsyncConnectionFuture.thenCombine( + asyncConnectionFuture = currentAsyncConnectionFuture.thenCompose( new Function>() { @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java index 8f14c7d182..04b0240eee 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java @@ -23,6 +23,7 @@ import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.util.Function; public final class Futures @@ -45,14 +46,14 @@ public static InternalFuture thenApply( Future future, Bootstrap bo return result; } - public static InternalFuture thenCombine( InternalFuture future, Function> fn ) + public static InternalFuture thenCompose( InternalFuture future, Function> fn ) { InternalPromise result = new InternalPromise<>( future.eventExecutor() ); - future.addListener( new ThenCombineListener<>( result, fn ) ); + future.addListener( new ThenComposeListener<>( result, fn ) ); return result; } - public static InternalFuture whenComplete( InternalFuture future, Runnable action ) + public static InternalFuture whenComplete( InternalFuture future, BiConsumer action ) { InternalPromise result = new InternalPromise<>( future.eventExecutor() ); future.addListener( new CompletionListener<>( result, action ) ); @@ -97,12 +98,12 @@ else if ( future.isSuccess() ) } } - private static class ThenCombineListener implements GenericFutureListener> + private static class ThenComposeListener implements GenericFutureListener> { final Promise result; final Function> fn; - ThenCombineListener( Promise result, Function> fn ) + ThenComposeListener( Promise result, Function> fn ) { this.result = result; this.fn = fn; @@ -168,9 +169,9 @@ else if ( future.isSuccess() ) private static class CompletionListener implements GenericFutureListener> { final Promise result; - final Runnable action; + final BiConsumer action; - CompletionListener( Promise result, Runnable action ) + CompletionListener( Promise result, BiConsumer action ) { this.result = result; this.action = action; @@ -187,7 +188,7 @@ else if ( future.isSuccess() ) { try { - action.run(); + action.accept( future.getNow(), null ); result.setSuccess( future.getNow() ); } catch ( Throwable t ) @@ -200,7 +201,7 @@ else if ( future.isSuccess() ) Throwable error = future.cause(); try { - action.run(); + action.accept( null, error ); } catch ( Throwable t ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java index 54245824be..6444384fe3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java @@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.util.Function; @@ -30,7 +31,7 @@ public interface InternalFuture extends Future, Response InternalFuture thenApply( Function fn ); - InternalFuture thenCombine( Function> fn ); + InternalFuture thenCompose( Function> fn ); - InternalFuture whenComplete( Runnable action ); + InternalFuture whenComplete( BiConsumer action ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java index dd1db10afd..1e952773b3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.util.Function; @@ -61,13 +62,13 @@ public InternalFuture thenApply( Function fn ) } @Override - public InternalFuture thenCombine( Function> fn ) + public InternalFuture thenCompose( Function> fn ) { - return Futures.thenCombine( this, fn ); + return Futures.thenCompose( this, fn ); } @Override - public InternalFuture whenComplete( Runnable action ) + public InternalFuture whenComplete( BiConsumer action ) { return Futures.whenComplete( this, action ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index 519197308c..6fa8142cf3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -18,10 +18,18 @@ */ package org.neo4j.driver.internal.retry; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.neo4j.driver.internal.async.InternalFuture; +import org.neo4j.driver.internal.async.InternalPromise; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logger; @@ -41,27 +49,31 @@ public class ExponentialBackoffRetryLogic implements RetryLogic private static final long INITIAL_RETRY_DELAY_MS = SECONDS.toMillis( 1 ); private static final double RETRY_DELAY_MULTIPLIER = 2.0; private static final double RETRY_DELAY_JITTER_FACTOR = 0.2; + private static final long MAX_RETRY_DELAY = Long.MAX_VALUE / 2; private final long maxRetryTimeMs; private final long initialRetryDelayMs; private final double multiplier; private final double jitterFactor; + private final EventExecutorGroup eventExecutorGroup; private final Clock clock; private final Logger log; - public ExponentialBackoffRetryLogic( RetrySettings settings, Clock clock, Logging logging ) + public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, Clock clock, + Logging logging ) { this( settings.maxRetryTimeMs(), INITIAL_RETRY_DELAY_MS, RETRY_DELAY_MULTIPLIER, RETRY_DELAY_JITTER_FACTOR, - clock, logging ); + eventExecutorGroup, clock, logging ); } ExponentialBackoffRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier, - double jitterFactor, Clock clock, Logging logging ) + double jitterFactor, EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging ) { this.maxRetryTimeMs = maxRetryTimeMs; this.initialRetryDelayMs = initialRetryDelayMs; this.multiplier = multiplier; this.jitterFactor = jitterFactor; + this.eventExecutorGroup = eventExecutorGroup; this.clock = clock; this.log = logging.getLog( RETRY_LOGIC_LOG_NAME ); @@ -95,7 +107,7 @@ public T retry( Supplier work ) if ( elapsedTime < maxRetryTimeMs ) { long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); - log.error( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error ); + log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error ); sleep( delayWithJitterMs ); nextDelayMs = (long) (nextDelayMs * multiplier); @@ -109,6 +121,14 @@ public T retry( Supplier work ) } } + @Override + public InternalFuture retryAsync( Supplier> work ) + { + InternalPromise result = new InternalPromise<>( eventExecutorGroup.next() ); + executeWorkInEventLoop( result, work ); + return result; + } + protected boolean canRetryOn( Throwable error ) { return error instanceof SessionExpiredException || @@ -116,8 +136,109 @@ protected boolean canRetryOn( Throwable error ) isTransientError( error ); } + private void executeWorkInEventLoop( final InternalPromise result, final Supplier> work ) + { + // this is the very first time we execute given work + EventExecutor eventExecutor = eventExecutorGroup.next(); + + eventExecutor.execute( new Runnable() + { + @Override + public void run() + { + executeWork( result, work, -1, initialRetryDelayMs, null ); + } + } ); + } + + private void retryWorkInEventLoop( final InternalPromise result, final Supplier> work, + final Throwable error, final long startTime, final long delayMs, final List errors ) + { + // work has failed before, we need to schedule retry with the given delay + EventExecutor eventExecutor = eventExecutorGroup.next(); + + long delayWithJitterMs = computeDelayWithJitter( delayMs ); + log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); + + eventExecutor.schedule( new Runnable() + { + @Override + public void run() + { + long newRetryDelayMs = (long) (delayMs * multiplier); + executeWork( result, work, startTime, newRetryDelayMs, errors ); + } + }, delayWithJitterMs, TimeUnit.MILLISECONDS ); + } + + private void executeWork( final InternalPromise result, final Supplier> work, + final long startTime, final long retryDelayMs, final List errors ) + { + InternalFuture workFuture; + try + { + workFuture = work.get(); + } + catch ( Throwable error ) + { + // work failed in a sync way, attempt to schedule a retry + retryOnError( result, work, startTime, retryDelayMs, error, errors ); + return; + } + + workFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + result.setSuccess( future.getNow() ); + } + else + { + // work failed in async way, attempt to schedule a retry + retryOnError( result, work, startTime, retryDelayMs, future.cause(), errors ); + } + } + } ); + } + + private void retryOnError( InternalPromise result, Supplier> work, long startTime, + long retryDelayMs, Throwable error, List errors ) + { + if ( canRetryOn( error ) ) + { + long currentTime = clock.millis(); + if ( startTime == -1 ) + { + startTime = currentTime; + } + + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + errors = recordError( error, errors ); + retryWorkInEventLoop( result, work, error, startTime, retryDelayMs, errors ); + return; + } + } + + addSuppressed( error, errors ); + result.setFailure( error ); + } + private long computeDelayWithJitter( long delayMs ) { + if ( delayMs > MAX_RETRY_DELAY ) + { + delayMs = MAX_RETRY_DELAY; + } + long jitter = (long) (delayMs * jitterFactor); long min = delayMs - jitter; long max = delayMs + jitter; diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java index d37b2b3055..5f6569b87d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java @@ -18,9 +18,12 @@ */ package org.neo4j.driver.internal.retry; +import org.neo4j.driver.internal.async.InternalFuture; import org.neo4j.driver.internal.util.Supplier; public interface RetryLogic { T retry( Supplier work ); + + InternalFuture retryAsync( Supplier> work ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java new file mode 100644 index 0000000000..a9c09139db --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +public interface BiConsumer +{ + void accept( T t, U u ); +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index 8cec050acc..4f00ecbc6c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; @@ -384,7 +385,8 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne private static Session newSession( LoadBalancer loadBalancer ) { SleeplessClock clock = new SleeplessClock(); - RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, clock, DEV_NULL_LOGGING ); + RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, GlobalEventExecutor.INSTANCE, + clock, DEV_NULL_LOGGING ); return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, DEV_NULL_LOGGING ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index e6916c474c..be0dd58a27 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -23,10 +23,13 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; -import org.neo4j.driver.internal.logging.DevNullLogging; +import org.neo4j.driver.internal.async.InternalFuture; +import org.neo4j.driver.internal.async.InternalPromise; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.internal.util.TrackingEventExecutor; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -51,9 +54,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.v1.util.TestUtil.await; public class ExponentialBackoffRetryLogicTest { + private final TrackingEventExecutor eventExecutor = new TrackingEventExecutor(); + @Test public void throwsForIllegalMaxRetryTime() { @@ -155,6 +162,24 @@ public void nextDelayCalculatedAccordingToMultiplier() throws Exception assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), sleepValues( clock, retries ) ); } + @Test + public void nextDelayCalculatedAccordingToMultiplierAsync() throws Exception + { + String result = "The Result"; + int retries = 14; + int initialDelay = 1; + int multiplier = 2; + int noJitter = 0; + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, noJitter, + Clock.SYSTEM ); + + InternalFuture future = retryAsync( retryLogic, retries, result ); + + assertEquals( result, future.get() ); + assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), eventExecutor.scheduleDelays() ); + } + @Test public void nextDelayCalculatedAccordingToJitter() throws Exception { @@ -170,16 +195,29 @@ public void nextDelayCalculatedAccordingToJitter() throws Exception List sleepValues = sleepValues( clock, retries ); List delaysWithoutJitter = delaysWithoutJitter( initialDelay, multiplier, retries ); - assertEquals( delaysWithoutJitter.size(), sleepValues.size() ); - for ( int i = 0; i < sleepValues.size(); i++ ) - { - double sleepValue = sleepValues.get( i ).doubleValue(); - long delayWithoutJitter = delaysWithoutJitter.get( i ); - double jitter = delayWithoutJitter * jitterFactor; + assertDelaysApproximatelyEqual( delaysWithoutJitter, sleepValues, jitterFactor ); + } - assertThat( sleepValue, closeTo( delayWithoutJitter, jitter ) ); - } + @Test + public void nextDelayCalculatedAccordingToJitterAsync() throws Exception + { + String result = "The Result"; + int retries = 24; + double jitterFactor = 0.2; + int initialDelay = 1; + int multiplier = 2; + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, jitterFactor, + mock( Clock.class ) ); + + InternalFuture future = retryAsync( retryLogic, retries, result ); + assertEquals( result, future.get() ); + + List scheduleDelays = eventExecutor.scheduleDelays(); + List delaysWithoutJitter = delaysWithoutJitter( initialDelay, multiplier, retries ); + + assertDelaysApproximatelyEqual( delaysWithoutJitter, scheduleDelays, jitterFactor ); } @Test @@ -215,6 +253,44 @@ public void doesNotRetryWhenMaxRetryTimeExceeded() throws Exception verify( workMock, times( 3 ) ).get(); } + @Test + public void doesNotRetryWhenMaxRetryTimeExceededAsync() throws Exception + { + long retryStart = Clock.SYSTEM.millis(); + int initialDelay = 100; + int multiplier = 2; + long maxRetryTimeMs = 45; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( retryStart ) + .thenReturn( retryStart + maxRetryTimeMs - 5 ) + .thenReturn( retryStart + maxRetryTimeMs + 7 ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTimeMs, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + InternalFuture future = retryLogic.retryAsync( workMock ); + + try + { + await( future ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 2, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + + verify( workMock, times( 3 ) ).get(); + } + @Test public void sleepsOnServiceUnavailableException() throws Exception { @@ -231,6 +307,26 @@ public void sleepsOnServiceUnavailableException() throws Exception verify( clock ).sleep( 42 ); } + @Test + public void schedulesRetryOnServiceUnavailableException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 42, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 42, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void sleepsOnSessionExpiredException() throws Exception { @@ -247,6 +343,26 @@ public void sleepsOnSessionExpiredException() throws Exception verify( clock ).sleep( 4242 ); } + @Test + public void schedulesRetryOnSessionExpiredException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 4242, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 4242, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void sleepsOnTransientException() throws Exception { @@ -263,6 +379,26 @@ public void sleepsOnTransientException() throws Exception verify( clock ).sleep( 23 ); } + @Test + public void schedulesRetryOnTransientException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 23, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = transientException(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 23, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void throwsWhenUnknownError() throws Exception { @@ -287,6 +423,31 @@ public void throwsWhenUnknownError() throws Exception verify( clock, never() ).sleep( anyLong() ); } + @Test + public void doesNotRetryOnUnknownError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 1, 1, 1, clock ); + + Supplier> workMock = newWorkMock(); + IllegalStateException error = new IllegalStateException(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenTransactionTerminatedError() throws Exception { @@ -311,6 +472,31 @@ public void throwsWhenTransactionTerminatedError() throws Exception verify( clock, never() ).sleep( 13 ); } + @Test + public void doesNotRetryOnTransactionTerminatedError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 13, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.Terminated", "" ); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenTransactionLockClientStoppedError() throws Exception { @@ -335,6 +521,31 @@ public void throwsWhenTransactionLockClientStoppedError() throws Exception verify( clock, never() ).sleep( 13 ); } + @Test + public void doesNotRetryOnTransactionLockClientStoppedError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 15, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.LockClientStopped", "" ); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenSleepInterrupted() throws Exception { @@ -402,6 +613,56 @@ public void collectsSuppressedErrors() throws Exception verify( clock ).sleep( initialDelay * multiplier * multiplier ); } + @Test + public void collectsSuppressedErrorsAsync() throws Exception + { + String result = "The Result"; + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 15L ).thenReturn( 25L ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error1 = sessionExpired(); + SessionExpiredException error2 = sessionExpired(); + ServiceUnavailableException error3 = serviceUnavailable(); + TransientException error4 = transientException(); + + when( workMock.get() ).thenReturn( failedFuture( error1 ) ) + .thenReturn( failedFuture( error2 ) ) + .thenReturn( failedFuture( error3 ) ) + .thenReturn( failedFuture( error4 ) ) + .thenReturn( succeededFuture( result ) ); + + try + { + retryLogic.retryAsync( workMock ).get(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ExecutionException.class ) ); + Throwable cause = e.getCause(); + assertEquals( error4, cause ); + Throwable[] suppressed = cause.getSuppressed(); + assertEquals( 3, suppressed.length ); + assertEquals( error1, suppressed[0] ); + assertEquals( error2, suppressed[1] ); + assertEquals( error3, suppressed[2] ); + } + + verify( workMock, times( 4 ) ).get(); + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 3, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + assertEquals( initialDelay * multiplier * multiplier, scheduleDelays.get( 2 ).intValue() ); + } + @Test public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() throws Exception { @@ -434,6 +695,43 @@ public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() throws Excepti verify( clock ).sleep( initialDelay * multiplier ); } + @Test + public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrownAsync() throws Exception + { + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 25L ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + retryLogic.retryAsync( workMock ).get(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ExecutionException.class ) ); + Throwable cause = e.getCause(); + + assertEquals( error, cause ); + assertEquals( 0, cause.getSuppressed().length ); + } + + verify( workMock, times( 3 ) ).get(); + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 2, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + } + @Test public void eachRetryIsLogged() { @@ -442,16 +740,38 @@ public void eachRetryIsLogged() Logging logging = mock( Logging.class ); Logger logger = mock( Logger.class ); when( logging.getLog( anyString() ) ).thenReturn( logger ); - ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, clock, logging ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + clock, logging ); retry( logic, retries ); - verify( logger, times( retries ) ).error( + verify( logger, times( retries ) ).warn( startsWith( "Transaction failed and will be retried" ), any( ServiceUnavailableException.class ) ); } + @Test + public void eachAsyncRetryIsLogged() + { + String result = "The Result"; + int retries = 9; + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + clock, logging ); + + assertEquals( result, await( retryAsync( logic, retries, result ) ) ); + + verify( logger, times( retries ) ).warn( + startsWith( "Async transaction failed and is scheduled to retry" ), + any( ServiceUnavailableException.class ) + ); + } + private static void retry( ExponentialBackoffRetryLogic retryLogic, final int times ) { retryLogic.retry( new Supplier() @@ -471,6 +791,26 @@ public Void get() } ); } + private InternalFuture retryAsync( ExponentialBackoffRetryLogic retryLogic, final int times, + final Object result ) + { + return retryLogic.retryAsync( new Supplier>() + { + int invoked; + + @Override + public InternalFuture get() + { + if ( invoked < times ) + { + invoked++; + return failedFuture( serviceUnavailable() ); + } + return succeededFuture( result ); + } + } ); + } + private static List delaysWithoutJitter( long initialDelay, double multiplier, int count ) { List values = new ArrayList<>(); @@ -491,11 +831,21 @@ private static List sleepValues( Clock clockMock, int expectedCount ) thro return captor.getAllValues(); } - private static ExponentialBackoffRetryLogic newRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, + private ExponentialBackoffRetryLogic newRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier, double jitterFactor, Clock clock ) { - return new ExponentialBackoffRetryLogic( maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor, clock, - DevNullLogging.DEV_NULL_LOGGING ); + return new ExponentialBackoffRetryLogic( maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor, + eventExecutor, clock, DEV_NULL_LOGGING ); + } + + private InternalFuture succeededFuture( Object value ) + { + return new InternalPromise<>( eventExecutor ).setSuccess( value ); + } + + private InternalFuture failedFuture( Throwable error ) + { + return new InternalPromise<>( eventExecutor ).setFailure( error ); } private static ServiceUnavailableException serviceUnavailable() @@ -514,8 +864,22 @@ private static TransientException transientException() } @SuppressWarnings( "unchecked" ) - private static Supplier newWorkMock() + private static Supplier newWorkMock() { return mock( Supplier.class ); } + + private static void assertDelaysApproximatelyEqual( List expectedDelays, List actualDelays, + double delta ) + { + assertEquals( expectedDelays.size(), actualDelays.size() ); + + for ( int i = 0; i < actualDelays.size(); i++ ) + { + double actualValue = actualDelays.get( i ).doubleValue(); + long expectedValue = expectedDelays.get( i ); + + assertThat( actualValue, closeTo( expectedValue, expectedValue * delta ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java index fe3bc713e9..a142500e6b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java @@ -18,6 +18,9 @@ */ package org.neo4j.driver.internal.retry; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + import org.neo4j.driver.internal.util.SleeplessClock; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; @@ -29,7 +32,13 @@ public class FixedRetryLogic extends ExponentialBackoffRetryLogic public FixedRetryLogic( int retryCount ) { - super( new RetrySettings( Long.MAX_VALUE ), new SleeplessClock(), DEV_NULL_LOGGING ); + this( retryCount, GlobalEventExecutor.INSTANCE ); + } + + public FixedRetryLogic( int retryCount, EventExecutorGroup eventExecutorGroup ) + { + super( new RetrySettings( Long.MAX_VALUE ), eventExecutorGroup, new SleeplessClock(), + DEV_NULL_LOGGING ); this.retryCount = retryCount; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java index 4d8482b00c..65792e3012 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.util; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.FixedRetryLogic; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.retry.RetrySettings; @@ -34,8 +36,9 @@ public DriverFactoryWithFixedRetryLogic( int retryCount ) } @Override - protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging ) + protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, + Logging logging ) { - return new FixedRetryLogic( retryCount ); + return new FixedRetryLogic( retryCount, eventExecutorGroup ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java b/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java new file mode 100644 index 0000000000..b357f67c7d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.Collections.unmodifiableList; +import static org.mockito.Mockito.mock; + +public class TrackingEventExecutor implements EventExecutor +{ + private final EventExecutor delegate; + private final List scheduleDelays; + + public TrackingEventExecutor() + { + this( GlobalEventExecutor.INSTANCE ); + } + + public TrackingEventExecutor( EventExecutor delegate ) + { + this.delegate = delegate; + this.scheduleDelays = new CopyOnWriteArrayList<>(); + } + + public List scheduleDelays() + { + return unmodifiableList( scheduleDelays ); + } + + @Override + public EventExecutor next() + { + return this; + } + + @Override + public EventExecutorGroup parent() + { + return this; + } + + @Override + public boolean inEventLoop() + { + return delegate.inEventLoop(); + } + + @Override + public boolean inEventLoop( Thread thread ) + { + return delegate.inEventLoop( thread ); + } + + @Override + public Promise newPromise() + { + return delegate.newPromise(); + } + + @Override + public ProgressivePromise newProgressivePromise() + { + return delegate.newProgressivePromise(); + } + + @Override + public Future newSucceededFuture( V result ) + { + return delegate.newSucceededFuture( result ); + } + + @Override + public Future newFailedFuture( Throwable cause ) + { + return delegate.newFailedFuture( cause ); + } + + @Override + public boolean isShuttingDown() + { + return delegate.isShuttingDown(); + } + + @Override + public Future shutdownGracefully() + { + return delegate.shutdownGracefully(); + } + + @Override + public Future shutdownGracefully( long quietPeriod, long timeout, TimeUnit unit ) + { + return delegate.shutdownGracefully( quietPeriod, timeout, unit ); + } + + @Override + public Future terminationFuture() + { + return delegate.terminationFuture(); + } + + @Override + @Deprecated + public void shutdown() + { + delegate.shutdown(); + } + + @Override + @Deprecated + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public Iterator iterator() + { + return delegate.iterator(); + } + + @Override + public Future submit( Runnable task ) + { + return delegate.submit( task ); + } + + @Override + public Future submit( Runnable task, T result ) + { + return delegate.submit( task, result ); + } + + @Override + public Future submit( Callable task ) + { + return delegate.submit( task ); + } + + @Override + public ScheduledFuture schedule( Runnable command, long delay, TimeUnit unit ) + { + scheduleDelays.add( unit.toMillis( delay ) ); + delegate.execute( command ); + return mock( ScheduledFuture.class ); + } + + @Override + public ScheduledFuture schedule( Callable callable, long delay, TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( Runnable command, long initialDelay, long period, + TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, + TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException + { + return delegate.awaitTermination( timeout, unit ); + } + + @Override + public List> invokeAll( Collection> tasks ) + throws InterruptedException + { + return delegate.invokeAll( tasks ); + } + + @Override + public List> invokeAll( Collection> tasks, long timeout, + TimeUnit unit ) throws InterruptedException + { + return delegate.invokeAll( tasks, timeout, unit ); + } + + @Override + public T invokeAny( Collection> tasks ) throws InterruptedException, ExecutionException + { + return delegate.invokeAny( tasks ); + } + + @Override + public T invokeAny( Collection> tasks, long timeout, TimeUnit unit ) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny( tasks, timeout, unit ); + } + + @Override + public void execute( Runnable command ) + { + delegate.execute( command ); + } +} From be6225dbe3a35242aec2ab7d04612380d78ba4e2 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 20 Sep 2017 00:16:41 +0200 Subject: [PATCH 3/8] Added async transaction functions Via `Session#readTransactionAsync()` and `#writeTransactionAsync()`. Both accept a single parameter - async function that takes a transaction and returns a `Response`. When function returns a failed response or throws exception it will be retried. Retries only happen on non-fatal & transient errors. API is not fully complete yet - there is no way to create a `Result` object so it'll be inconvenient to use. Helpers to create some sort of "settable" response will come in subsequent commits. --- .../neo4j/driver/internal/DriverFactory.java | 32 ++-- .../internal/LeakLoggingNetworkSession.java | 6 +- .../neo4j/driver/internal/NetworkSession.java | 131 ++++++++++++++- .../driver/internal/SessionFactoryImpl.java | 13 +- .../internal/async/InternalPromise.java | 8 +- .../retry/ExponentialBackoffRetryLogic.java | 2 +- .../java/org/neo4j/driver/v1/Session.java | 4 + .../driver/internal/DriverFactoryTest.java | 9 +- .../LeakLoggingNetworkSessionTest.java | 3 +- .../driver/internal/NetworkSessionTest.java | 4 +- .../driver/internal/RoutingDriverTest.java | 5 +- .../internal/SessionFactoryImplTest.java | 4 +- .../loadbalancing/LoadBalancerTest.java | 3 +- .../driver/v1/integration/SessionAsyncIT.java | 154 +++++++++++++++++- .../driver/v1/util/TestNeo4jSession.java | 12 ++ 15 files changed, 355 insertions(+), 35 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 04ded634f4..527874e44f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; @@ -75,7 +76,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config ); Bootstrap bootstrap = BootstrapFactory.newBootstrap(); - RetryLogic retryLogic = createRetryLogic( retrySettings, bootstrap.config().group(), config.logging() ); + EventLoopGroup eventLoopGroup = bootstrap.config().group(); + RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() ); AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap, config ); @@ -83,7 +85,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r try { return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic, - asyncConnectionPool ); + asyncConnectionPool, eventLoopGroup ); } catch ( Throwable driverError ) { @@ -119,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool, Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, - RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool ) + RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup ) { String scheme = uri.getScheme().toLowerCase(); switch ( scheme ) { case BOLT_URI_SCHEME: assertNoRoutingContext( uri, routingSettings ); - return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool ); + return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool, + eventExecutorGroup ); case BOLT_ROUTING_URI_SCHEME: - return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic ); + return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic, + eventExecutorGroup ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } @@ -140,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool * This method is protected only for testing */ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool ) + SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, + EventExecutorGroup eventExecutorGroup ) { ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool, asyncConnectionPool ); - SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); return createDriver( config, securityPlan, sessionFactory ); } @@ -154,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c * This method is protected only for testing */ protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic ) + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup ) { if ( !securityPlan.isRoutingCompatible() ) { throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" ); } ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings ); - SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); return createDriver( config, securityPlan, sessionFactory ); } @@ -242,10 +250,10 @@ protected Connector createConnector( final ConnectionSettings connectionSettings *

* This method is protected only for testing */ - protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, - RetryLogic retryLogic, Config config ) + protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup, Config config ) { - return new SessionFactoryImpl( connectionProvider, retryLogic, config ); + return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config ); } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index eeecf4f0fe..63420be0e8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AccessMode; @@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession private final String stackTrace; LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, - Logging logging ) + EventExecutorGroup eventExecutorGroup, Logging logging ) { - super( connectionProvider, mode, retryLogic, logging ); + super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ); this.stackTrace = captureStackTrace(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index ed7b4b584e..918f68a8cb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -18,7 +18,9 @@ */ package org.neo4j.driver.internal; -import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,6 +42,7 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; +import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.StatementResult; @@ -61,6 +64,7 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR private final ConnectionProvider connectionProvider; private final AccessMode mode; private final RetryLogic retryLogic; + private final EventExecutorGroup eventExecutorGroup; protected final Logger logger; private volatile Bookmark bookmark = Bookmark.empty(); @@ -73,11 +77,12 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR private final AtomicBoolean isOpen = new AtomicBoolean( true ); public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, - Logging logging ) + EventExecutorGroup eventExecutorGroup, Logging logging ) { this.connectionProvider = connectionProvider; this.mode = mode; this.retryLogic = retryLogic; + this.eventExecutorGroup = eventExecutorGroup; this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) ); } @@ -262,7 +267,7 @@ public InternalFuture apply( ExplicitTransaction tx ) } else { - return new InternalPromise( GlobalEventExecutor.INSTANCE ).setSuccess( null ); + return new InternalPromise( eventExecutorGroup ).setSuccess( null ); } } @@ -293,12 +298,24 @@ public T readTransaction( TransactionWork work ) return transaction( AccessMode.READ, work ); } + @Override + public Response readTransactionAsync( TransactionWork> work ) + { + return transactionAsync( AccessMode.READ, work ); + } + @Override public T writeTransaction( TransactionWork work ) { return transaction( AccessMode.WRITE, work ); } + @Override + public Response writeTransactionAsync( TransactionWork> work ) + { + return transactionAsync( AccessMode.WRITE, work ); + } + void setBookmark( Bookmark bookmark ) { if ( bookmark != null && !bookmark.isEmpty() ) @@ -398,6 +415,113 @@ public T get() } ); } + private InternalFuture transactionAsync( final AccessMode mode, final TransactionWork> work ) + { + return retryLogic.retryAsync( new Supplier>() + { + @Override + public InternalFuture get() + { + final InternalFuture txFuture = beginTransactionAsync( mode ); + final InternalPromise resultPromise = new InternalPromise<>( txFuture.eventExecutor() ); + + txFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) throws Exception + { + if ( future.isCancelled() ) + { + resultPromise.cancel( true ); + } + else if ( future.isSuccess() ) + { + executeWork( resultPromise, future.getNow(), work ); + } + else + { + resultPromise.setFailure( future.cause() ); + } + } + } ); + + return resultPromise; + } + } ); + } + + private void executeWork( final InternalPromise resultPromise, final ExplicitTransaction tx, + TransactionWork> work ) + { + Response workResponse = work.execute( tx ); + workResponse.addListener( new ResponseListener() + { + @Override + public void operationCompleted( T result, Throwable error ) + { + if ( error != null ) + { + rollbackTxAfterFailedTransactionWork( tx, resultPromise, error ); + } + else + { + commitTxAfterSucceededTransactionWork( tx, resultPromise, result ); + } + } + } ); + } + + private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, + final InternalPromise resultPromise, final Throwable error ) + { + if ( tx.isOpen() ) + { + tx.rollbackAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Void ignore, Throwable rollbackError ) + { + if ( rollbackError != null ) + { + error.addSuppressed( rollbackError ); + } + resultPromise.setFailure( error ); + } + } ); + } + else + { + resultPromise.setFailure( error ); + } + } + + private void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx, + final InternalPromise resultPromise, final T result ) + { + if ( tx.isOpen() ) + { + tx.commitAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Void ignore, Throwable commitError ) + { + if ( commitError != null ) + { + resultPromise.setFailure( commitError ); + } + else + { + resultPromise.setSuccess( result ); + } + } + } ); + } + else + { + resultPromise.setSuccess( result ); + } + } + private synchronized Transaction beginTransaction( AccessMode mode ) { ensureSessionIsOpen(); @@ -431,7 +555,6 @@ public InternalFuture apply( AsyncConnection connection ) } } ); - //noinspection unchecked return currentAsyncTransactionFuture; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java index 6d633019ec..cc9c7050ac 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AccessMode; @@ -29,12 +31,15 @@ public class SessionFactoryImpl implements SessionFactory { private final ConnectionProvider connectionProvider; private final RetryLogic retryLogic; + private final EventExecutorGroup eventExecutorGroup; private final Logging logging; private final boolean leakedSessionsLoggingEnabled; - SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config ) + SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup, Config config ) { this.connectionProvider = connectionProvider; + this.eventExecutorGroup = eventExecutorGroup; this.leakedSessionsLoggingEnabled = config.logLeakedSessions(); this.retryLogic = retryLogic; this.logging = config.logging(); @@ -51,9 +56,9 @@ public final Session newInstance( AccessMode mode, Bookmark bookmark ) protected NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, AccessMode mode, Logging logging ) { - return leakedSessionsLoggingEnabled ? - new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging ) : - new NetworkSession( connectionProvider, mode, retryLogic, logging ); + return leakedSessionsLoggingEnabled + ? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ) + : new NetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java index 1e952773b3..6a48065b4e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java @@ -20,6 +20,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GenericFutureListener; @@ -40,7 +41,12 @@ public class InternalPromise implements InternalFuture, Promise public InternalPromise( Bootstrap bootstrap ) { - this( bootstrap.config().group().next() ); + this( bootstrap.config().group() ); + } + + public InternalPromise( EventExecutorGroup eventExecutorGroup ) + { + this( eventExecutorGroup.next() ); } public InternalPromise( EventExecutor eventExecutor ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index 6fa8142cf3..a1d675a005 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -124,7 +124,7 @@ public T retry( Supplier work ) @Override public InternalFuture retryAsync( Supplier> work ) { - InternalPromise result = new InternalPromise<>( eventExecutorGroup.next() ); + InternalPromise result = new InternalPromise<>( eventExecutorGroup ); executeWorkInEventLoop( result, work ); return result; } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index a9674128cf..a3c9e3e61e 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -96,6 +96,8 @@ public interface Session extends Resource, StatementRunner */ T readTransaction( TransactionWork work ); + Response readTransactionAsync( TransactionWork> work ); + /** * Execute given unit of work in a {@link AccessMode#WRITE write} transaction. *

@@ -108,6 +110,8 @@ public interface Session extends Resource, StatementRunner */ T writeTransaction( TransactionWork work ); + Response writeTransactionAsync( TransactionWork> work ); + /** * Return the bookmark received following the last completed * {@linkplain Transaction transaction}. If no bookmark was received diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index dd7be8391b..7a89922175 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -169,7 +170,8 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, @Override protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic ) + RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup ) { throw new UnsupportedOperationException( "Can't create routing driver" ); } @@ -200,9 +202,10 @@ protected LoadBalancer createLoadBalancer( BoltServerAddress address, Connection @Override protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, - RetryLogic retryLogic, Config config ) + RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup, Config config ) { - SessionFactory sessionFactory = super.createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + super.createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); capturedSessionFactory = sessionFactory; return sessionFactory; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java index 1249acb047..6188e4d51b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -99,7 +100,7 @@ private static void finalize( Session session ) throws Exception private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection ) { return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ, - new FixedRetryLogic( 0 ), logging ); + new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging ); } private static ConnectionProvider connectionProviderMock( final boolean openConnection ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 132a4a826f..23343fff11 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -1010,7 +1011,8 @@ private static NetworkSession newSession( ConnectionProvider connectionProvider, private static NetworkSession newSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Bookmark bookmark ) { - NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic, DEV_NULL_LOGGING ); + NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic, + GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGING ); session.setBookmark( bookmark ); return session; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 6212951034..5e6a61800d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -439,7 +440,7 @@ private static class NetworkSessionWithAddressFactory extends SessionFactoryImpl { NetworkSessionWithAddressFactory( ConnectionProvider connectionProvider, Config config ) { - super( connectionProvider, new FixedRetryLogic( 0 ), config ); + super( connectionProvider, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, config ); } @Override @@ -456,7 +457,7 @@ private static class NetworkSessionWithAddress extends NetworkSession NetworkSessionWithAddress( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) { - super( connectionProvider, mode, new FixedRetryLogic( 0 ), logging ); + super( connectionProvider, mode, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging ); try ( PooledConnection connection = connectionProvider.acquireConnection( mode ) ) { this.address = connection.boltServerAddress(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java index fd27004032..fdf07758d2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Test; import org.neo4j.driver.internal.retry.FixedRetryLogic; @@ -61,6 +62,7 @@ public void createsLeakLoggingNetworkSessions() private static SessionFactory newSessionFactory( Config config ) { - return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ), config ); + return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ), + GlobalEventExecutor.INSTANCE, config ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index 4f00ecbc6c..f80e26c9b3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -387,7 +387,8 @@ private static Session newSession( LoadBalancer loadBalancer ) SleeplessClock clock = new SleeplessClock(); RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, GlobalEventExecutor.INSTANCE, clock, DEV_NULL_LOGGING ); - return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, DEV_NULL_LOGGING ); + return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, GlobalEventExecutor.INSTANCE, + DEV_NULL_LOGGING ); } private static PooledConnection newConnectionWithFailingSync( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 4edfff25ac..aa47e33940 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -28,19 +28,27 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.async.InternalPromise; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.StatementResultCursor; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.types.Node; @@ -71,13 +79,13 @@ public class SessionAsyncIT private Session session; @Before - public void setUp() throws Exception + public void setUp() { session = neo4j.driver().session(); } @After - public void tearDown() throws Exception + public void tearDown() { await( session.closeAsync() ); } @@ -348,6 +356,56 @@ public void shouldExposeResultSummaryForProfileQuery() assertThat( summary.resultConsumedAfter( TimeUnit.MILLISECONDS ), greaterThanOrEqualTo( 0L ) ); } + @Test + public void shouldRunAsyncTransactionWithoutRetries() + { + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Apa) RETURN 42" ); + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 42L, record.get( 0 ).asLong() ); + + assertEquals( 1, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Apa" ) ); + } + + @Test + public void shouldRunAsyncTransactionWithRetries() + { + List failures = Arrays.asList( new ServiceUnavailableException( "Oh!" ), + new SessionExpiredException( "Ah!" ), new TransientException( "Code", "Message" ) ); + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24", failures ); + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 24L, record.get( 0 ).asLong() ); + + assertEquals( 4, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Node" ) ); + } + + @Test + public void shouldRunAsyncTransactionThatCanNotBeRetried() + { + InvocationTrackingWork work = new InvocationTrackingWork( "UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x" ); + Response txResponse = session.writeTransactionAsync( work ); + + try + { + await( txResponse ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + assertEquals( 1, work.invocationCount() ); + assertEquals( 0, countNodesByLabel( "Hi" ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); @@ -412,6 +470,11 @@ public void operationCompleted( StatementResultCursor result, Throwable error ) } ); } + private long countNodesByLabel( String label ) + { + return session.run( "MATCH (n:" + label + ") RETURN count(n)" ).single().get( 0 ).asLong(); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); @@ -457,4 +520,91 @@ void killDb() } } } + + private static class InvocationTrackingWork implements TransactionWork> + { + final String query; + final Iterator failures; + final AtomicInteger invocationCount; + + InvocationTrackingWork( String query ) + { + this( query, Collections.emptyList() ); + } + + InvocationTrackingWork( String query, List failures ) + { + this.query = query; + this.failures = failures.iterator(); + this.invocationCount = new AtomicInteger(); + } + + int invocationCount() + { + return invocationCount.get(); + } + + @Override + public Response execute( Transaction tx ) + { + invocationCount.incrementAndGet(); + + final InternalPromise resultPromise = new InternalPromise<>( GlobalEventExecutor.INSTANCE ); + + tx.runAsync( query ).addListener( new ResponseListener() + { + @Override + public void operationCompleted( final StatementResultCursor cursor, Throwable error ) + { + processQueryResult( cursor, error, resultPromise ); + } + } ); + + return resultPromise; + } + + private void processQueryResult( final StatementResultCursor cursor, final Throwable error, + final InternalPromise resultPromise ) + { + if ( error != null ) + { + resultPromise.setFailure( error ); + return; + } + + cursor.fetchAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Boolean recordAvailable, Throwable error ) + { + processFetchResult( recordAvailable, error, resultPromise, cursor ); + } + } ); + } + + private void processFetchResult( Boolean recordAvailable, Throwable error, + InternalPromise resultPromise, StatementResultCursor cursor ) + { + if ( error != null ) + { + resultPromise.setFailure( error ); + return; + } + + if ( !recordAvailable ) + { + resultPromise.setFailure( new AssertionError( "Record not available" ) ); + return; + } + + if ( failures.hasNext() ) + { + resultPromise.setFailure( failures.next() ); + } + else + { + resultPromise.setSuccess( cursor.current() ); + } + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java index 429c5f62e8..e171f7d937 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java @@ -134,12 +134,24 @@ public T readTransaction( TransactionWork work ) return realSession.readTransaction( work ); } + @Override + public Response readTransactionAsync( TransactionWork> work ) + { + return realSession.readTransactionAsync( work ); + } + @Override public T writeTransaction( TransactionWork work ) { return realSession.writeTransaction( work ); } + @Override + public Response writeTransactionAsync( TransactionWork> work ) + { + return realSession.writeTransactionAsync( work ); + } + @Override public String lastBookmark() { From 62cd3976a4a308f4d5ec87140dacd246d676e06a Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 20 Sep 2017 14:12:16 +0200 Subject: [PATCH 4/8] Better handle sync failures in async tx functions Given work might fail by throwing exception or by returning a failed future. This commit makes retries gracefully handle exceptions. Previously retrying code would only log an exception and hang. --- .../neo4j/driver/internal/NetworkSession.java | 18 +++- .../driver/v1/integration/SessionAsyncIT.java | 85 ++++++++++++++++--- .../org/neo4j/driver/v1/util/TestUtil.java | 8 +- 3 files changed, 90 insertions(+), 21 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 918f68a8cb..4a39bab241 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -453,7 +453,7 @@ else if ( future.isSuccess() ) private void executeWork( final InternalPromise resultPromise, final ExplicitTransaction tx, TransactionWork> work ) { - Response workResponse = work.execute( tx ); + Response workResponse = safeExecuteWork( tx, work ); workResponse.addListener( new ResponseListener() { @Override @@ -471,6 +471,22 @@ public void operationCompleted( T result, Throwable error ) } ); } + private Response safeExecuteWork( ExplicitTransaction tx, TransactionWork> work ) + { + // given work might fail in both async and sync way + // async failure will result in a failed future being returned + // sync failure will result in an exception being thrown + try + { + return work.execute( tx ); + } + catch ( Throwable workError ) + { + // work threw an exception, wrap it in a future and proceed + return new InternalPromise( eventExecutorGroup ).setFailure( workError ); + } + } + private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, final InternalPromise resultPromise, final Throwable error ) { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index aa47e33940..aed14ca2b1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.Future; @@ -46,6 +45,7 @@ import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.DatabaseException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.exceptions.TransientException; @@ -54,6 +54,7 @@ import org.neo4j.driver.v1.types.Node; import org.neo4j.driver.v1.util.TestNeo4j; +import static java.util.Collections.emptyIterator; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; @@ -371,11 +372,13 @@ public void shouldRunAsyncTransactionWithoutRetries() } @Test - public void shouldRunAsyncTransactionWithRetries() + public void shouldRunAsyncTransactionWithRetriesOnAsyncFailures() { - List failures = Arrays.asList( new ServiceUnavailableException( "Oh!" ), - new SessionExpiredException( "Ah!" ), new TransientException( "Code", "Message" ) ); - InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24", failures ); + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24" ).withAsyncFailures( + new ServiceUnavailableException( "Oh!" ), + new SessionExpiredException( "Ah!" ), + new TransientException( "Code", "Message" ) ); + Response txResponse = session.writeTransactionAsync( work ); Record record = await( txResponse ); @@ -386,6 +389,23 @@ public void shouldRunAsyncTransactionWithRetries() assertEquals( 1, countNodesByLabel( "Node" ) ); } + @Test + public void shouldRunAsyncTransactionWithRetriesOnSyncFailures() + { + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Test) RETURN 12" ).withSyncFailures( + new TransientException( "Oh!", "Deadlock!" ), + new ServiceUnavailableException( "Oh! Network Failure" ) ); + + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 12L, record.get( 0 ).asLong() ); + + assertEquals( 3, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Test" ) ); + } + @Test public void shouldRunAsyncTransactionThatCanNotBeRetried() { @@ -406,6 +426,32 @@ public void shouldRunAsyncTransactionThatCanNotBeRetried() assertEquals( 0, countNodesByLabel( "Hi" ) ); } + @Test + public void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() + { + // first throw TransientException directly from work, retry can happen afterwards + // then return a future failed with DatabaseException, retry can't happen afterwards + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Person) RETURN 1" ) + .withSyncFailures( new TransientException( "Oh!", "Deadlock!" ) ) + .withAsyncFailures( new DatabaseException( "Oh!", "OutOfMemory!" ) ); + Response txResponse = session.writeTransactionAsync( work ); + + try + { + await( txResponse ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( DatabaseException.class ) ); + assertEquals( 1, e.getSuppressed().length ); + assertThat( e.getSuppressed()[0], instanceOf( TransientException.class ) ); + } + + assertEquals( 2, work.invocationCount() ); + assertEquals( 0, countNodesByLabel( "Person" ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); @@ -524,19 +570,27 @@ void killDb() private static class InvocationTrackingWork implements TransactionWork> { final String query; - final Iterator failures; final AtomicInteger invocationCount; + Iterator asyncFailures = emptyIterator(); + Iterator syncFailures = emptyIterator(); + InvocationTrackingWork( String query ) { - this( query, Collections.emptyList() ); + this.query = query; + this.invocationCount = new AtomicInteger(); + } + + InvocationTrackingWork withAsyncFailures( RuntimeException... failures ) + { + asyncFailures = Arrays.asList( failures ).iterator(); + return this; } - InvocationTrackingWork( String query, List failures ) + InvocationTrackingWork withSyncFailures( RuntimeException... failures ) { - this.query = query; - this.failures = failures.iterator(); - this.invocationCount = new AtomicInteger(); + syncFailures = Arrays.asList( failures ).iterator(); + return this; } int invocationCount() @@ -549,6 +603,11 @@ public Response execute( Transaction tx ) { invocationCount.incrementAndGet(); + if ( syncFailures.hasNext() ) + { + throw syncFailures.next(); + } + final InternalPromise resultPromise = new InternalPromise<>( GlobalEventExecutor.INSTANCE ); tx.runAsync( query ).addListener( new ResponseListener() @@ -597,9 +656,9 @@ private void processFetchResult( Boolean recordAvailable, Throwable error, return; } - if ( failures.hasNext() ) + if ( asyncFailures.hasNext() ) { - resultPromise.setFailure( failures.next() ); + resultPromise.setFailure( asyncFailures.next() ); } else { diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index 91c4701673..97b244778e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -68,13 +68,7 @@ public static > T await( F future ) } catch ( ExecutionException e ) { - Throwable cause = e.getCause(); - StackTraceElement[] originalStackTrace = cause.getStackTrace(); - RuntimeException exceptionWithOriginalStackTrace = new RuntimeException(); - cause.setStackTrace( exceptionWithOriginalStackTrace.getStackTrace() ); - exceptionWithOriginalStackTrace.setStackTrace( originalStackTrace ); - cause.addSuppressed( exceptionWithOriginalStackTrace ); - throwException( cause ); + throwException( e.getCause() ); return null; } catch ( TimeoutException e ) From 7eae6983ff8353b3c6d292a5d2092780f540655e Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 20 Sep 2017 16:01:41 +0200 Subject: [PATCH 5/8] Notify transaction about RUN failure Transaction should be marked as failed when queries fail. It was previously only marked when PULL_ALL failed. This commit makes RUN handler notify transaction on failure. --- .../neo4j/driver/internal/InternalStatementResult.java | 2 +- .../org/neo4j/driver/internal/async/QueryRunner.java | 2 +- .../driver/internal/handlers/RunResponseHandler.java | 9 ++++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java index 21833b86fd..5eac6f1e6a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java @@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult { this.statement = statement; this.connection = connection; - this.runResponseHandler = new RunResponseHandler( null ); + this.runResponseHandler = new RunResponseHandler( null, null ); this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler ); this.resourcesHandler = resourcesHandler; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java index 2afae41876..69d8dc4dac 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java @@ -50,7 +50,7 @@ public static InternalFuture runAsync( AsyncConnection co Map params = statement.parameters().asMap( ofValue() ); InternalPromise runCompletedPromise = connection.newPromise(); - final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise ); + final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise, tx ); final PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx ); connection.run( query, params, runHandler ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index 434554e21e..83ba08c75f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -25,19 +25,22 @@ import java.util.List; import java.util.Map; +import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.v1.Value; public class RunResponseHandler implements ResponseHandler { private final Promise runCompletedPromise; + private final ExplicitTransaction tx; private List statementKeys; private long resultAvailableAfter; - public RunResponseHandler( Promise runCompletedPromise ) + public RunResponseHandler( Promise runCompletedPromise, ExplicitTransaction tx ) { this.runCompletedPromise = runCompletedPromise; + this.tx = tx; } @Override @@ -55,6 +58,10 @@ public void onSuccess( Map metadata ) @Override public void onFailure( Throwable error ) { + if ( tx != null ) + { + tx.resultFailed( error ); + } if ( runCompletedPromise != null ) { runCompletedPromise.setFailure( error ); From cd90b3ce142e21c5a1a3fdad4541019bf5a05ff2 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 20 Sep 2017 21:36:10 +0200 Subject: [PATCH 6/8] Simplify async result cursor record fetching Commit replaces `Response #fetchAsync()` + `Record #current()` in `StatementResultCursor` with single `Response #nextAsync()`. This simplifies API and makes it more convenient to consume records via future chaining and callbacks. `Response` is already a container for record, it can't contain anything else. So it makes sense to return `null` as an end of records stream marker and not use separate method for this. Also added `#peekAsync()` method. --- .../async/InternalStatementResultCursor.java | 23 ++- .../org/neo4j/driver/internal/async/Main.java | 8 +- .../handlers/PullAllResponseHandler.java | 64 ++++---- .../driver/v1/StatementResultCursor.java | 4 +- .../driver/v1/integration/SessionAsyncIT.java | 142 ++++++++++-------- .../v1/integration/TransactionAsyncIT.java | 68 ++++++--- 6 files changed, 185 insertions(+), 124 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index ca09fd5c01..63073ea270 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -33,6 +33,8 @@ public class InternalStatementResultCursor implements StatementResultCursor private final RunResponseHandler runResponseHandler; private final PullAllResponseHandler pullAllHandler; + private Response peekedRecordResponse; + public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler ) { this.runResponseHandler = runResponseHandler; @@ -53,14 +55,27 @@ public Response summaryAsync() } @Override - public Response fetchAsync() + public Response nextAsync() { - return pullAllHandler.fetchRecordAsync(); + if ( peekedRecordResponse != null ) + { + Response result = peekedRecordResponse; + peekedRecordResponse = null; + return result; + } + else + { + return pullAllHandler.nextAsync(); + } } @Override - public Record current() + public Response peekAsync() { - return pullAllHandler.currentRecord(); + if ( peekedRecordResponse == null ) + { + peekedRecordResponse = pullAllHandler.nextAsync(); + } + return peekedRecordResponse; } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java index 2abd234d4c..225b1a8e46 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java @@ -158,9 +158,9 @@ public void apply( Driver driver, MutableInt recordsRead ) Session session = driver.session(); Response cursorResponse = session.runAsync( QUERY, PARAMS_OBJ ); StatementResultCursor cursor = await( cursorResponse ); - while ( await( cursor.fetchAsync() ) ) + Record record; + while ( (record = await( cursor.nextAsync() )) != null ) { - Record record = cursor.current(); useRecord( record ); recordsRead.increment(); } @@ -202,9 +202,9 @@ public void apply( Driver driver, MutableInt recordsRead ) Session session = driver.session(); Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( QUERY, PARAMS_OBJ ) ); - while ( await( cursor.fetchAsync() ) ) + Record record; + while ( (record = await( cursor.nextAsync() )) != null ) { - Record record = cursor.current(); useRecord( record ); recordsRead.increment(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 3803bec368..f562161456 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -53,15 +53,14 @@ public abstract class PullAllResponseHandler implements ResponseHandler private final RunResponseHandler runResponseHandler; protected final AsyncConnection connection; - private final Queue records; + private final Queue records = new LinkedList<>(); + private boolean succeeded; private Throwable failure; - private ResultSummary summary; - private volatile Record current; - private InternalPromise recordAvailablePromise; - private InternalPromise summaryAvailablePromise; + private InternalPromise recordPromise; + private InternalPromise summaryPromise; public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, AsyncConnection connection ) @@ -69,26 +68,25 @@ public PullAllResponseHandler( Statement statement, RunResponseHandler runRespon this.statement = requireNonNull( statement ); this.runResponseHandler = requireNonNull( runResponseHandler ); this.connection = requireNonNull( connection ); - this.records = new LinkedList<>(); } @Override public synchronized void onSuccess( Map metadata ) { summary = extractResultSummary( metadata ); - if ( summaryAvailablePromise != null ) + if ( summaryPromise != null ) { - summaryAvailablePromise.setSuccess( summary ); - summaryAvailablePromise = null; + summaryPromise.setSuccess( summary ); + summaryPromise = null; } succeeded = true; afterSuccess(); - if ( recordAvailablePromise != null ) + if ( recordPromise != null ) { - recordAvailablePromise.setSuccess( false ); - recordAvailablePromise = null; + recordPromise.setSuccess( null ); + recordPromise = null; } } @@ -100,10 +98,10 @@ public synchronized void onFailure( Throwable error ) failure = error; afterFailure( error ); - if ( recordAvailablePromise != null ) + if ( recordPromise != null ) { - recordAvailablePromise.setFailure( error ); - recordAvailablePromise = null; + recordPromise.setFailure( error ); + recordPromise = null; } } @@ -114,11 +112,10 @@ public synchronized void onRecord( Value[] fields ) { Record record = new InternalRecord( runResponseHandler.statementKeys(), fields ); - if ( recordAvailablePromise != null ) + if ( recordPromise != null ) { - current = record; - recordAvailablePromise.setSuccess( true ); - recordAvailablePromise = null; + recordPromise.setSuccess( record ); + recordPromise = null; } else { @@ -126,42 +123,33 @@ public synchronized void onRecord( Value[] fields ) } } - public synchronized InternalFuture fetchRecordAsync() + public synchronized InternalFuture nextAsync() { Record record = dequeueRecord(); if ( record == null ) { if ( succeeded ) { - return connection.newPromise().setSuccess( false ); + return connection.newPromise().setSuccess( null ); } if ( failure != null ) { - return connection.newPromise().setFailure( failure ); + return connection.newPromise().setFailure( failure ); } - if ( recordAvailablePromise == null ) + if ( recordPromise == null ) { - recordAvailablePromise = connection.newPromise(); + recordPromise = connection.newPromise(); } - - return recordAvailablePromise; + return recordPromise; } else { - current = record; - return connection.newPromise().setSuccess( true ); + return connection.newPromise().setSuccess( record ); } } - public Record currentRecord() - { - Record result = current; - current = null; - return result; - } - public synchronized InternalFuture summaryAsync() { if ( summary != null ) @@ -170,11 +158,11 @@ public synchronized InternalFuture summaryAsync() } else { - if ( summaryAvailablePromise == null ) + if ( summaryPromise == null ) { - summaryAvailablePromise = connection.newPromise(); + summaryPromise = connection.newPromise(); } - return summaryAvailablePromise; + return summaryPromise; } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java index d44232fff7..86ebc2dbd3 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java @@ -33,7 +33,7 @@ public interface StatementResultCursor Response summaryAsync(); - Response fetchAsync(); + Response nextAsync(); - Record current(); + Response peekAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index aed14ca2b1..3f6bea7358 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -58,7 +58,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -96,7 +95,7 @@ public void shouldRunQueryWithEmptyResult() { StatementResultCursor cursor = await( session.runAsync( "CREATE (:Person)" ) ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -104,14 +103,13 @@ public void shouldRunQueryWithSingleResult() { StatementResultCursor cursor = await( session.runAsync( "CREATE (p:Person {name: 'Nick Fury'}) RETURN p" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - - Record record = cursor.current(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); Node node = record.get( 0 ).asNode(); assertEquals( "Person", single( node.labels() ) ); assertEquals( "Nick Fury", node.get( "name" ).asString() ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -119,16 +117,19 @@ public void shouldRunQueryWithMultipleResults() { StatementResultCursor cursor = await( session.runAsync( "UNWIND [1,2,3] AS x RETURN x" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 1, cursor.current().get( 0 ).asInt() ); + Record record1 = await( cursor.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 1, record1.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 2, cursor.current().get( 0 ).asInt() ); + Record record2 = await( cursor.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 2, record2.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 3, cursor.current().get( 0 ).asInt() ); + Record record3 = await( cursor.nextAsync() ); + assertNotNull( record3 ); + assertEquals( 3, record3.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -150,16 +151,17 @@ public void shouldFailWhenQueryFailsAtRuntime() { StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 0] AS x RETURN 10 / x" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 10, cursor.current().get( 0 ).asInt() ); + Record record1 = await( cursor.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 10, record1.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 5, cursor.current().get( 0 ).asInt() ); + Record record2 = await( cursor.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 5, record2.get( 0 ).asInt() ); try { - await( cursor.fetchAsync() ); - System.out.println( cursor.current() ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -179,16 +181,17 @@ public void shouldFailWhenServerIsRestarted() throws Exception try { - Response recordAvailable = cursor.fetchAsync(); + Response recordResponse = cursor.nextAsync(); // kill db after receiving the first record // do it from a listener so that event loop thread executes the kill operation - recordAvailable.addListener( new KillDbListener( neo4j ) ); + recordResponse.addListener( new KillDbListener( neo4j ) ); - while ( await( recordAvailable ) ) + Record record; + while ( (record = await( recordResponse )) != null ) { - assertNotNull( cursor.current() ); - recordAvailable = cursor.fetchAsync(); + assertNotNull( record ); + recordResponse = cursor.nextAsync(); } fail( "Exception expected" ); } @@ -204,18 +207,19 @@ public void shouldAllowNestedQueries() StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 3] AS x CREATE (p:Person {id: x}) RETURN p" ) ); - Future>> queriesExecuted = runNestedQueries( cursor ); - List> futures = await( queriesExecuted ); + Future>> queriesExecuted = runNestedQueries( cursor ); + List> futures = await( queriesExecuted ); - List futureResults = awaitAll( futures ); + List futureResults = awaitAll( futures ); assertEquals( 7, futureResults.size() ); StatementResultCursor personCursor = await( session.runAsync( "MATCH (p:Person) RETURN p ORDER BY p.id" ) ); List personNodes = new ArrayList<>(); - while ( await( personCursor.fetchAsync() ) ) + Record record; + while ( (record = await( personCursor.nextAsync() )) != null ) { - personNodes.add( personCursor.current().get( 0 ).asNode() ); + personNodes.add( record.get( 0 ).asNode() ); } assertEquals( 3, personNodes.size() ); @@ -242,21 +246,20 @@ public void shouldAllowMultipleAsyncRunsWithoutConsumingResults() throws Interru cursors.add( session.runAsync( "CREATE (:Person)" ) ); } - List> fetches = new ArrayList<>(); + List> records = new ArrayList<>(); for ( StatementResultCursor cursor : awaitAll( cursors ) ) { - fetches.add( cursor.fetchAsync() ); + records.add( cursor.nextAsync() ); } - awaitAll( fetches ); + awaitAll( records ); await( session.closeAsync() ); session = neo4j.driver().session(); StatementResultCursor cursor = await( session.runAsync( "MATCH (p:Person) RETURN count(p)" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - - Record record = cursor.current(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); assertEquals( queryCount, record.get( 0 ).asInt() ); } @@ -452,31 +455,53 @@ public void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() assertEquals( 0, countNodesByLabel( "Person" ) ); } - private Future>> runNestedQueries( StatementResultCursor inputCursor ) + @Test + public void shouldPeekRecordFromCursor() + { + StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 42] AS x RETURN x" ) ); + + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + + assertEquals( 1, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertEquals( 2, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 2, await( cursor.peekAsync() ).get( 0 ).asInt() ); + + assertEquals( 2, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertEquals( 42, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertNull( await( cursor.peekAsync() ) ); + assertNull( await( cursor.nextAsync() ) ); + } + + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { - Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); - runNestedQueries( inputCursor, new ArrayList>(), resultPromise ); + Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); + runNestedQueries( inputCursor, new ArrayList>(), resultPromise ); return resultPromise; } - private void runNestedQueries( final StatementResultCursor inputCursor, final List> futures, - final Promise>> resultPromise ) + private void runNestedQueries( final StatementResultCursor inputCursor, final List> futures, + final Promise>> resultPromise ) { - final Response inputAvailable = inputCursor.fetchAsync(); - futures.add( inputAvailable ); + final Response recordResponse = inputCursor.nextAsync(); + futures.add( recordResponse ); - inputAvailable.addListener( new ResponseListener() + recordResponse.addListener( new ResponseListener() { @Override - public void operationCompleted( Boolean inputAvailable, Throwable error ) + public void operationCompleted( Record record, Throwable error ) { if ( error != null ) { resultPromise.setFailure( error ); } - else if ( inputAvailable ) + else if ( record != null ) { - runNestedQuery( inputCursor, futures, resultPromise ); + runNestedQuery( inputCursor, record, futures, resultPromise ); } else { @@ -486,10 +511,9 @@ else if ( inputAvailable ) } ); } - private void runNestedQuery( final StatementResultCursor inputCursor, final List> futures, - final Promise>> resultPromise ) + private void runNestedQuery( final StatementResultCursor inputCursor, Record record, + final List> futures, final Promise>> resultPromise ) { - Record record = inputCursor.current(); Node node = record.get( 0 ).asNode(); long id = node.get( "id" ).asLong(); long age = id * 10; @@ -501,7 +525,7 @@ private void runNestedQuery( final StatementResultCursor inputCursor, final List response.addListener( new ResponseListener() { @Override - public void operationCompleted( StatementResultCursor result, Throwable error ) + public void operationCompleted( StatementResultCursor cursor, Throwable error ) { if ( error != null ) { @@ -509,7 +533,7 @@ public void operationCompleted( StatementResultCursor result, Throwable error ) } else { - futures.add( result.fetchAsync() ); + futures.add( cursor.nextAsync() ); runNestedQueries( inputCursor, futures, resultPromise ); } } @@ -534,7 +558,7 @@ private static void assertArithmeticError( Exception e ) assertThat( ((ClientException) e).code(), containsString( "ArithmeticError" ) ); } - private static class KillDbListener implements ResponseListener + private static class KillDbListener implements ResponseListener { final TestNeo4j neo4j; volatile boolean shouldKillDb = true; @@ -545,7 +569,7 @@ private static class KillDbListener implements ResponseListener } @Override - public void operationCompleted( Boolean result, Throwable error ) + public void operationCompleted( Record record, Throwable error ) { if ( shouldKillDb ) { @@ -631,17 +655,17 @@ private void processQueryResult( final StatementResultCursor cursor, final Throw return; } - cursor.fetchAsync().addListener( new ResponseListener() + cursor.nextAsync().addListener( new ResponseListener() { @Override - public void operationCompleted( Boolean recordAvailable, Throwable error ) + public void operationCompleted( Record record, Throwable error ) { - processFetchResult( recordAvailable, error, resultPromise, cursor ); + processFetchResult( record, error, resultPromise, cursor ); } } ); } - private void processFetchResult( Boolean recordAvailable, Throwable error, + private void processFetchResult( Record record, Throwable error, InternalPromise resultPromise, StatementResultCursor cursor ) { if ( error != null ) @@ -650,7 +674,7 @@ private void processFetchResult( Boolean recordAvailable, Throwable error, return; } - if ( !recordAvailable ) + if ( record == null ) { resultPromise.setFailure( new AssertionError( "Record not available" ) ); return; @@ -662,7 +686,7 @@ private void processFetchResult( Boolean recordAvailable, Throwable error, } else { - resultPromise.setSuccess( cursor.current() ); + resultPromise.setSuccess( record ); } } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index f21ebc8da9..cd1c78bb8c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; +import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -108,10 +109,13 @@ public void shouldBePossibleToRunSingleStatementAndCommit() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( "CREATE (n:Node {id: 42}) RETURN n" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - Node node = cursor.current().get( 0 ).asNode(); + + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); + Node node = record.get( 0 ).asNode(); assertEquals( "Node", single( node.labels() ) ); assertEquals( 42, node.get( "id" ).asInt() ); + assertNull( await( cursor.nextAsync() ) ); assertNull( await( tx.commitAsync() ) ); assertEquals( 1, countNodes( 42 ) ); @@ -123,10 +127,12 @@ public void shouldBePossibleToRunSingleStatementAndRollback() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( "CREATE (n:Node {id: 4242}) RETURN n" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - Node node = cursor.current().get( 0 ).asNode(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); + Node node = record.get( 0 ).asNode(); assertEquals( "Node", single( node.labels() ) ); assertEquals( 4242, node.get( "id" ).asInt() ); + assertNull( await( cursor.nextAsync() ) ); assertNull( await( tx.rollbackAsync() ) ); assertEquals( 0, countNodes( 4242 ) ); @@ -138,13 +144,13 @@ public void shouldBePossibleToRunMultipleStatementsAndCommit() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node {id: 1})" ) ); - assertThat( await( cursor1.fetchAsync() ), is( false ) ); + assertNull( await( cursor1.nextAsync() ) ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node {id: 2})" ) ); - assertThat( await( cursor2.fetchAsync() ), is( false ) ); + assertNull( await( cursor2.nextAsync() ) ); StatementResultCursor cursor3 = await( tx.runAsync( "CREATE (n:Node {id: 2})" ) ); - assertThat( await( cursor3.fetchAsync() ), is( false ) ); + assertNull( await( cursor3.nextAsync() ) ); assertNull( await( tx.commitAsync() ) ); assertEquals( 1, countNodes( 1 ) ); @@ -171,10 +177,10 @@ public void shouldBePossibleToRunMultipleStatementsAndRollback() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node {id: 1})" ) ); - assertThat( await( cursor1.fetchAsync() ), is( false ) ); + assertNull( await( cursor1.nextAsync() ) ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node {id: 42})" ) ); - assertThat( await( cursor2.fetchAsync() ), is( false ) ); + assertNull( await( cursor2.nextAsync() ) ); assertNull( await( tx.rollbackAsync() ) ); assertEquals( 0, countNodes( 1 ) ); @@ -244,12 +250,14 @@ public void shouldFailToCommitAfterCoupleCorrectAndSingleWrongStatement() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node) RETURN n" ) ); - assertThat( await( cursor1.fetchAsync() ), is( true ) ); - assertTrue( cursor1.current().get( 0 ).asNode().hasLabel( "Node" ) ); + Record record1 = await( cursor1.nextAsync() ); + assertNotNull( record1 ); + assertTrue( record1.get( 0 ).asNode().hasLabel( "Node" ) ); StatementResultCursor cursor2 = await( tx.runAsync( "RETURN 42" ) ); - assertThat( await( cursor2.fetchAsync() ), is( true ) ); - assertEquals( 42, cursor2.current().get( 0 ).asInt() ); + Record record2 = await( cursor2.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 42, record2.get( 0 ).asInt() ); try { @@ -278,12 +286,14 @@ public void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "RETURN 4242" ) ); - assertThat( await( cursor1.fetchAsync() ), is( true ) ); - assertEquals( 4242, cursor1.current().get( 0 ).asInt() ); + Record record1 = await( cursor1.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 4242, record1.get( 0 ).asInt() ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node) DELETE n RETURN 42" ) ); - assertThat( await( cursor2.fetchAsync() ), is( true ) ); - assertEquals( 42, cursor2.current().get( 0 ).asInt() ); + Record record2 = await( cursor2.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 42, record2.get( 0 ).asInt() ); try { @@ -510,6 +520,30 @@ public void shouldExposeResultSummaryForProfileQuery() assertThat( summary.resultConsumedAfter( TimeUnit.MILLISECONDS ), greaterThanOrEqualTo( 0L ) ); } + @Test + public void shouldPeekRecordFromCursor() + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ) ); + + assertEquals( "a", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "a", await( cursor.peekAsync() ).get( 0 ).asString() ); + + assertEquals( "a", await( cursor.nextAsync() ).get( 0 ).asString() ); + + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + + assertEquals( "b", await( cursor.nextAsync() ).get( 0 ).asString() ); + assertEquals( "c", await( cursor.nextAsync() ).get( 0 ).asString() ); + + assertNull( await( cursor.peekAsync() ) ); + assertNull( await( cursor.nextAsync() ) ); + + await( tx.rollbackAsync() ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); From 0a1af0dce99eb80e0415027d1f9d185507ce8e9d Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 21 Sep 2017 00:10:50 +0200 Subject: [PATCH 7/8] Added `#forEach()` to async result cursor Method allows processing of every incoming record in an async fashion. It returns a future which signals about processing completion. It can either be failed or completed with `null`. --- .../async/InternalStatementResultCursor.java | 84 +++++++++++++++---- .../driver/internal/async/QueryRunner.java | 4 +- .../handlers/PullAllResponseHandler.java | 38 +++++---- .../driver/v1/StatementResultCursor.java | 3 + .../driver/v1/integration/SessionAsyncIT.java | 31 +++++++ .../v1/integration/TransactionAsyncIT.java | 33 ++++++++ 6 files changed, 163 insertions(+), 30 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index 63073ea270..7b2a9c3c3a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -18,27 +18,36 @@ */ package org.neo4j.driver.internal.async; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + import java.util.Collections; import java.util.List; import org.neo4j.driver.internal.handlers.PullAllResponseHandler; import org.neo4j.driver.internal.handlers.RunResponseHandler; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.StatementResultCursor; import org.neo4j.driver.v1.summary.ResultSummary; +import static java.util.Objects.requireNonNull; + public class InternalStatementResultCursor implements StatementResultCursor { + private final AsyncConnection connection; private final RunResponseHandler runResponseHandler; private final PullAllResponseHandler pullAllHandler; - private Response peekedRecordResponse; + private InternalFuture peekedRecordResponse; - public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler ) + public InternalStatementResultCursor( AsyncConnection connection, RunResponseHandler runResponseHandler, + PullAllResponseHandler pullAllHandler ) { - this.runResponseHandler = runResponseHandler; - this.pullAllHandler = pullAllHandler; + this.connection = requireNonNull( connection ); + this.runResponseHandler = requireNonNull( runResponseHandler ); + this.pullAllHandler = requireNonNull( pullAllHandler ); } @Override @@ -57,16 +66,7 @@ public Response summaryAsync() @Override public Response nextAsync() { - if ( peekedRecordResponse != null ) - { - Response result = peekedRecordResponse; - peekedRecordResponse = null; - return result; - } - else - { - return pullAllHandler.nextAsync(); - } + return internalNextAsync(); } @Override @@ -78,4 +78,60 @@ public Response peekAsync() } return peekedRecordResponse; } + + @Override + public Response forEachAsync( final Consumer action ) + { + InternalPromise result = connection.newPromise(); + internalForEachAsync( action, result ); + return result; + } + + private void internalForEachAsync( final Consumer action, final InternalPromise result ) + { + final InternalFuture recordFuture = internalNextAsync(); + + recordFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + Record record = future.getNow(); + if ( record != null ) + { + action.accept( record ); + internalForEachAsync( action, result ); + } + else + { + result.setSuccess( null ); + } + } + else + { + result.setFailure( future.cause() ); + } + } + } ); + } + + private InternalFuture internalNextAsync() + { + if ( peekedRecordResponse != null ) + { + InternalFuture result = peekedRecordResponse; + peekedRecordResponse = null; + return result; + } + else + { + return pullAllHandler.nextAsync(); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java index 69d8dc4dac..d08e3fc3f2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java @@ -43,7 +43,7 @@ public static InternalFuture runAsync( AsyncConnection co return runAsync( connection, statement, null ); } - public static InternalFuture runAsync( AsyncConnection connection, Statement statement, + public static InternalFuture runAsync( final AsyncConnection connection, Statement statement, ExplicitTransaction tx ) { String query = statement.text(); @@ -62,7 +62,7 @@ public static InternalFuture runAsync( AsyncConnection co @Override public StatementResultCursor apply( Void ignore ) { - return new InternalStatementResultCursor( runHandler, pullAllHandler ); + return new InternalStatementResultCursor( connection, runHandler, pullAllHandler ); } } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index f562161456..c6aa1687eb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -82,12 +82,7 @@ public synchronized void onSuccess( Map metadata ) succeeded = true; afterSuccess(); - - if ( recordPromise != null ) - { - recordPromise.setSuccess( null ); - recordPromise = null; - } + succeedRecordPromise( null ); } protected abstract void afterSuccess(); @@ -97,12 +92,7 @@ public synchronized void onFailure( Throwable error ) { failure = error; afterFailure( error ); - - if ( recordPromise != null ) - { - recordPromise.setFailure( error ); - recordPromise = null; - } + failRecordPromise( error ); } protected abstract void afterFailure( Throwable error ); @@ -114,8 +104,7 @@ public synchronized void onRecord( Value[] fields ) if ( recordPromise != null ) { - recordPromise.setSuccess( record ); - recordPromise = null; + succeedRecordPromise( record ); } else { @@ -141,6 +130,7 @@ public synchronized InternalFuture nextAsync() if ( recordPromise == null ) { recordPromise = connection.newPromise(); + System.out.println( "setting promise " + recordPromise.hashCode() ); } return recordPromise; } @@ -191,6 +181,26 @@ private Record dequeueRecord() return record; } + private void succeedRecordPromise( Record record ) + { + if ( recordPromise != null ) + { + InternalPromise promise = recordPromise; + recordPromise = null; + promise.setSuccess( record ); + } + } + + private void failRecordPromise( Throwable error ) + { + if ( recordPromise != null ) + { + InternalPromise promise = recordPromise; + recordPromise = null; + promise.setFailure( error ); + } + } + private ResultSummary extractResultSummary( Map metadata ) { return new InternalResultSummary( statement, connection.serverInfo(), extractStatementType( metadata ), diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java index 86ebc2dbd3..2573645458 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java @@ -20,6 +20,7 @@ import java.util.List; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.summary.ResultSummary; public interface StatementResultCursor @@ -36,4 +37,6 @@ public interface StatementResultCursor Response nextAsync(); Response peekAsync(); + + Response forEachAsync( Consumer action ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 3f6bea7358..0abb112117 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.async.InternalPromise; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.ResponseListener; @@ -477,6 +478,18 @@ public void shouldPeekRecordFromCursor() assertNull( await( cursor.nextAsync() ) ); } + @Test + public void shouldForEachWithEmptyCursor() + { + testForEach( "CREATE ()", 0 ); + } + + @Test + public void shouldForEachWithNonEmptyCursor() + { + testForEach( "UNWIND range(1, 10000) AS x RETURN x", 10000 ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); @@ -545,6 +558,24 @@ private long countNodesByLabel( String label ) return session.run( "MATCH (n:" + label + ") RETURN count(n)" ).single().get( 0 ).asLong(); } + private void testForEach( String query, int expectedSeenRecords ) + { + StatementResultCursor cursor = await( session.runAsync( query ) ); + + final AtomicInteger recordsSeen = new AtomicInteger(); + Response forEachDone = cursor.forEachAsync( new Consumer() + { + @Override + public void accept( Record record ) + { + recordsSeen.incrementAndGet(); + } + } ); + + assertNull( await( forEachDone ) ); + assertEquals( expectedSeenRecords, recordsSeen.get() ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index cd1c78bb8c..4480b0d3e9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -25,7 +25,9 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.Session; @@ -544,12 +546,43 @@ public void shouldPeekRecordFromCursor() await( tx.rollbackAsync() ); } + @Test + public void shouldForEachWithEmptyCursor() + { + testForEach( "MATCH (n:SomeReallyStrangeLabel) RETURN n", 0 ); + } + + @Test + public void shouldForEachWithNonEmptyCursor() + { + testForEach( "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555 ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); return result.single().get( 0 ).asInt(); } + private void testForEach( String query, int expectedSeenRecords ) + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( query ) ); + + final AtomicInteger recordsSeen = new AtomicInteger(); + Response forEachDone = cursor.forEachAsync( new Consumer() + { + @Override + public void accept( Record record ) + { + recordsSeen.incrementAndGet(); + } + } ); + + assertNull( await( forEachDone ) ); + assertEquals( expectedSeenRecords, recordsSeen.get() ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); From 6eef420f1cce09275fa7ef151f5151099ee9a9c2 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 21 Sep 2017 11:53:15 +0200 Subject: [PATCH 8/8] Added `#listAsync()` to async result cursor Method allows async retrieval of all records as list. --- .../async/InternalStatementResultCursor.java | 43 +++++++++++++++++++ .../handlers/PullAllResponseHandler.java | 1 - .../driver/v1/StatementResultCursor.java | 2 + .../driver/v1/integration/SessionAsyncIT.java | 26 +++++++++++ .../v1/integration/TransactionAsyncIT.java | 28 ++++++++++++ 5 files changed, 99 insertions(+), 1 deletion(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index 7b2a9c3c3a..bb2cd5ed4b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -21,6 +21,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -87,6 +88,14 @@ public Response forEachAsync( final Consumer action ) return result; } + @Override + public Response> listAsync() + { + InternalPromise> result = connection.newPromise(); + internalListAsync( new ArrayList(), result ); + return result; + } + private void internalForEachAsync( final Consumer action, final InternalPromise result ) { final InternalFuture recordFuture = internalNextAsync(); @@ -121,6 +130,40 @@ else if ( future.isSuccess() ) } ); } + private void internalListAsync( final List records, final InternalPromise> result ) + { + final InternalFuture recordFuture = internalNextAsync(); + + recordFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + Record record = future.getNow(); + if ( record != null ) + { + records.add( record ); + internalListAsync( records, result ); + } + else + { + result.setSuccess( records ); + } + } + else + { + result.setFailure( future.cause() ); + } + } + } ); + } + private InternalFuture internalNextAsync() { if ( peekedRecordResponse != null ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index c6aa1687eb..6e5e1f2345 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -130,7 +130,6 @@ public synchronized InternalFuture nextAsync() if ( recordPromise == null ) { recordPromise = connection.newPromise(); - System.out.println( "setting promise " + recordPromise.hashCode() ); } return recordPromise; } diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java index 2573645458..528838b63c 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java @@ -39,4 +39,6 @@ public interface StatementResultCursor Response peekAsync(); Response forEachAsync( Consumer action ); + + Response> listAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 0abb112117..30ce5ee546 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.Future; @@ -490,6 +491,19 @@ public void shouldForEachWithNonEmptyCursor() testForEach( "UNWIND range(1, 10000) AS x RETURN x", 10000 ); } + @Test + public void shouldConvertToListWithEmptyCursor() + { + testList( "MATCH (n:NoSuchLabel) RETURN n", Collections.emptyList() ); + } + + @Test + public void shouldConvertToListWithNonEmptyCursor() + { + testList( "UNWIND range(1, 100, 10) AS x RETURN x", + Arrays.asList( 1L, 11L, 21L, 31L, 41L, 51L, 61L, 71L, 81L, 91L ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); @@ -576,6 +590,18 @@ public void accept( Record record ) assertEquals( expectedSeenRecords, recordsSeen.get() ); } + private void testList( String query, List expectedList ) + { + StatementResultCursor cursor = await( session.runAsync( query ) ); + List records = await( cursor.listAsync() ); + List actualList = new ArrayList<>(); + for ( Record record : records ) + { + actualList.add( record.get( 0 ).asObject() ); + } + assertEquals( expectedList, actualList ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index 4480b0d3e9..e1a0740093 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -23,7 +23,10 @@ import org.junit.Rule; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -558,6 +561,18 @@ public void shouldForEachWithNonEmptyCursor() testForEach( "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555 ); } + @Test + public void shouldConvertToListWithEmptyCursor() + { + testList( "CREATE (:Person)-[:KNOWS]->(:Person)", Collections.emptyList() ); + } + + @Test + public void shouldConvertToListWithNonEmptyCursor() + { + testList( "UNWIND [1, '1', 2, '2', 3, '3'] AS x RETURN x", Arrays.asList( 1L, "1", 2L, "2", 3L, "3" ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); @@ -583,6 +598,19 @@ public void accept( Record record ) assertEquals( expectedSeenRecords, recordsSeen.get() ); } + private void testList( String query, List expectedList ) + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( query ) ); + List records = await( cursor.listAsync() ); + List actualList = new ArrayList<>(); + for ( Record record : records ) + { + actualList.add( record.get( 0 ).asObject() ); + } + assertEquals( expectedList, actualList ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) );