diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 93cd1c558e..8c61bbc071 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -139,7 +139,7 @@ public synchronized void reset() @Override public boolean isOpen() { - return isOpen.get(); + return isOpen.get() && connection.isOpen(); } @Override @@ -177,10 +177,6 @@ public void close() { connection.sync(); } - catch ( Throwable t ) - { - throw t; - } finally { closeConnection(); @@ -314,7 +310,7 @@ private void ensureConnectionIsOpen() private void ensureSessionIsOpen() { - if ( !isOpen() ) + if ( !isOpen.get() ) { throw new ClientException( "No more interaction with this session is allowed " + diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java new file mode 100644 index 0000000000..99ce9cfde3 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -0,0 +1,142 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.driver.internal.util.Supplier; + +/** + * A blocking queue that also keeps track of connections that are acquired in order + * to facilitate termination of all connections. + */ +public class BlockingPooledConnectionQueue +{ + /** The backing queue, keeps track of connections currently in queue */ + private final BlockingQueue queue; + + private final AtomicBoolean isTerminating = new AtomicBoolean( false ); + + /** Keeps track of acquired connections */ + private final Set acquiredConnections = + Collections.newSetFromMap(new ConcurrentHashMap()); + + public BlockingPooledConnectionQueue( int capacity ) + { + this.queue = new LinkedBlockingQueue<>( capacity ); + } + + /** + * Offer a connections back to the queue + * + * @param pooledConnection the connection to put back to the queue + * @return true if connections was accepted otherwise false + */ + public boolean offer( PooledConnection pooledConnection ) + { + acquiredConnections.remove( pooledConnection ); + boolean offer = queue.offer( pooledConnection ); + // not added back to the queue, dispose of the connection + if (!offer) { + pooledConnection.dispose(); + } + if (isTerminating.get()) { + PooledConnection poll = queue.poll(); + if (poll != null) + { + poll.dispose(); + } + } + return offer; + } + + /** + * Acquire connection or create a new one if the queue is empty + * @param supplier used to create a new connection if queue is empty + * @return a PooledConnection instance + */ + public PooledConnection acquire( Supplier supplier ) + { + + PooledConnection poll = queue.poll(); + if ( poll == null ) + { + poll = supplier.get(); + } + acquiredConnections.add( poll ); + + if (isTerminating.get()) { + acquiredConnections.remove( poll ); + poll.dispose(); + throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); + } + return poll; + } + + public List toList() + { + return new ArrayList<>( queue ); + } + + public boolean isEmpty() + { + return queue.isEmpty(); + } + + public int size() + { + return queue.size(); + } + + public boolean contains( PooledConnection pooledConnection ) + { + return queue.contains( pooledConnection ); + } + + /** + * Terminates all connections, both those that are currently in the queue as well + * as those that have been acquired. + */ + public void terminate() + { + if (isTerminating.compareAndSet( false, true )) + { + while ( !queue.isEmpty() ) + { + PooledConnection conn = queue.poll(); + if ( conn != null ) + { + //close the underlying connection without adding it back to the queue + conn.dispose(); + } + } + for ( PooledConnection pooledConnection : acquiredConnections ) + { + pooledConnection.dispose(); + } + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index a4514d0c62..94eece4fd8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.util.Consumer; @@ -30,49 +29,22 @@ */ class PooledConnectionReleaseConsumer implements Consumer { - private final BlockingQueue connections; - private final AtomicBoolean driverStopped; + private final BlockingPooledConnectionQueue connections; private final Function validConnection; - PooledConnectionReleaseConsumer( BlockingQueue connections, AtomicBoolean driverStopped, + PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, Function validConnection) { this.connections = connections; - this.driverStopped = driverStopped; this.validConnection = validConnection; } @Override public void accept( PooledConnection pooledConnection ) { - if( driverStopped.get() ) + if ( validConnection.apply( pooledConnection ) ) { - // if the driver already closed, then no need to try to return to pool, just directly close this connection - pooledConnection.dispose(); - } - else if ( validConnection.apply( pooledConnection ) ) - { - boolean released = connections.offer( pooledConnection ); - if( !released ) - { - // if the connection could be put back to the pool, then we let the pool to manage it. - // Otherwise, we close the connection directly here. - pooledConnection.dispose(); - } - else if ( driverStopped.get() ) - { - // If our adding the pooledConnection to the queue was racing with the closing of the driver, - // then the loop where the driver is closing all available connections might not observe our newly - // added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter - // which connection we get back, because other threads might be in the same situation as ours. It only - // matters that we added *a* connection that might not be observed by the loop, and that we dispose of - // *a* connection in response. - PooledConnection conn = connections.poll(); - if ( conn != null ) - { - conn.dispose(); - } - } + connections.offer( pooledConnection ); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 763b471f86..63b3177f21 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,12 +18,9 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.ConnectionSettings; @@ -35,6 +32,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Logging; @@ -48,10 +46,10 @@ * try to return the session into the session pool, however if we failed to return it back, either because the pool * is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the * session. - * + *

* The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to * pool) when the work with the session has been done. - * + *

* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool * at the same time. */ @@ -60,7 +58,8 @@ public class SocketConnectionPool implements ConnectionPool /** * Pools, organized by server address. */ - private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); + private final ConcurrentHashMap pools = + new ConcurrentHashMap<>(); private final Clock clock = Clock.SYSTEM; @@ -70,10 +69,9 @@ public class SocketConnectionPool implements ConnectionPool private final Logging logging; /** Shutdown flag */ - private final AtomicBoolean stopped = new AtomicBoolean( false ); public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan, - PoolSettings poolSettings, Logging logging ) + PoolSettings poolSettings, Logging logging ) { this.connectionSettings = connectionSettings; this.securityPlan = securityPlan; @@ -94,41 +92,44 @@ private Connection connect( BoltServerAddress address ) throws ClientException private static Map tokenAsMap( AuthToken token ) { - if( token instanceof InternalAuthToken ) + if ( token instanceof InternalAuthToken ) { return ((InternalAuthToken) token).toMap(); } else { - throw new ClientException( "Unknown authentication token, `" + token + "`. Please use one of the supported " + + throw new ClientException( + "Unknown authentication token, `" + token + "`. Please use one of the supported " + "tokens from `" + AuthTokens.class.getSimpleName() + "`." ); } } @Override - public Connection acquire( BoltServerAddress address ) + public Connection acquire( final BoltServerAddress address ) { - if ( stopped.get() ) - { - throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); - } - BlockingQueue connections = pool( address ); - PooledConnection conn = connections.poll(); - if ( conn == null ) + final BlockingPooledConnectionQueue connections = pool( address ); + Supplier supplier = new Supplier() { - conn = new PooledConnection( connect( address ), new - PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock ); - } + @Override + public PooledConnection get() + { + return new PooledConnection( connect( address ), new + PooledConnectionReleaseConsumer( connections, + new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock ); + + } + }; + PooledConnection conn = connections.acquire( supplier ); conn.updateTimestamp(); return conn; } - private BlockingQueue pool( BoltServerAddress address ) + private BlockingPooledConnectionQueue pool( BoltServerAddress address ) { - BlockingQueue pool = pools.get( address ); + BlockingPooledConnectionQueue pool = pools.get( address ); if ( pool == null ) { - pool = new LinkedBlockingQueue<>(poolSettings.maxIdleConnectionPoolSize()); + pool = new BlockingPooledConnectionQueue( poolSettings.maxIdleConnectionPoolSize() ); if ( pools.putIfAbsent( address, pool ) != null ) { @@ -142,19 +143,13 @@ private BlockingQueue pool( BoltServerAddress address ) @Override public void purge( BoltServerAddress address ) { - BlockingQueue connections = pools.remove( address ); + BlockingPooledConnectionQueue connections = pools.remove( address ); if ( connections == null ) { return; } - while (!connections.isEmpty()) - { - PooledConnection connection = connections.poll(); - if ( connection != null) - { - connection.dispose(); - } - } + + connections.terminate(); } @Override @@ -166,41 +161,28 @@ public boolean hasAddress( BoltServerAddress address ) @Override public void close() { - if( !stopped.compareAndSet( false, true ) ) - { - // already closed or some other thread already started close - return; - } - - for ( BlockingQueue pool : pools.values() ) + for ( BlockingPooledConnectionQueue pool : pools.values() ) { - while ( !pool.isEmpty() ) - { - PooledConnection conn = pool.poll(); - if ( conn != null ) - { - //close the underlying connection without adding it back to the queue - conn.dispose(); - } - } + pool.terminate(); } pools.clear(); } + //for testing - public List connectionsForAddress(BoltServerAddress address) + public List connectionsForAddress( BoltServerAddress address ) { - LinkedBlockingQueue pooledConnections = - (LinkedBlockingQueue) pools.get( address ); - if (pooledConnections == null) + BlockingPooledConnectionQueue pooledConnections = pools.get( address ); + if ( pooledConnections == null ) { return emptyList(); } else { - return new ArrayList<>( pooledConnections ); + return pooledConnections.toList(); } } + } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java new file mode 100644 index 0000000000..de781893c2 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + + +import org.junit.Test; + +import org.neo4j.driver.internal.util.Supplier; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BlockingPooledConnectionQueueTest +{ + @SuppressWarnings( "unchecked" ) + @Test + public void shouldCreateNewConnectionWhenEmpty() + { + // Given + PooledConnection connection = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 10 ); + + // When + queue.acquire( supplier ); + + // Then + verify( supplier ).get(); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotCreateNewConnectionWhenNotEmpty() + { + // Given + PooledConnection connection = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 1 ); + queue.offer( connection ); + + // When + queue.acquire( supplier ); + + // Then + verify( supplier, never() ).get(); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldTerminateAllSeenConnections() + { + // Given + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection1 ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 2 ); + queue.offer( connection1 ); + queue.offer( connection2 ); + assertThat( queue.size(), equalTo( 2 ) ); + + // When + queue.acquire( supplier ); + assertThat( queue.size(), equalTo( 1 ) ); + queue.terminate(); + + // Then + verify( connection1 ).dispose(); + verify( connection2 ).dispose(); + } + + @Test + public void shouldNotAcceptWhenFull() + { + // Given + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 1 ); + + // Then + assertTrue(queue.offer( connection1 )); + assertFalse(queue.offer( connection2 )); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index b602d85554..6028e57ba3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.HashMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.net.BoltServerAddress; @@ -73,15 +72,16 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator); + new PooledConnectionReleaseConsumer( queue, validator); consumer.accept( conn ); - verify( queue, never() ).add( conn ); + verify( queue, never() ).offer( conn ); } @SuppressWarnings( "unchecked" ) @@ -99,9 +99,10 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator ); + new PooledConnectionReleaseConsumer( queue,validator ); consumer.accept( conn ); verify( queue ).offer( conn ); @@ -117,12 +118,13 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); - verify( queue, never() ).add( conn ); + verify( queue, never() ).offer( conn ); } @Test @@ -169,9 +171,10 @@ private void assertUnrecoverable( Neo4jException exception ) // Then assertTrue( conn.hasUnrecoverableErrors() ); - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); verify( queue, never() ).offer( conn ); @@ -198,9 +201,10 @@ private void assertRecoverable( Neo4jException exception ) PoolSettings poolSettings = PoolSettings.defaultSettings(); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); verify( queue ).offer( conn ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java index 022dc0cf76..47da7d2db1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java @@ -20,8 +20,6 @@ import org.junit.Test; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; @@ -30,8 +28,8 @@ import org.neo4j.driver.v1.util.Function; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -64,13 +62,13 @@ public Boolean apply( PooledConnection pooledConnection ) public void shouldDisposeConnectionIfNotValidConnection() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), INVALID_CONNECTION ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, INVALID_CONNECTION ); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) @@ -94,13 +92,13 @@ public void dispose() public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), VALID_CONNECTION ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION ); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -115,7 +113,7 @@ public void dispose() pooledConnection.close(); // Then - assertThat( pool, hasItem(pooledConnection) ); + assertTrue( pool.contains(pooledConnection)); assertThat( pool.size(), equalTo( 1 ) ); assertThat( flags[0], equalTo( false ) ); } @@ -125,13 +123,13 @@ public void dispose() public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ); PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) @@ -148,7 +146,7 @@ public void dispose() shouldBeClosedConnection.close(); // Then - assertThat( pool, hasItem(pooledConnection) ); + assertTrue( pool.contains(pooledConnection) ); assertThat( pool.size(), equalTo( 1 ) ); assertThat( flags[0], equalTo( true ) ); } @@ -163,12 +161,12 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable // session.close() -> well, close the connection directly without putting back to the pool // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue(1); + pool.terminate(); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( true ), VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -191,24 +189,14 @@ public void dispose() public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool() throws Throwable { // Given - final AtomicBoolean stopped = new AtomicBoolean( false ); - final BlockingQueue pool = new LinkedBlockingQueue(1){ - public boolean offer(PooledConnection conn) - { - stopped.set( true ); - // some clean work to close all connection in pool - boolean offer = super.offer( conn ); - assertThat ( this.size(), equalTo( 1 ) ); - // we successfully put the connection back to the pool - return offer; - } - }; + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); + pool.terminate(); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - stopped , VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index eae187e536..4b8f945616 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -23,14 +23,17 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import org.neo4j.driver.internal.logging.ConsoleLogging; -import org.neo4j.driver.v1.*; +import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.util.TestNeo4j; -import java.util.logging.Level; - import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; @@ -192,7 +195,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable tx.run("CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1* 1000 ); + Thread.sleep( 1000 ); session.reset(); exception.expect( ClientException.class ); @@ -215,7 +218,7 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro Session session = driver.session(); session.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1 * 1000 ); + Thread.sleep( 1000 ); session.reset(); exception.expect( ClientException.class ); @@ -239,7 +242,7 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable StatementResult procedureResult = tx.run("CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1* 1000 ); + Thread.sleep( 1000 ); session.reset(); try @@ -356,4 +359,18 @@ public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() } } } + + @Test + public void shouldCloseSessionWhenDriverIsClosed() throws Throwable + { + // Given + Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session(); + + // When + driver.close(); + + // Then + assertFalse( session.isOpen() ); + } }