Skip to content

Commit

Permalink
Merge pull request #405 from lutovich/1.5-async-result-cursor-api
Browse files Browse the repository at this point in the history
Improvements for StatementResultCursor API
  • Loading branch information
lutovich authored Sep 22, 2017
2 parents 8290bab + 6eef420 commit 192d33b
Show file tree
Hide file tree
Showing 33 changed files with 1,714 additions and 251 deletions.
46 changes: 29 additions & 17 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.neo4j.driver.internal;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -72,14 +74,18 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
EventLoopGroup eventLoopGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
config );

try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
asyncConnectionPool );
asyncConnectionPool, eventLoopGroup );
}
catch ( Throwable driverError )
{
Expand All @@ -98,14 +104,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}

private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
Config config )
Bootstrap bootstrap, Config config )
{
Clock clock = createClock();
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
activeChannelTracker, config.logging(), clock );
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(),
Expand All @@ -116,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
eventExecutorGroup );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -137,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
EventExecutorGroup eventExecutorGroup )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand All @@ -151,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand Down Expand Up @@ -239,20 +250,21 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
* <p>
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
RetryLogic retryLogic, Config config )
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup, Config config )
{
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
}

/**
* Creates new {@link RetryLogic >}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging )
protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup,
Logging logging )
{
return new ExponentialBackoffRetryLogic( settings, createClock(), logging );
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -219,7 +220,7 @@ public Response<Void> commitAsync()
return internalCommitAsync();
}

private InternalFuture<Void> internalCommitAsync()
InternalFuture<Void> internalCommitAsync()
{
if ( state == State.COMMITTED )
{
Expand Down Expand Up @@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK )
}
}

private Runnable releaseConnectionAndNotifySession()
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
{
return new Runnable()
return new BiConsumer<Void,Throwable>()
{
@Override
public void run()
public void accept( Void result, Throwable error )
{
asyncConnection.release();
session.asyncTransactionClosed( ExplicitTransaction.this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult
{
this.statement = statement;
this.connection = connection;
this.runResponseHandler = new RunResponseHandler( null );
this.runResponseHandler = new RunResponseHandler( null, null );
this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler );
this.resourcesHandler = resourcesHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal;

import io.netty.util.concurrent.EventExecutorGroup;

import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
Expand All @@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;

LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
Logging logging )
EventExecutorGroup eventExecutorGroup, Logging logging )
{
super( connectionProvider, mode, retryLogic, logging );
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
this.stackTrace = captureStackTrace();
}

Expand Down
Loading

0 comments on commit 192d33b

Please sign in to comment.