Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for StatementResultCursor API #405

Merged
merged 8 commits into from
Sep 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.neo4j.driver.internal;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -72,14 +74,18 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
EventLoopGroup eventLoopGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
config );

try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
asyncConnectionPool );
asyncConnectionPool, eventLoopGroup );
}
catch ( Throwable driverError )
{
Expand All @@ -98,14 +104,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}

private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
Config config )
Bootstrap bootstrap, Config config )
{
Clock clock = createClock();
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
activeChannelTracker, config.logging(), clock );
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(),
Expand All @@ -116,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
eventExecutorGroup );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -137,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
EventExecutorGroup eventExecutorGroup )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand All @@ -151,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}

Expand Down Expand Up @@ -239,20 +250,21 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
* <p>
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
RetryLogic retryLogic, Config config )
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup, Config config )
{
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
}

/**
* Creates new {@link RetryLogic >}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging )
protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup,
Logging logging )
{
return new ExponentialBackoffRetryLogic( settings, createClock(), logging );
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -219,7 +220,7 @@ public Response<Void> commitAsync()
return internalCommitAsync();
}

private InternalFuture<Void> internalCommitAsync()
InternalFuture<Void> internalCommitAsync()
{
if ( state == State.COMMITTED )
{
Expand Down Expand Up @@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK )
}
}

private Runnable releaseConnectionAndNotifySession()
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
{
return new Runnable()
return new BiConsumer<Void,Throwable>()
{
@Override
public void run()
public void accept( Void result, Throwable error )
{
asyncConnection.release();
session.asyncTransactionClosed( ExplicitTransaction.this );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult
{
this.statement = statement;
this.connection = connection;
this.runResponseHandler = new RunResponseHandler( null );
this.runResponseHandler = new RunResponseHandler( null, null );
this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler );
this.resourcesHandler = resourcesHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal;

import io.netty.util.concurrent.EventExecutorGroup;

import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
Expand All @@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;

LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
Logging logging )
EventExecutorGroup eventExecutorGroup, Logging logging )
{
super( connectionProvider, mode, retryLogic, logging );
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
this.stackTrace = captureStackTrace();
}

Expand Down
Loading