Skip to content

Commit

Permalink
Merge pull request #252 from pontusmelke/1.1-close-conn-on-driver-close
Browse files Browse the repository at this point in the history
Close all connections on driver.close()
  • Loading branch information
Zhen Li authored Oct 24, 2016
2 parents 60acb8a + f7c4c18 commit 0b2b4b2
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public synchronized void reset()
@Override
public boolean isOpen()
{
return isOpen.get();
return isOpen.get() && connection.isOpen();
}

@Override
Expand Down Expand Up @@ -177,10 +177,6 @@ public void close()
{
connection.sync();
}
catch ( Throwable t )
{
throw t;
}
finally
{
closeConnection();
Expand Down Expand Up @@ -314,7 +310,7 @@ private void ensureConnectionIsOpen()

private void ensureSessionIsOpen()
{
if ( !isOpen() )
if ( !isOpen.get() )
{
throw new ClientException(
"No more interaction with this session is allowed " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.util.Supplier;

/**
* A blocking queue that also keeps track of connections that are acquired in order
* to facilitate termination of all connections.
*/
public class BlockingPooledConnectionQueue
{
/** The backing queue, keeps track of connections currently in queue */
private final BlockingQueue<PooledConnection> queue;

private final AtomicBoolean isTerminating = new AtomicBoolean( false );

/** Keeps track of acquired connections */
private final Set<PooledConnection> acquiredConnections =
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());

public BlockingPooledConnectionQueue( int capacity )
{
this.queue = new LinkedBlockingQueue<>( capacity );
}

/**
* Offer a connections back to the queue
*
* @param pooledConnection the connection to put back to the queue
* @return <code>true</code> if connections was accepted otherwise <code>false</code>
*/
public boolean offer( PooledConnection pooledConnection )
{
acquiredConnections.remove( pooledConnection );
boolean offer = queue.offer( pooledConnection );
// not added back to the queue, dispose of the connection
if (!offer) {
pooledConnection.dispose();
}
if (isTerminating.get()) {
PooledConnection poll = queue.poll();
if (poll != null)
{
poll.dispose();
}
}
return offer;
}

/**
* Acquire connection or create a new one if the queue is empty
* @param supplier used to create a new connection if queue is empty
* @return a PooledConnection instance
*/
public PooledConnection acquire( Supplier<PooledConnection> supplier )
{

PooledConnection poll = queue.poll();
if ( poll == null )
{
poll = supplier.get();
}
acquiredConnections.add( poll );

if (isTerminating.get()) {
acquiredConnections.remove( poll );
poll.dispose();
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
}
return poll;
}

public List<PooledConnection> toList()
{
return new ArrayList<>( queue );
}

public boolean isEmpty()
{
return queue.isEmpty();
}

public int size()
{
return queue.size();
}

public boolean contains( PooledConnection pooledConnection )
{
return queue.contains( pooledConnection );
}

/**
* Terminates all connections, both those that are currently in the queue as well
* as those that have been acquired.
*/
public void terminate()
{
if (isTerminating.compareAndSet( false, true ))
{
while ( !queue.isEmpty() )
{
PooledConnection conn = queue.poll();
if ( conn != null )
{
//close the underlying connection without adding it back to the queue
conn.dispose();
}
}
for ( PooledConnection pooledConnection : acquiredConnections )
{
pooledConnection.dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.util.Consumer;
Expand All @@ -30,49 +29,22 @@
*/
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
{
private final BlockingQueue<PooledConnection> connections;
private final AtomicBoolean driverStopped;
private final BlockingPooledConnectionQueue connections;
private final Function<PooledConnection, Boolean> validConnection;

PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
Function<PooledConnection, Boolean> validConnection)
{
this.connections = connections;
this.driverStopped = driverStopped;
this.validConnection = validConnection;
}

@Override
public void accept( PooledConnection pooledConnection )
{
if( driverStopped.get() )
if ( validConnection.apply( pooledConnection ) )
{
// if the driver already closed, then no need to try to return to pool, just directly close this connection
pooledConnection.dispose();
}
else if ( validConnection.apply( pooledConnection ) )
{
boolean released = connections.offer( pooledConnection );
if( !released )
{
// if the connection could be put back to the pool, then we let the pool to manage it.
// Otherwise, we close the connection directly here.
pooledConnection.dispose();
}
else if ( driverStopped.get() )
{
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
// then the loop where the driver is closing all available connections might not observe our newly
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
// which connection we get back, because other threads might be in the same situation as ours. It only
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
// *a* connection in response.
PooledConnection conn = connections.poll();
if ( conn != null )
{
conn.dispose();
}
}
connections.offer( pooledConnection );
}
else
{
Expand Down
Loading

0 comments on commit 0b2b4b2

Please sign in to comment.