From 8cfe635ffb19757753a85b2aa6ad9d24fbb9223c Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Wed, 19 Oct 2016 09:03:14 +0200 Subject: [PATCH 1/2] Close all connections on driver.close() Connections/Sessions that were in flight, i.e. acquired from the pool, were not closed when shutting down the driver, connections remained open until GC happened. The expected behavior is that `driver.close()` closes down all sessions it has created. --- .../neo4j/driver/internal/NetworkSession.java | 8 +- .../BlockingPooledConnectionQueue.java | 121 ++++++++++++++++++ .../PooledConnectionReleaseConsumer.java | 11 +- .../net/pooling/SocketConnectionPool.java | 81 ++++++------ .../BlockingPooledConnectionQueueTest.java | 108 ++++++++++++++++ .../pooling/ConnectionInvalidationTest.java | 20 +-- .../net/pooling/PooledConnectionTest.java | 23 ++-- .../driver/v1/integration/SessionIT.java | 31 ++++- 8 files changed, 320 insertions(+), 83 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 2fea7b6c88..d987bc8adc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -139,7 +139,7 @@ public synchronized void reset() @Override public boolean isOpen() { - return isOpen.get(); + return isOpen.get() && connection.isOpen(); } @Override @@ -177,10 +177,6 @@ public void close() { connection.sync(); } - catch ( Throwable t ) - { - throw t; - } finally { closeConnection(); @@ -314,7 +310,7 @@ private void ensureConnectionIsOpen() private void ensureSessionIsOpen() { - if ( !isOpen() ) + if ( !isOpen.get() ) { throw new ClientException( "No more interaction with this session is allowed " + diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java new file mode 100644 index 0000000000..adff191649 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.neo4j.driver.internal.util.Supplier; + +/** + * A blocking queue that also keeps track of connections that are acquired in order + * to facilitate termination of all connections. + */ +public class BlockingPooledConnectionQueue +{ + /** The backing queue, keeps track of connections currently in queue */ + private final BlockingQueue queue; + + /** Keeps track of acquired connections */ + private final Set acquiredConnections = + Collections.newSetFromMap(new ConcurrentHashMap()); + + public BlockingPooledConnectionQueue( int capacity ) + { + this.queue = new LinkedBlockingQueue<>( capacity ); + } + + /** + * Offer a connections back to the queue + * + * @param pooledConnection the connection to put back to the queue + * @return true if connections was accepted otherwise false + */ + public boolean offer( PooledConnection pooledConnection ) + { + boolean offer = queue.offer( pooledConnection ); + if ( offer ) + { + acquiredConnections.remove( pooledConnection ); + } + return offer; + } + + public List toList() + { + return new ArrayList<>( queue ); + } + + public boolean isEmpty() + { + return queue.isEmpty(); + } + + public int size() + { + return queue.size(); + } + + public boolean contains( PooledConnection pooledConnection ) + { + return queue.contains( pooledConnection ); + } + + /** + * Terminates all connections, both those that are currently in the queue as well + * as those that have been acquired. + */ + public void terminate() + { + while ( !queue.isEmpty() ) + { + PooledConnection conn = queue.poll(); + if ( conn != null ) + { + //close the underlying connection without adding it back to the queue + conn.dispose(); + } + } + for ( PooledConnection pooledConnection : acquiredConnections ) + { + pooledConnection.dispose(); + } + } + + /** + * Acquire connection or create a new one if the queue is empty + * @param supplier used to create a new connection if queue is empty + * @return a PooledConnection instance + */ + public PooledConnection acquire( Supplier supplier ) + { + PooledConnection poll = queue.poll(); + if ( poll == null ) + { + poll = supplier.get(); + } + acquiredConnections.add( poll ); + return poll; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index a4514d0c62..c2b8096dfb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.util.Consumer; @@ -30,11 +29,11 @@ */ class PooledConnectionReleaseConsumer implements Consumer { - private final BlockingQueue connections; + private final BlockingPooledConnectionQueue connections; private final AtomicBoolean driverStopped; private final Function validConnection; - PooledConnectionReleaseConsumer( BlockingQueue connections, AtomicBoolean driverStopped, + PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, AtomicBoolean driverStopped, Function validConnection) { this.connections = connections; @@ -67,11 +66,7 @@ else if ( driverStopped.get() ) // which connection we get back, because other threads might be in the same situation as ours. It only // matters that we added *a* connection that might not be observed by the loop, and that we dispose of // *a* connection in response. - PooledConnection conn = connections.poll(); - if ( conn != null ) - { - conn.dispose(); - } + connections.terminate(); } } else diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 763b471f86..c4a8541306 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -18,12 +18,9 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.ConnectionSettings; @@ -35,6 +32,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Logging; @@ -48,10 +46,10 @@ * try to return the session into the session pool, however if we failed to return it back, either because the pool * is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the * session. - * + *

* The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to * pool) when the work with the session has been done. - * + *

* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool * at the same time. */ @@ -60,7 +58,8 @@ public class SocketConnectionPool implements ConnectionPool /** * Pools, organized by server address. */ - private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); + private final ConcurrentHashMap pools = + new ConcurrentHashMap<>(); private final Clock clock = Clock.SYSTEM; @@ -73,7 +72,7 @@ public class SocketConnectionPool implements ConnectionPool private final AtomicBoolean stopped = new AtomicBoolean( false ); public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan, - PoolSettings poolSettings, Logging logging ) + PoolSettings poolSettings, Logging logging ) { this.connectionSettings = connectionSettings; this.securityPlan = securityPlan; @@ -94,41 +93,48 @@ private Connection connect( BoltServerAddress address ) throws ClientException private static Map tokenAsMap( AuthToken token ) { - if( token instanceof InternalAuthToken ) + if ( token instanceof InternalAuthToken ) { return ((InternalAuthToken) token).toMap(); } else { - throw new ClientException( "Unknown authentication token, `" + token + "`. Please use one of the supported " + + throw new ClientException( + "Unknown authentication token, `" + token + "`. Please use one of the supported " + "tokens from `" + AuthTokens.class.getSimpleName() + "`." ); } } @Override - public Connection acquire( BoltServerAddress address ) + public Connection acquire( final BoltServerAddress address ) { if ( stopped.get() ) { throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); } - BlockingQueue connections = pool( address ); - PooledConnection conn = connections.poll(); - if ( conn == null ) + final BlockingPooledConnectionQueue connections = pool( address ); + Supplier supplier = new Supplier() { - conn = new PooledConnection( connect( address ), new - PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock ); - } + @Override + public PooledConnection get() + { + return new PooledConnection( connect( address ), new + PooledConnectionReleaseConsumer( connections, stopped, + new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock ); + + } + }; + PooledConnection conn = connections.acquire( supplier ); conn.updateTimestamp(); return conn; } - private BlockingQueue pool( BoltServerAddress address ) + private BlockingPooledConnectionQueue pool( BoltServerAddress address ) { - BlockingQueue pool = pools.get( address ); + BlockingPooledConnectionQueue pool = pools.get( address ); if ( pool == null ) { - pool = new LinkedBlockingQueue<>(poolSettings.maxIdleConnectionPoolSize()); + pool = new BlockingPooledConnectionQueue( poolSettings.maxIdleConnectionPoolSize() ); if ( pools.putIfAbsent( address, pool ) != null ) { @@ -142,19 +148,13 @@ private BlockingQueue pool( BoltServerAddress address ) @Override public void purge( BoltServerAddress address ) { - BlockingQueue connections = pools.remove( address ); + BlockingPooledConnectionQueue connections = pools.remove( address ); if ( connections == null ) { return; } - while (!connections.isEmpty()) - { - PooledConnection connection = connections.poll(); - if ( connection != null) - { - connection.dispose(); - } - } + + connections.terminate(); } @Override @@ -166,41 +166,34 @@ public boolean hasAddress( BoltServerAddress address ) @Override public void close() { - if( !stopped.compareAndSet( false, true ) ) + if ( !stopped.compareAndSet( false, true ) ) { // already closed or some other thread already started close return; } - for ( BlockingQueue pool : pools.values() ) + for ( BlockingPooledConnectionQueue pool : pools.values() ) { - while ( !pool.isEmpty() ) - { - PooledConnection conn = pool.poll(); - if ( conn != null ) - { - //close the underlying connection without adding it back to the queue - conn.dispose(); - } - } + pool.terminate(); } pools.clear(); } + //for testing - public List connectionsForAddress(BoltServerAddress address) + public List connectionsForAddress( BoltServerAddress address ) { - LinkedBlockingQueue pooledConnections = - (LinkedBlockingQueue) pools.get( address ); - if (pooledConnections == null) + BlockingPooledConnectionQueue pooledConnections = pools.get( address ); + if ( pooledConnections == null ) { return emptyList(); } else { - return new ArrayList<>( pooledConnections ); + return pooledConnections.toList(); } } + } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java new file mode 100644 index 0000000000..de781893c2 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + + +import org.junit.Test; + +import org.neo4j.driver.internal.util.Supplier; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BlockingPooledConnectionQueueTest +{ + @SuppressWarnings( "unchecked" ) + @Test + public void shouldCreateNewConnectionWhenEmpty() + { + // Given + PooledConnection connection = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 10 ); + + // When + queue.acquire( supplier ); + + // Then + verify( supplier ).get(); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotCreateNewConnectionWhenNotEmpty() + { + // Given + PooledConnection connection = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 1 ); + queue.offer( connection ); + + // When + queue.acquire( supplier ); + + // Then + verify( supplier, never() ).get(); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldTerminateAllSeenConnections() + { + // Given + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + Supplier supplier = mock( Supplier.class ); + when( supplier.get() ).thenReturn( connection1 ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 2 ); + queue.offer( connection1 ); + queue.offer( connection2 ); + assertThat( queue.size(), equalTo( 2 ) ); + + // When + queue.acquire( supplier ); + assertThat( queue.size(), equalTo( 1 ) ); + queue.terminate(); + + // Then + verify( connection1 ).dispose(); + verify( connection2 ).dispose(); + } + + @Test + public void shouldNotAcceptWhenFull() + { + // Given + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + BlockingPooledConnectionQueue queue = new BlockingPooledConnectionQueue( 1 ); + + // Then + assertTrue(queue.offer( connection1 )); + assertFalse(queue.offer( connection2 )); + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index b602d85554..de729527e3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.HashMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.net.BoltServerAddress; @@ -73,7 +72,8 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); @@ -81,7 +81,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator); consumer.accept( conn ); - verify( queue, never() ).add( conn ); + verify( queue, never() ).offer( conn ); } @SuppressWarnings( "unchecked" ) @@ -99,7 +99,8 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator ); consumer.accept( conn ); @@ -117,12 +118,13 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); - verify( queue, never() ).add( conn ); + verify( queue, never() ).offer( conn ); } @Test @@ -169,7 +171,8 @@ private void assertUnrecoverable( Neo4jException exception ) // Then assertTrue( conn.hasUnrecoverableErrors() ); - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); @@ -198,7 +201,8 @@ private void assertRecoverable( Neo4jException exception ) PoolSettings poolSettings = PoolSettings.defaultSettings(); PooledConnectionValidator validator = new PooledConnectionValidator( pool( true ), poolSettings ); - BlockingQueue queue = mock( BlockingQueue.class ); + BlockingPooledConnectionQueue + queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java index 022dc0cf76..603f24f219 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java @@ -20,8 +20,6 @@ import org.junit.Test; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; @@ -30,8 +28,8 @@ import org.neo4j.driver.v1.util.Function; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -64,7 +62,8 @@ public Boolean apply( PooledConnection pooledConnection ) public void shouldDisposeConnectionIfNotValidConnection() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; @@ -94,7 +93,8 @@ public void dispose() public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; @@ -115,7 +115,7 @@ public void dispose() pooledConnection.close(); // Then - assertThat( pool, hasItem(pooledConnection) ); + assertTrue( pool.contains(pooledConnection)); assertThat( pool.size(), equalTo( 1 ) ); assertThat( flags[0], equalTo( false ) ); } @@ -125,7 +125,8 @@ public void dispose() public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable { // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; @@ -148,7 +149,7 @@ public void dispose() shouldBeClosedConnection.close(); // Then - assertThat( pool, hasItem(pooledConnection) ); + assertTrue( pool.contains(pooledConnection) ); assertThat( pool.size(), equalTo( 1 ) ); assertThat( flags[0], equalTo( true ) ); } @@ -163,7 +164,8 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable // session.close() -> well, close the connection directly without putting back to the pool // Given - final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); @@ -192,7 +194,8 @@ public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool { // Given final AtomicBoolean stopped = new AtomicBoolean( false ); - final BlockingQueue pool = new LinkedBlockingQueue(1){ + final BlockingPooledConnectionQueue + pool = new BlockingPooledConnectionQueue(1){ public boolean offer(PooledConnection conn) { stopped.set( true ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index eae187e536..4b8f945616 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -23,14 +23,17 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import org.neo4j.driver.internal.logging.ConsoleLogging; -import org.neo4j.driver.v1.*; +import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.util.TestNeo4j; -import java.util.logging.Level; - import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; @@ -192,7 +195,7 @@ public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable tx.run("CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1* 1000 ); + Thread.sleep( 1000 ); session.reset(); exception.expect( ClientException.class ); @@ -215,7 +218,7 @@ public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Thro Session session = driver.session(); session.run( "CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1 * 1000 ); + Thread.sleep( 1000 ); session.reset(); exception.expect( ClientException.class ); @@ -239,7 +242,7 @@ public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable StatementResult procedureResult = tx.run("CALL test.driver.longRunningStatement({seconds})", parameters( "seconds", 10 ) ); - Thread.sleep( 1* 1000 ); + Thread.sleep( 1000 ); session.reset(); try @@ -356,4 +359,18 @@ public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() } } } + + @Test + public void shouldCloseSessionWhenDriverIsClosed() throws Throwable + { + // Given + Driver driver = GraphDatabase.driver( neo4j.uri() ); + Session session = driver.session(); + + // When + driver.close(); + + // Then + assertFalse( session.isOpen() ); + } } From f7c4c1835de14d2b53c7002f5373e8dbf89361cc Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Fri, 21 Oct 2016 14:39:53 +0200 Subject: [PATCH 2/2] Fixes from code review --- .../BlockingPooledConnectionQueue.java | 77 ++++++++++++------- .../PooledConnectionReleaseConsumer.java | 29 +------ .../net/pooling/SocketConnectionPool.java | 13 +--- .../pooling/ConnectionInvalidationTest.java | 10 +-- .../net/pooling/PooledConnectionTest.java | 33 +++----- 5 files changed, 67 insertions(+), 95 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index adff191649..99ce9cfde3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.util.Supplier; @@ -37,6 +38,8 @@ public class BlockingPooledConnectionQueue /** The backing queue, keeps track of connections currently in queue */ private final BlockingQueue queue; + private final AtomicBoolean isTerminating = new AtomicBoolean( false ); + /** Keeps track of acquired connections */ private final Set acquiredConnections = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -54,14 +57,45 @@ public BlockingPooledConnectionQueue( int capacity ) */ public boolean offer( PooledConnection pooledConnection ) { + acquiredConnections.remove( pooledConnection ); boolean offer = queue.offer( pooledConnection ); - if ( offer ) - { - acquiredConnections.remove( pooledConnection ); + // not added back to the queue, dispose of the connection + if (!offer) { + pooledConnection.dispose(); + } + if (isTerminating.get()) { + PooledConnection poll = queue.poll(); + if (poll != null) + { + poll.dispose(); + } } return offer; } + /** + * Acquire connection or create a new one if the queue is empty + * @param supplier used to create a new connection if queue is empty + * @return a PooledConnection instance + */ + public PooledConnection acquire( Supplier supplier ) + { + + PooledConnection poll = queue.poll(); + if ( poll == null ) + { + poll = supplier.get(); + } + acquiredConnections.add( poll ); + + if (isTerminating.get()) { + acquiredConnections.remove( poll ); + poll.dispose(); + throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); + } + return poll; + } + public List toList() { return new ArrayList<>( queue ); @@ -88,34 +122,21 @@ public boolean contains( PooledConnection pooledConnection ) */ public void terminate() { - while ( !queue.isEmpty() ) + if (isTerminating.compareAndSet( false, true )) { - PooledConnection conn = queue.poll(); - if ( conn != null ) + while ( !queue.isEmpty() ) { - //close the underlying connection without adding it back to the queue - conn.dispose(); + PooledConnection conn = queue.poll(); + if ( conn != null ) + { + //close the underlying connection without adding it back to the queue + conn.dispose(); + } + } + for ( PooledConnection pooledConnection : acquiredConnections ) + { + pooledConnection.dispose(); } } - for ( PooledConnection pooledConnection : acquiredConnections ) - { - pooledConnection.dispose(); - } - } - - /** - * Acquire connection or create a new one if the queue is empty - * @param supplier used to create a new connection if queue is empty - * @return a PooledConnection instance - */ - public PooledConnection acquire( Supplier supplier ) - { - PooledConnection poll = queue.poll(); - if ( poll == null ) - { - poll = supplier.get(); - } - acquiredConnections.add( poll ); - return poll; } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index c2b8096dfb..94eece4fd8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -30,44 +30,21 @@ class PooledConnectionReleaseConsumer implements Consumer { private final BlockingPooledConnectionQueue connections; - private final AtomicBoolean driverStopped; private final Function validConnection; - PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, AtomicBoolean driverStopped, + PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, Function validConnection) { this.connections = connections; - this.driverStopped = driverStopped; this.validConnection = validConnection; } @Override public void accept( PooledConnection pooledConnection ) { - if( driverStopped.get() ) + if ( validConnection.apply( pooledConnection ) ) { - // if the driver already closed, then no need to try to return to pool, just directly close this connection - pooledConnection.dispose(); - } - else if ( validConnection.apply( pooledConnection ) ) - { - boolean released = connections.offer( pooledConnection ); - if( !released ) - { - // if the connection could be put back to the pool, then we let the pool to manage it. - // Otherwise, we close the connection directly here. - pooledConnection.dispose(); - } - else if ( driverStopped.get() ) - { - // If our adding the pooledConnection to the queue was racing with the closing of the driver, - // then the loop where the driver is closing all available connections might not observe our newly - // added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter - // which connection we get back, because other threads might be in the same situation as ours. It only - // matters that we added *a* connection that might not be observed by the loop, and that we dispose of - // *a* connection in response. - connections.terminate(); - } + connections.offer( pooledConnection ); } else { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index c4a8541306..63b3177f21 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -69,7 +69,6 @@ public class SocketConnectionPool implements ConnectionPool private final Logging logging; /** Shutdown flag */ - private final AtomicBoolean stopped = new AtomicBoolean( false ); public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan, PoolSettings poolSettings, Logging logging ) @@ -108,10 +107,6 @@ private static Map tokenAsMap( AuthToken token ) @Override public Connection acquire( final BoltServerAddress address ) { - if ( stopped.get() ) - { - throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); - } final BlockingPooledConnectionQueue connections = pool( address ); Supplier supplier = new Supplier() { @@ -119,7 +114,7 @@ public Connection acquire( final BoltServerAddress address ) public PooledConnection get() { return new PooledConnection( connect( address ), new - PooledConnectionReleaseConsumer( connections, stopped, + PooledConnectionReleaseConsumer( connections, new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock ); } @@ -166,12 +161,6 @@ public boolean hasAddress( BoltServerAddress address ) @Override public void close() { - if ( !stopped.compareAndSet( false, true ) ) - { - // already closed or some other thread already started close - return; - } - for ( BlockingPooledConnectionQueue pool : pools.values() ) { pool.terminate(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index de729527e3..6028e57ba3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -78,7 +78,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable new PooledConnectionValidator( pool( true ), poolSettings ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator); + new PooledConnectionReleaseConsumer( queue, validator); consumer.accept( conn ); verify( queue, never() ).offer( conn ); @@ -102,7 +102,7 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable BlockingPooledConnectionQueue queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator ); + new PooledConnectionReleaseConsumer( queue,validator ); consumer.accept( conn ); verify( queue ).offer( conn ); @@ -121,7 +121,7 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable BlockingPooledConnectionQueue queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); verify( queue, never() ).offer( conn ); @@ -174,7 +174,7 @@ private void assertUnrecoverable( Neo4jException exception ) BlockingPooledConnectionQueue queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); verify( queue, never() ).offer( conn ); @@ -204,7 +204,7 @@ private void assertRecoverable( Neo4jException exception ) BlockingPooledConnectionQueue queue = mock( BlockingPooledConnectionQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); + new PooledConnectionReleaseConsumer( queue, validator ); consumer.accept( conn ); verify( queue ).offer( conn ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java index 603f24f219..47da7d2db1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java @@ -68,8 +68,7 @@ public void shouldDisposeConnectionIfNotValidConnection() throws Throwable final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), INVALID_CONNECTION ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, INVALID_CONNECTION ); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) @@ -99,8 +98,7 @@ public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throw final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), VALID_CONNECTION ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION ); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -131,8 +129,7 @@ public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws T final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ); PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) @@ -164,13 +161,12 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable // session.close() -> well, close the connection directly without putting back to the pool // Given - final BlockingPooledConnectionQueue - pool = new BlockingPooledConnectionQueue(1); + final BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue(1); + pool.terminate(); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( true ), VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -193,25 +189,14 @@ public void dispose() public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool() throws Throwable { // Given - final AtomicBoolean stopped = new AtomicBoolean( false ); final BlockingPooledConnectionQueue - pool = new BlockingPooledConnectionQueue(1){ - public boolean offer(PooledConnection conn) - { - stopped.set( true ); - // some clean work to close all connection in pool - boolean offer = super.offer( conn ); - assertThat ( this.size(), equalTo( 1 ) ); - // we successfully put the connection back to the pool - return offer; - } - }; + pool = new BlockingPooledConnectionQueue(1); + pool.terminate(); final boolean[] flags = {false}; Connection conn = mock( Connection.class ); - PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - stopped , VALID_CONNECTION); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) {