Skip to content

Commit

Permalink
Remove retry handling for read sessions
Browse files Browse the repository at this point in the history
The plan of automatically handle retries on read sessions didn't really pan
out since we don't really control when data is transported over the network
and errors are noticed. Instead we treat reads and writes in the same way, i.e.
throwing a `SessionExpiredException` on all connection failures.
  • Loading branch information
pontusmelke committed Sep 13, 2016
1 parent e47dcf6 commit e1bec19
Show file tree
Hide file tree
Showing 14 changed files with 228 additions and 154 deletions.
86 changes: 49 additions & 37 deletions driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.internal.util.Supplier;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
Expand Down Expand Up @@ -180,6 +179,10 @@ private void callWithRetry(String procedureName, Consumer<Record> recorder )
private synchronized void forget( BoltServerAddress address )
{
connections.purge( address );
if ( endpoints.contains( address ) )
{
endpoints.clear();
}
}

@Override
Expand All @@ -191,32 +194,17 @@ public Session session()
@Override
public Session session( final SessionMode mode )
{
switch ( mode )
return new ClusteredNetworkSession( acquireConnection( mode ), clusterSettings, new Consumer<BoltServerAddress>()
{
case READ:
return new ReadNetworkSession( new Supplier<Connection>()
@Override
public void accept( BoltServerAddress address )
{
@Override
public Connection get()
{
return acquireConnection( mode );
}
}, new Consumer<Connection>()
{
@Override
public void accept( Connection connection )
{
forget( connection.address() );
}
}, clusterSettings, log );
case WRITE:
return new WriteNetworkSession( acquireConnection( mode ), clusterSettings, log );
default:
throw new UnsupportedOperationException();
}
forget( address );
}
}, log );
}

private synchronized Connection acquireConnection( SessionMode mode )
private Connection acquireConnection( SessionMode mode )
{
if (!discoverable)
{
Expand All @@ -228,7 +216,24 @@ private synchronized Connection acquireConnection( SessionMode mode )
{
discover();
}
if ( !endpoints.valid() )
{
discoverEndpoints();
}

switch ( mode )
{
case READ:
return connections.acquire( endpoints.readServer );
case WRITE:
return connections.acquire( endpoints.writeServer );
default:
throw new ClientException( mode + " is not supported for creating new sessions" );
}
}

private synchronized void discoverEndpoints()
{
endpoints.clear();
try
{
Expand All @@ -255,7 +260,9 @@ else if ( serverMode.equals( "WRITE" ) )
{
log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS );
discoverable = false;
return connections.acquire();
Connection connection = connections.acquire();
endpoints.readServer = connection.address();
endpoints.writeServer = connection.address();
}
throw e;
}
Expand All @@ -264,17 +271,6 @@ else if ( serverMode.equals( "WRITE" ) )
{
throw new ServiceUnavailableException("Could not establish any endpoints for the call");
}


switch ( mode )
{
case READ:
return connections.acquire( endpoints.readServer );
case WRITE:
return connections.acquire( endpoints.writeServer );
default:
throw new ClientException( mode + " is not supported for creating new sessions" );
}
}

@Override
Expand All @@ -292,8 +288,8 @@ public void close()

private static class Endpoints
{
BoltServerAddress readServer;
BoltServerAddress writeServer;
private BoltServerAddress readServer;
private BoltServerAddress writeServer;

public boolean valid()
{
Expand All @@ -305,6 +301,22 @@ public void clear()
readServer = null;
writeServer = null;
}

boolean contains(BoltServerAddress address)
{
if (readServer != null && writeServer != null)
{
return readServer.equals( address ) || writeServer.equals( address );
}
else if ( readServer != null )
{
return readServer.equals( address );
}
else
{
return writeServer != null && writeServer.equals( address );
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,16 @@

public class ClusterSettings
{
private final int readRetry;
private final int minimumNumberOfServers;

public ClusterSettings( int readRetry, int minimumNumberOfServers )
public ClusterSettings( int minimumNumberOfServers )
{
this.readRetry = readRetry;
this.minimumNumberOfServers = minimumNumberOfServers;
}

public static ClusterSettings fromConfig( Config config )
{
return new ClusterSettings( config.maximumReadRetriesForCluster(), config.minimumKnownClusterSize() ) ;
}

public int readRetry()
{
return readRetry;
return new ClusterSettings( config.minimumKnownClusterSize() ) ;
}

public int minimumNumberOfServers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,35 @@
package org.neo4j.driver.internal;


import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;

public class WriteNetworkSession extends NetworkSession
public class ClusteredNetworkSession extends NetworkSession
{
private final Consumer<BoltServerAddress> onFailedConnection;

WriteNetworkSession(Connection connection, ClusterSettings clusterSettings, Logger logger )
ClusteredNetworkSession( Connection connection, ClusterSettings clusterSettings, Consumer<BoltServerAddress> onFailedConnection, Logger logger )
{
super(connection, logger);
super( connection, logger );
this.onFailedConnection = onFailedConnection;
}

@Override
public StatementResult run( Statement statement )
{
try
{
return super.run( statement );
return new ClusteredStatementResult( super.run( statement ), connection.address(), onFailedConnection );
}//TODO we need to catch exceptions due to leader switches etc here
catch ( ConnectionFailureException e )
{
onFailedConnection.accept( connection.address() );
throw new SessionExpiredException( "Failed to perform write load to server", e );
}

Expand Down

This file was deleted.

27 changes: 0 additions & 27 deletions driver/src/main/java/org/neo4j/driver/v1/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.neo4j.driver.internal.logging.JULogging;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
import org.neo4j.driver.v1.util.Immutable;

import static java.lang.System.getProperty;
Expand Down Expand Up @@ -64,7 +63,6 @@ public class Config
private final TrustStrategy trustStrategy;

private final int minServersInCluster;
private final int readRetries;

private Config( ConfigBuilder builder)
{
Expand All @@ -77,7 +75,6 @@ private Config( ConfigBuilder builder)
this.encryptionLevel = builder.encryptionLevel;
this.trustStrategy = builder.trustStrategy;
this.minServersInCluster = builder.minServersInCluster;
this.readRetries = builder.readRetries;
}

/**
Expand Down Expand Up @@ -134,14 +131,6 @@ public TrustStrategy trustStrategy()
return trustStrategy;
}

/**
* @return the number of retries to be attempted for read sessions
*/
public int maximumReadRetriesForCluster()
{
return readRetries;
}

/**
* @return the minimum number of servers the driver should know about.
*/
Expand Down Expand Up @@ -180,7 +169,6 @@ public static class ConfigBuilder
private TrustStrategy trustStrategy = trustOnFirstUse(
new File( getProperty( "user.home" ), ".neo4j" + File.separator + "known_hosts" ) );
public int minServersInCluster = 3;
public int readRetries = 3;

private ConfigBuilder() {}

Expand Down Expand Up @@ -285,21 +273,6 @@ public ConfigBuilder withTrustStrategy( TrustStrategy trustStrategy )
return this;
}

/**
* For read queries the driver can do automatic retries upon server failures,
*
* This setting specifies how many retries that should be attempted before giving up
* and throw a {@link ConnectionFailureException}. If not specified this setting defaults to 3 retries before
* giving up.
* @param retries The number or retries to attempt before giving up.
* @return this builder
*/
public ConfigBuilder withMaximumReadRetriesForCluster( int retries )
{
this.readRetries = retries;
return this;
}

/**
* Specifies the minimum numbers in a cluster a driver should know about.
* <p>
Expand Down
Loading

0 comments on commit e1bec19

Please sign in to comment.