diff --git a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java b/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java index b2d82c8b3f..7db7ab24c0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java @@ -19,7 +19,11 @@ package org.neo4j.driver.internal; +import java.util.Set; + +import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; @@ -29,9 +33,12 @@ abstract class BaseDriver implements Driver { private final SecurityPlan securityPlan; protected final Logger log; + protected final ConnectionPool connections; - BaseDriver( SecurityPlan securityPlan, Logging logging ) + BaseDriver( ConnectionPool connections, BoltServerAddress address, SecurityPlan securityPlan, Logging logging ) { + this.connections = connections; + this.connections.add( address ); this.securityPlan = securityPlan; this.log = logging.getLog( Session.LOG_NAME ); } @@ -42,4 +49,9 @@ public boolean isEncrypted() return securityPlan.requiresEncryption(); } + //Used for testing + Set servers() + { + return connections.addresses(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java new file mode 100644 index 0000000000..bc2d922fee --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java @@ -0,0 +1,310 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.List; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.net.pooling.PoolSettings; +import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; +import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.Consumer; +import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.SessionMode; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +import static java.lang.String.format; + +public class ClusterDriver extends BaseDriver +{ + private static final String DISCOVER_MEMBERS = "dbms.cluster.discoverEndpointAcquisitionServers"; + private static final String ACQUIRE_ENDPOINTS = "dbms.cluster.acquireEndpoints"; + + private final Endpoints endpoints = new Endpoints(); + private final ClusterSettings clusterSettings; + private boolean discoverable = true; + + public ClusterDriver( BoltServerAddress seedAddress, ConnectionSettings connectionSettings, + ClusterSettings clusterSettings, + SecurityPlan securityPlan, + PoolSettings poolSettings, Logging logging ) + { + super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ),seedAddress, securityPlan, logging ); + this.clusterSettings = clusterSettings; + discover(); + } + + synchronized void discover() + { + if (!discoverable) + { + return; + } + + try + { + boolean success = false; + while ( !connections.isEmpty() && !success ) + { + success = call( DISCOVER_MEMBERS, new Consumer() + { + @Override + public void accept( Record record ) + { + connections.add(new BoltServerAddress( record.get( "address" ).asString() )); + } + } ); + } + if ( !success ) + { + throw new ServiceUnavailableException( "Run out of servers" ); + } + } + catch ( ClientException ex ) + { + if ( ex.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) ) + { + //no procedure there, not much to do, stick with what we've got + //this may happen because server is running in standalone mode + log.warn( "Could not find procedure %s", DISCOVER_MEMBERS ); + discoverable = false; + } + else + { + throw ex; + } + } + } + + //must be called from a synchronized method + private boolean call( String procedureName, Consumer recorder ) + { + Connection acquire = null; + Session session = null; + try { + acquire = connections.acquire(); + session = new NetworkSession( acquire, log ); + + StatementResult records = session.run( format( "CALL %s", procedureName ) ); + while ( records.hasNext() ) + { + recorder.accept( records.next() ); + } + } + catch ( ConnectionFailureException e ) + { + if (acquire != null) + { + forget( acquire.address() ); + } + return false; + } + finally + { + if (acquire != null) + { + acquire.close(); + } + if (session != null) + { + session.close(); + } + } + return true; + } + + //must be called from a synchronized method + private void callWithRetry(String procedureName, Consumer recorder ) + { + while ( !connections.isEmpty() ) + { + Connection acquire = null; + Session session = null; + try { + acquire = connections.acquire(); + session = new NetworkSession( acquire, log ); + List list = session.run( format( "CALL %s", procedureName ) ).list(); + for ( Record record : list ) + { + recorder.accept( record ); + } + //we found results give up + return; + } + catch ( ConnectionFailureException e ) + { + if (acquire != null) + { + forget( acquire.address() ); + } + } + finally + { + if (acquire != null) + { + acquire.close(); + } + if (session != null) + { + session.close(); + } + } + } + + throw new ServiceUnavailableException( "Failed to communicate with any of the cluster members" ); + } + + private synchronized void forget( BoltServerAddress address ) + { + connections.purge( address ); + } + + @Override + public Session session() + { + return session( SessionMode.WRITE ); + } + + @Override + public Session session( final SessionMode mode ) + { + switch ( mode ) + { + case READ: + return new ReadNetworkSession( new Supplier() + { + @Override + public Connection get() + { + return acquireConnection( mode ); + } + }, new Consumer() + { + @Override + public void accept( Connection connection ) + { + forget( connection.address() ); + } + }, clusterSettings, log ); + case WRITE: + return new WriteNetworkSession( acquireConnection( mode ), clusterSettings, log ); + default: + throw new UnsupportedOperationException(); + } + } + + private synchronized Connection acquireConnection( SessionMode mode ) + { + if (!discoverable) + { + return connections.acquire(); + } + + //if we are short on servers, find new ones + if ( connections.addressCount() < clusterSettings.minimumNumberOfServers() ) + { + discover(); + } + + endpoints.clear(); + try + { + callWithRetry( ACQUIRE_ENDPOINTS, new Consumer() + { + @Override + public void accept( Record record ) + { + String serverMode = record.get( "role" ).asString(); + if ( serverMode.equals( "READ" ) ) + { + endpoints.readServer = new BoltServerAddress( record.get( "address" ).asString() ); + } + else if ( serverMode.equals( "WRITE" ) ) + { + endpoints.writeServer = new BoltServerAddress( record.get( "address" ).asString() ); + } + } + } ); + } + catch (ClientException e) + { + if ( e.code().equals( "Neo.ClientError.Procedure.ProcedureNotFound" ) ) + { + log.warn( "Could not find procedure %s", ACQUIRE_ENDPOINTS ); + discoverable = false; + return connections.acquire(); + } + throw e; + } + + if ( !endpoints.valid() ) + { + throw new ServiceUnavailableException("Could not establish any endpoints for the call"); + } + + + switch ( mode ) + { + case READ: + return connections.acquire( endpoints.readServer ); + case WRITE: + return connections.acquire( endpoints.writeServer ); + default: + throw new ClientException( mode + " is not supported for creating new sessions" ); + } + } + + @Override + public void close() + { + try + { + connections.close(); + } + catch ( Exception ex ) + { + log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); + } + } + + private static class Endpoints + { + BoltServerAddress readServer; + BoltServerAddress writeServer; + + public boolean valid() + { + return readServer != null && writeServer != null; + } + + public void clear() + { + readServer = null; + writeServer = null; + } + } + +} \ No newline at end of file diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java b/driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java new file mode 100644 index 0000000000..d3c24254c0 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusterSettings.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal; + +import org.neo4j.driver.v1.Config; + +public class ClusterSettings +{ + private final int readRetry; + private final int minimumNumberOfServers; + + public ClusterSettings( int readRetry, int minimumNumberOfServers ) + { + this.readRetry = readRetry; + this.minimumNumberOfServers = minimumNumberOfServers; + } + + public static ClusterSettings fromConfig( Config config ) + { + return new ClusterSettings( config.maximumReadRetriesForCluster(), config.minimumKnownClusterSize() ) ; + } + + public int readRetry() + { + return readRetry; + } + + public int minimumNumberOfServers() + { + return minimumNumberOfServers; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java index 5c6c24dbdf..8f38a3cf05 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java @@ -23,29 +23,30 @@ import org.neo4j.driver.internal.net.pooling.PoolSettings; import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.SessionMode; import static java.lang.String.format; public class DirectDriver extends BaseDriver { - private final BoltServerAddress address; - private final ConnectionPool connections; - public DirectDriver( BoltServerAddress address, ConnectionSettings connectionSettings, SecurityPlan securityPlan, PoolSettings poolSettings, Logging logging ) { - super( securityPlan, logging ); - this.address = address; - this.connections = new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ); + super( new SocketConnectionPool( connectionSettings, securityPlan, poolSettings, logging ), address, securityPlan, logging ); } @Override public Session session() { - return new NetworkSession( connections.acquire( address ), log ); + return new NetworkSession( connections.acquire(), log ); + } + + @Override + public Session session( SessionMode ignore ) + { + return session(); } @Override @@ -60,5 +61,4 @@ public void close() log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); } } - } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index fa636ccb5f..927091ad6d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -38,7 +38,7 @@ public class NetworkSession implements Session { - private final Connection connection; + protected Connection connection; private final Logger logger; private String lastBookmark = null; diff --git a/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java new file mode 100644 index 0000000000..65a353457f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/ReadNetworkSession.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + + +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.Consumer; +import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +public class ReadNetworkSession extends NetworkSession +{ + private final Supplier connectionSupplier; + private final Consumer failed; + private final ClusterSettings clusterSettings; + + ReadNetworkSession(Supplier connectionSupplier, Consumer failed, + ClusterSettings clusterSettings, Logger logger ) + { + super(connectionSupplier.get(), logger); + this.connectionSupplier = connectionSupplier; + this.clusterSettings = clusterSettings; + this.failed = failed; + } + + @Override + public StatementResult run( Statement statement ) + { + for ( int i = 0; i < clusterSettings.readRetry(); i++ ) + { + try + { + return super.run( statement ); + } + catch ( ConnectionFailureException e ) + { + failed.accept(connection); + connection = connectionSupplier.get(); + } + } + + throw new ServiceUnavailableException( "Not able to connect to any members of the cluster" ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java new file mode 100644 index 0000000000..a7a5a63487 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/WriteNetworkSession.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + + +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; + +public class WriteNetworkSession extends NetworkSession +{ + + WriteNetworkSession(Connection connection, ClusterSettings clusterSettings, Logger logger ) + { + super(connection, logger); + } + + @Override + public StatementResult run( Statement statement ) + { + try + { + return super.run( statement ); + }//TODO we need to catch exceptions due to leader switches etc here + catch ( ConnectionFailureException e ) + { + throw new SessionExpiredException( "Failed to perform write load to server", e ); + } + + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/net/BoltServerAddress.java index fad7a53171..e5f88029b1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/BoltServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/BoltServerAddress.java @@ -19,7 +19,11 @@ package org.neo4j.driver.internal.net; -import java.net.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.net.UnknownHostException; import static java.lang.String.format; @@ -151,5 +155,4 @@ public boolean isLocal() return false; } } - } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java b/driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java index 35ec382119..49aedc2bd0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/BufferingChunkedInput.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.packstream.PackInput; import org.neo4j.driver.internal.util.BytePrinter; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import static java.lang.Math.min; @@ -408,15 +409,15 @@ private static void readNextPacket( ReadableByteChannel channel, ByteBuffer buff int read = channel.read( buffer ); if ( read == -1 ) { - throw new ClientException( + throw new ConnectionFailureException( "Connection terminated while receiving data. This can happen due to network " + - "instabilities, or due to restarts of the database." ); + "instabilities, or due to restarts of the database."); } buffer.flip(); } catch ( ClosedByInterruptException e ) { - throw new ClientException( + throw new ConnectionFailureException( "Connection to the database was lost because someone called `interrupt()` on the driver " + "thread waiting for a reply. " + "This normally happens because the JVM is shutting down, but it can also happen because your " + diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index 6e162c56b7..8d1a568d68 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -21,8 +21,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; @@ -234,4 +234,10 @@ public String server() { return delegate.server(); } + + @Override + public BoltServerAddress address() + { + return delegate.address(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java index b963aa9e6a..f07aa271d5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java @@ -34,6 +34,7 @@ import org.neo4j.driver.internal.util.BytePrinter; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import static java.lang.String.format; import static java.nio.ByteOrder.BIG_ENDIAN; @@ -76,7 +77,7 @@ void blockingRead( ByteBuffer buf ) throws IOException if (channel.read( buf ) < 0) { String bufStr = BytePrinter.hex( buf ).trim(); - throw new ClientException( format( + throw new ConnectionFailureException( format( "Connection terminated while receiving data. This can happen due to network " + "instabilities, or due to restarts of the database. Expected %s bytes, received %s.", buf.limit(), bufStr.isEmpty() ? "none" : bufStr ) ); @@ -91,7 +92,7 @@ void blockingWrite( ByteBuffer buf ) throws IOException if (channel.write( buf ) < 0) { String bufStr = BytePrinter.hex( buf ).trim(); - throw new ClientException( format( + throw new ConnectionFailureException( format( "Connection terminated while sending data. This can happen due to network " + "instabilities, or due to restarts of the database. Expected %s bytes, wrote %s.", buf.limit(), bufStr.isEmpty() ? "none" :bufStr ) ); @@ -111,7 +112,7 @@ public void start() } catch ( ConnectException e ) { - throw new ClientException( format( + throw new ConnectionFailureException( format( "Unable to connect to %s, ensure the database is running and that there is a " + "working network connection to it.", address ) ); } @@ -182,6 +183,7 @@ public void stop() } catch ( IOException e ) { + //noinspection StatementWithEmptyBody if ( e.getMessage().equals( "An existing connection was forcibly closed by the remote host" ) ) { // Swallow this exception as it is caused by connection already closed by server @@ -291,4 +293,9 @@ public static ByteChannel create( BoltServerAddress address, SecurityPlan securi return channel; } } + + public BoltServerAddress address() + { + return address; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 27d451f17c..43de6f0ab5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -29,8 +29,8 @@ import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.RunMessage; import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; @@ -238,4 +238,10 @@ public String server() { return initCollector.server( ); } + + @Override + public BoltServerAddress address() + { + return this.socket.address(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java index 3b22643a2b..4542c07ec5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java @@ -20,8 +20,9 @@ import java.util.Map; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Value; @@ -236,6 +237,12 @@ public String server() return delegate.server(); } + @Override + public BoltServerAddress address() + { + return delegate.address(); + } + public void dispose() { delegate.close(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 9c62ebe728..a1e18a7eba 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,9 +18,11 @@ */ package org.neo4j.driver.internal.net.pooling; +import java.util.Comparator; import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,7 +59,22 @@ public class SocketConnectionPool implements ConnectionPool /** * Pools, organized by server address. */ - private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap> pools = new ConcurrentSkipListMap<>( + + new Comparator() + { + @Override + public int compare( BoltServerAddress o1, BoltServerAddress o2 ) + { + int compare = o1.host().compareTo( o2.host() ); + if (compare == 0) + { + compare = Integer.compare( o1.port(), o2.port() ); + } + + return compare; + } + } ); private final Clock clock = Clock.SYSTEM; @@ -66,6 +83,8 @@ public class SocketConnectionPool implements ConnectionPool private final PoolSettings poolSettings; private final Logging logging; + private BoltServerAddress current = null; + /** Shutdown flag */ private final AtomicBoolean stopped = new AtomicBoolean( false ); @@ -120,6 +139,55 @@ public Connection acquire( BoltServerAddress address ) return conn; } + @Override + public Connection acquire() + { + if ( current == null ) + { + current = pools.firstKey(); + } + else + { + current = pools.higherKey( current ); + //We've gone through all connections, start over + if (current == null) + { + current = pools.firstKey(); + } + } + + if ( current == null ) + { + throw new IllegalStateException( "Cannot acquire connection from an empty pool" ); + } + + return acquire( current ); + } + + @Override + public boolean isEmpty() + { + return pools.isEmpty(); + } + + @Override + public int addressCount() + { + return pools.size(); + } + + @Override + public void add( BoltServerAddress address ) + { + pools.putIfAbsent( address, new LinkedBlockingQueue( ) ); + } + + @Override + public Set addresses() + { + return pools.keySet(); + } + private BlockingQueue pool( BoltServerAddress address ) { BlockingQueue pool = pools.get( address ); @@ -135,6 +203,24 @@ private BlockingQueue pool( BoltServerAddress address ) return pool; } + @Override + public void purge( BoltServerAddress address ) + { + BlockingQueue connections = pools.remove( address ); + if ( connections == null ) + { + return; + } + while (!connections.isEmpty()) + { + PooledConnection connection = connections.poll(); + if ( connection != null) + { + connection.dispose(); + } + } + } + @Override public void close() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 616f2289b9..ed98953ce5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -20,6 +20,7 @@ import java.util.Map; +import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.v1.Value; /** @@ -123,4 +124,9 @@ public interface Connection extends AutoCloseable * @return The version of the server connected to. */ String server(); + + /** + * Returns the BoltServerAddress connected to + */ + BoltServerAddress address(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java index b6ca104614..4172ed1318 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java @@ -19,6 +19,8 @@ package org.neo4j.driver.internal.spi; +import java.util.Set; + import org.neo4j.driver.internal.net.BoltServerAddress; public interface ConnectionPool extends AutoCloseable @@ -27,8 +29,44 @@ public interface ConnectionPool extends AutoCloseable * Acquire a connection - if a live connection exists in the pool, it will * be used, otherwise a new connection will be created. * - * @param address + * @param address The address to acquire */ Connection acquire( BoltServerAddress address ); + /** + * Acquire a connection to one of the addresses that are currently in the pool + * @return A connection to one of the addresses in the pool + * @throws IllegalStateException if the pool is empty + */ + Connection acquire(); + + /** + * Removes all connections to a given address from the pool. + * @param address The address to remove. + */ + void purge( BoltServerAddress address ); + + /** + * Checks if the connection pool is empty + * @return true if the pool is empty, otherwise false + */ + boolean isEmpty(); + + /** + * Returns the number of addresses stored in the pool. + * @return the current number of addresses stored in the pool + */ + int addressCount(); + + /** + * Adds an address to the pool. + * @param address the address to add + */ + void add( BoltServerAddress address ); + + /** + * Returns all addresses known by the pool + * @return the addresses known by the pool + */ + Set addresses(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Supplier.java b/driver/src/main/java/org/neo4j/driver/internal/util/Supplier.java new file mode 100644 index 0000000000..89dace5efe --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Supplier.java @@ -0,0 +1,23 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +public interface Supplier { + T get(); +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 3ad6906a3f..d8bb9a4313 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -21,9 +21,9 @@ import java.io.File; import java.util.logging.Level; -import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.internal.logging.JULogging; import org.neo4j.driver.internal.net.pooling.PoolSettings; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.util.Immutable; import static java.lang.System.getProperty; @@ -63,7 +63,10 @@ public class Config /** Strategy for how to trust encryption certificate */ private final TrustStrategy trustStrategy; - private Config( ConfigBuilder builder ) + private final int minServersInCluster; + private final int readRetries; + + private Config( ConfigBuilder builder) { this.logging = builder.logging; @@ -73,6 +76,8 @@ private Config( ConfigBuilder builder ) this.encryptionLevel = builder.encryptionLevel; this.trustStrategy = builder.trustStrategy; + this.minServersInCluster = builder.minServersInCluster; + this.readRetries = builder.readRetries; } /** @@ -129,6 +134,22 @@ public TrustStrategy trustStrategy() return trustStrategy; } + /** + * @return the number of retries to be attempted for read sessions + */ + public int maximumReadRetriesForCluster() + { + return readRetries; + } + + /** + * @return the minimum number of servers the driver should know about. + */ + public int minimumKnownClusterSize() + { + return minServersInCluster; + } + /** * Return a {@link ConfigBuilder} instance * @return a {@link ConfigBuilder} instance @@ -158,6 +179,8 @@ public static class ConfigBuilder private EncryptionLevel encryptionLevel = EncryptionLevel.REQUIRED_NON_LOCAL; private TrustStrategy trustStrategy = trustOnFirstUse( new File( getProperty( "user.home" ), ".neo4j" + File.separator + "known_hosts" ) ); + public int minServersInCluster = 3; + public int readRetries = 3; private ConfigBuilder() {} @@ -262,6 +285,37 @@ public ConfigBuilder withTrustStrategy( TrustStrategy trustStrategy ) return this; } + /** + * For read queries the driver can do automatic retries upon server failures, + * + * This setting specifies how many retries that should be attempted before giving up + * and throw a {@link ConnectionFailureException}. If not specified this setting defaults to 3 retries before + * giving up. + * @param retries The number or retries to attempt before giving up. + * @return this builder + */ + public ConfigBuilder withMaximumReadRetriesForCluster( int retries ) + { + this.readRetries = retries; + return this; + } + + /** + * Specifies the minimum numbers in a cluster a driver should know about. + *

+ * Once the number of servers drops below this threshold, the driver will automatically trigger a discovery + * event + * asking the servers for more members. + * + * @param minNumberOfServers the minimum number of servers the driver should know about + * @return this builder + */ + public ConfigBuilder withMinimumKnownClusterSize( int minNumberOfServers ) + { + this.minServersInCluster = minNumberOfServers; + return this; + } + /** * Create a config instance from this builder. * @return a {@link Config} instance diff --git a/driver/src/main/java/org/neo4j/driver/v1/Driver.java b/driver/src/main/java/org/neo4j/driver/v1/Driver.java index 058aca817d..498112467c 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Driver.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Driver.java @@ -16,6 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.neo4j.driver.v1; import java.net.URI; @@ -85,6 +86,8 @@ public interface Driver extends AutoCloseable */ Session session(); + Session session(SessionMode mode); + /** * Close all the resources assigned to this driver */ diff --git a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java index 08df3a6a1c..e5eddcfab6 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java +++ b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java @@ -18,17 +18,19 @@ */ package org.neo4j.driver.v1; -import org.neo4j.driver.internal.DirectDriver; +import java.io.IOException; +import java.net.URI; +import java.security.GeneralSecurityException; + +import org.neo4j.driver.internal.ClusterDriver; +import org.neo4j.driver.internal.ClusterSettings; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DirectDriver; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.net.pooling.PoolSettings; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.v1.exceptions.ClientException; -import java.io.IOException; -import java.net.URI; -import java.security.GeneralSecurityException; - import static java.lang.String.format; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; import static org.neo4j.driver.v1.Config.EncryptionLevel.REQUIRED; @@ -165,10 +167,12 @@ public static Driver driver( URI uri, AuthToken authToken, Config config ) config.idleTimeBeforeConnectionTest() ); // And finally, construct the driver proper - switch ( scheme ) + switch ( scheme.toLowerCase() ) { case "bolt": return new DirectDriver( address, connectionSettings, securityPlan, poolSettings, config.logging() ); + case "bolt+discovery": + return new ClusterDriver( address, connectionSettings, ClusterSettings.fromConfig( config ), securityPlan, poolSettings, config.logging() ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index a487f15ffc..5b0bc1d85e 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -85,7 +85,7 @@ public interface Session extends Resource, StatementRunner * or if this transaction was rolled back, the bookmark value will * be null. * - * @return a reference to a previous transaction + * @return a reference to a previous transac'tion */ String lastBookmark(); diff --git a/driver/src/main/java/org/neo4j/driver/v1/SessionMode.java b/driver/src/main/java/org/neo4j/driver/v1/SessionMode.java new file mode 100644 index 0000000000..09f177235b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/SessionMode.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1; + +public enum SessionMode +{ + READ, + WRITE +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/ConnectionFailureException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ConnectionFailureException.java new file mode 100644 index 0000000000..1b7ea46ddb --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ConnectionFailureException.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.exceptions; + +/** + * A ConnectionFailureException indicates that there is a problem within the underlying connection, probably + * been terminated. + * @since 1.1 + */ +public class ConnectionFailureException extends Neo4jException +{ + public ConnectionFailureException( String message ) + { + super( message ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java new file mode 100644 index 0000000000..5172413942 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ServiceUnavailableException.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.exceptions; + +/** + * An ServiceUnavailableException indicates that the driver cannot communicate with the cluster. + * @since 1.1 + */ +public class ServiceUnavailableException extends Neo4jException +{ + public ServiceUnavailableException( String message ) + { + super( message ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java new file mode 100644 index 0000000000..84ba94eca9 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/SessionExpiredException.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.exceptions; + +/** + * A SessionExpiredException indicates that the session can no longer satisfy the criteria under which it + * was acquired, e.g. a server no longer accepts write requests. A new session needs to be acquired from the driver + * and all actions taken on the expired session must be replayed. + * @since 1.1 + */ +public class SessionExpiredException extends Neo4jException +{ + public SessionExpiredException( String message, Throwable throwable ) + { + super( message, throwable ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java new file mode 100644 index 0000000000..b3379d8bd8 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal; + +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; +import java.util.logging.Level; + +import org.neo4j.driver.internal.logging.ConsoleLogging; +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.util.StubServer; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + +public class ClusterDriverTest +{ + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private static final Config config = Config.build().withLogging( new ConsoleLogging( Level.INFO ) ).toConfig(); + + @Ignore + public void shouldDiscoverServers() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "../driver/src/test/resources/discover_servers.script", 9001 ); + URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); + + // When + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + // Then + Set addresses = driver.servers(); + assertThat( addresses, hasSize( 3 ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "../driver/src/test/resources/discover_new_servers.script", 9001 ); + URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); + + // When + try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) + { + // Then + Set addresses = driver.servers(); + assertThat( addresses, hasSize( 4 ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); + assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ) ); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Ignore + public void shouldHandleEmptyResponse() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "../driver/src/test/resources/handle_empty_response.script", 9001 ); + URI uri = URI.create( "bolt+discovery://127.0.0.1:9001" ); + try (ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config )) + { + Set servers = driver.servers(); + assertThat(servers, hasSize( 1 )); + assertThat(servers, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + } + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java new file mode 100644 index 0000000000..731bb5c228 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal; + +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Collection; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.util.StubServer; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; +import static org.neo4j.driver.v1.Values.parameters; + +public class DirectDriverTest +{ + @Test + public void shouldUseDefaultPortIfMissing() + { + // Given + URI uri = URI.create( "bolt://localhost" ); + + // When + DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); + + // Then + Collection addresses = driver.servers(); + assertThat( addresses.size(), equalTo( 1 ) ); + for ( BoltServerAddress address : addresses ) + { + assertThat( address.port(), equalTo( BoltServerAddress.DEFAULT_PORT ) ); + } + + } + + @Test + public void shouldRegisterSingleServer() + { + // Given + URI uri = URI.create( "bolt://localhost:7687" ); + BoltServerAddress address = BoltServerAddress.from( uri ); + + // When + DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); + + // Then + Collection addresses = driver.servers(); + assertThat( addresses.size(), equalTo( 1 ) ); + assertThat( addresses.contains( address ), equalTo( true ) ); + + } + + @Ignore + public void shouldBeAbleRunCypher() throws StubServer.ForceKilled, InterruptedException, IOException + { + // Given + StubServer server = StubServer.start( "../driver/src/test/resources/return_x.script" ); + URI uri = URI.create( "bolt://localhost:7687" ); + int x; + + // When + try ( Driver driver = GraphDatabase.driver( uri ) ) + { + try ( Session session = driver.session() ) + { + Record record = session.run( "RETURN {x}", parameters( "x", 1 ) ).single(); + x = record.get( 0 ).asInt(); + } + } + + // Then + assertThat( x, equalTo( 1 ) ); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java index 65027eb97e..f80cfa8b01 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/BufferingChunkedInputTest.java @@ -31,6 +31,7 @@ import java.util.Arrays; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.util.RecordingByteChannel; import static org.hamcrest.CoreMatchers.equalTo; @@ -484,7 +485,7 @@ public void shouldFailNicelyOnClosedConnections() throws IOException BufferingChunkedInput input = new BufferingChunkedInput( channel ); //Expect - exception.expect( ClientException.class ); + exception.expect( ConnectionFailureException.class ); exception.expectMessage( "Connection terminated while receiving data. This can happen due to network " + "instabilities, or due to restarts of the database." ); // When diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/SocketClientTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/SocketClientTest.java index 6628308e77..94897ac3db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/SocketClientTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/SocketClientTest.java @@ -18,6 +18,11 @@ */ package org.neo4j.driver.internal.net; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + import java.io.IOException; import java.net.ServerSocket; import java.nio.ByteBuffer; @@ -25,14 +30,10 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -100,7 +101,7 @@ public void shouldFailIfConnectionFailsWhileReading() throws IOException SocketClient client = dummyClient(); //Expect - exception.expect( ClientException.class ); + exception.expect( ConnectionFailureException.class ); exception.expectMessage( "Expected 4 bytes, received none" ); // When @@ -138,7 +139,7 @@ public void shouldFailIfConnectionFailsWhileWriting() throws IOException SocketClient client = dummyClient(); //Expect - exception.expect( ClientException.class ); + exception.expect( ConnectionFailureException.class ); exception.expectMessage( "Expected 4 bytes, wrote 00" ); // When diff --git a/driver/src/test/java/org/neo4j/driver/v1/DriverDocIT.java b/driver/src/test/java/org/neo4j/driver/v1/DriverDocIT.java index 9cc6b88a61..44638c81d3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/DriverDocIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/DriverDocIT.java @@ -20,39 +20,35 @@ import javadoctest.DocSnippet; import javadoctest.DocTestRunner; -import org.junit.Rule; +import org.junit.Assert; +import org.junit.Ignore; import org.junit.runner.RunWith; +import java.io.IOException; import java.util.LinkedList; import java.util.List; -import org.neo4j.driver.v1.util.TestNeo4jSession; +import org.neo4j.driver.v1.util.StubServer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; +import static org.hamcrest.core.IsEqual.equalTo; @RunWith( DocTestRunner.class ) public class DriverDocIT { - @Rule - public TestNeo4jSession session = new TestNeo4jSession(); - + @Ignore /** @see Driver */ @SuppressWarnings("unchecked") - public void exampleUsage( DocSnippet snippet ) + public void exampleUsage( DocSnippet snippet ) throws IOException, InterruptedException, StubServer.ForceKilled { // given + StubServer server = StubServer.start( "../driver/src/test/resources/driver_snippet.script" ); snippet.addImport( List.class ); snippet.addImport( LinkedList.class ); - session.run( "MATCH (n) DETACH DELETE (n)" ); // when snippet.run(); - // then it should've created a bunch of data - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - assertEquals( 3, result.single().get( 0 ).asInt() ); - assertThat( (List)snippet.get( "names" ), containsInAnyOrder( "Bob", "Alice", "Tina" ) ); + // then + Assert.assertThat( server.exitStatus(), equalTo( 0 ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index 647386e4e4..047f2c29d9 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -19,21 +19,22 @@ package org.neo4j.driver.v1; -import org.junit.Rule; +import org.junit.Ignore; import org.junit.Test; -import org.neo4j.driver.internal.DirectDriver; -import org.neo4j.driver.v1.util.TestNeo4jSession; +import java.io.IOException; import java.net.URI; +import org.neo4j.driver.internal.ClusterDriver; +import org.neo4j.driver.internal.DirectDriver; +import org.neo4j.driver.v1.util.StubServer; + +import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.*; +import static org.junit.Assert.assertThat; public class GraphDatabaseTest { - @Rule - public TestNeo4jSession session = new TestNeo4jSession(); - @Test public void boltSchemeShouldInstantiateDirectDriver() { @@ -48,4 +49,22 @@ public void boltSchemeShouldInstantiateDirectDriver() } + @Ignore + public void boltPlusDiscoverySchemeShouldInstantiateClusterDriver() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( "../driver/src/test/resources/discover_servers.script" ); + URI uri = URI.create( "bolt+discovery://localhost:7687" ); + + // When + Driver driver = GraphDatabase.driver( uri ); + + // Then + assertThat( driver, instanceOf( ClusterDriver.class ) ); + + // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + } \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java index 6d48c2d704..ba0e65b591 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java @@ -31,6 +31,7 @@ import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.util.TestNeo4jSession; import static org.hamcrest.CoreMatchers.equalTo; @@ -63,7 +64,7 @@ public void shouldNotAllowMoreTxAfterClientException() throws Throwable Transaction tx = session.beginTransaction(); // And Given an error has occurred - try { tx.run( "invalid" ).consume(); } catch ( ClientException e ) {} + try { tx.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/} // Expect exception.expect( ClientException.class ); @@ -79,7 +80,7 @@ public void shouldNotAllowMoreTxAfterClientException() throws Throwable public void shouldAllowNewStatementAfterRecoverableError() throws Throwable { // Given an error has occurred - try { session.run( "invalid" ).consume(); } catch ( ClientException e ) {} + try { session.run( "invalid" ).consume(); } catch ( ClientException e ) {/*empty*/} // When StatementResult cursor = session.run( "RETURN 1" ); @@ -97,7 +98,7 @@ public void shouldAllowNewTransactionAfterRecoverableError() throws Throwable { tx.run( "invalid" ).consume(); } - catch ( ClientException e ) {} + catch ( ClientException e ) {/*empty*/} // When try ( Transaction tx = session.beginTransaction() ) @@ -114,13 +115,14 @@ public void shouldAllowNewTransactionAfterRecoverableError() throws Throwable public void shouldExplainConnectionError() throws Throwable { // Expect - exception.expect( ClientException.class ); + exception.expect( ConnectionFailureException.class ); exception.expectMessage( "Unable to connect to localhost:7777, ensure the database is running " + "and that there is a working network connection to it." ); // When + //noinspection EmptyTryBlock try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7777" ); - Session session = driver.session()) {} + Session ignore = driver.session()) {/*empty*/} } @Test @@ -165,7 +167,8 @@ public void shouldGetHelpfulErrorWhenTryingToConnectToHttpPort() throws Throwabl "(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)" ); // When - try(Session session = driver.session() ){} + //noinspection EmptyTryBlock + try(Session ignore = driver.session() ){/*empty*/} } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java index 02e60eec3b..24c6ee9ea1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java @@ -24,7 +24,7 @@ import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.util.Neo4jRunner; import org.neo4j.driver.v1.util.TestNeo4j; @@ -68,7 +68,7 @@ public void shouldRecoverFromServerRestart() throws Throwable { s.run( "RETURN 'Hello, world!'" ); } - catch ( ClientException e ) + catch ( ConnectionFailureException e ) { if ( toleratedFailures-- == 0 ) { diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java b/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java index 6867798909..b59b1abf37 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java @@ -35,12 +35,13 @@ import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import static junit.framework.Assert.assertNull; import static junit.framework.TestCase.assertNotNull; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; import static org.neo4j.driver.v1.tck.Environment.driver; public class ErrorReportingSteps @@ -176,7 +177,9 @@ public void iSetUpADriverToAnIncorrectPort() throws Throwable public void itThrowsAnClientException( List data ) throws Throwable { assertNotNull( exception ); - assertThat( exception, instanceOf( ClientException.class ) ); + //TODO tck needs update, connection failures should not be client exceptions + assertTrue( exception instanceof ConnectionFailureException || + exception instanceof ClientException ); assertThat( exception.getMessage(), startsWith( data.get( 1 ) ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/StubServer.java b/driver/src/test/java/org/neo4j/driver/v1/util/StubServer.java new file mode 100644 index 0000000000..f8d94d48f5 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/util/StubServer.java @@ -0,0 +1,75 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.v1.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static java.lang.Thread.sleep; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class StubServer +{ + // This may be thrown if the driver has not been closed properly + public static class ForceKilled extends Exception {} + + private static final int DEFAULT_PORT = 7687; + + private Process process = null; + + private StubServer( String script, int port ) throws IOException, InterruptedException + { + List command = new ArrayList<>(); + command.addAll( singletonList( "/usr/local/bin/boltstub" ) ); + command.addAll( asList( Integer.toString( port ), script ) ); + ProcessBuilder server = new ProcessBuilder().inheritIO().command( command ); + process = server.start(); + sleep( 500 ); // might take a moment for the socket to start listening + } + + public static StubServer start( String script ) throws IOException, InterruptedException + { + return start( script, DEFAULT_PORT ); + } + + public static StubServer start( String script, int port ) throws IOException, InterruptedException + { + return new StubServer( script, port ); + } + + public int exitStatus() throws InterruptedException, ForceKilled + { + sleep( 500 ); // wait for a moment to allow disconnection to occur + try + { + return process.exitValue(); + } + catch ( IllegalThreadStateException ex ) + { + // not exited yet + process.destroy(); + process.waitFor(); + throw new ForceKilled(); + } + } + +} diff --git a/driver/src/test/resources/acquire_read_endpoint.script b/driver/src/test/resources/acquire_read_endpoint.script new file mode 100644 index 0000000000..656e8e355f --- /dev/null +++ b/driver/src/test/resources/acquire_read_endpoint.script @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL +S: SUCCESS {"fields": ["address"]} + RECORD ["127.0.0.1:9001"] + RECORD ["127.0.0.1:9002"] + RECORD ["127.0.0.1:9003"] + SUCCESS {} diff --git a/driver/src/test/resources/discover_invalid_server.script b/driver/src/test/resources/discover_invalid_server.script new file mode 100644 index 0000000000..c3ecbeb0b5 --- /dev/null +++ b/driver/src/test/resources/discover_invalid_server.script @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL +S: SUCCESS {"fields": ["address"]} + RECORD ["127.0.0.1:9001"] + SUCCESS {} +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL diff --git a/driver/src/test/resources/discover_new_servers.script b/driver/src/test/resources/discover_new_servers.script new file mode 100644 index 0000000000..034ff3b962 --- /dev/null +++ b/driver/src/test/resources/discover_new_servers.script @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL +S: SUCCESS {"fields": ["address"]} + RECORD ["127.0.0.1:9004"] + RECORD ["127.0.0.1:9002"] + RECORD ["127.0.0.1:9003"] + SUCCESS {} diff --git a/driver/src/test/resources/discover_servers.script b/driver/src/test/resources/discover_servers.script new file mode 100644 index 0000000000..656e8e355f --- /dev/null +++ b/driver/src/test/resources/discover_servers.script @@ -0,0 +1,12 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL +S: SUCCESS {"fields": ["address"]} + RECORD ["127.0.0.1:9001"] + RECORD ["127.0.0.1:9002"] + RECORD ["127.0.0.1:9003"] + SUCCESS {} diff --git a/driver/src/test/resources/driver_snippet.script b/driver/src/test/resources/driver_snippet.script new file mode 100644 index 0000000000..5f79abb0b6 --- /dev/null +++ b/driver/src/test/resources/driver_snippet.script @@ -0,0 +1,33 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CREATE (n {name:'Bob'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "BEGIN" {} + PULL_ALL + RUN "CREATE (n {name:'Alice'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} + SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Tina'})" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "COMMIT" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} + +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Alice"] + RECORD ["Tina"] + SUCCESS {} diff --git a/driver/src/test/resources/handle_empty_response.script b/driver/src/test/resources/handle_empty_response.script new file mode 100644 index 0000000000..4fc62f9943 --- /dev/null +++ b/driver/src/test/resources/handle_empty_response.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.discoverMembers" {} + PULL_ALL +S: SUCCESS {"fields": ["address"]} + SUCCESS {} diff --git a/driver/src/test/resources/return_x.script b/driver/src/test/resources/return_x.script new file mode 100644 index 0000000000..929e73bf3a --- /dev/null +++ b/driver/src/test/resources/return_x.script @@ -0,0 +1,10 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "RETURN {x}" {"x": 1} + PULL_ALL +S: SUCCESS {"fields": ["x"]} + RECORD [1] + SUCCESS {}