Skip to content

Commit

Permalink
Fixes from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
pontusmelke committed Oct 24, 2016
1 parent 8cfe635 commit f7c4c18
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.util.Supplier;

Expand All @@ -37,6 +38,8 @@ public class BlockingPooledConnectionQueue
/** The backing queue, keeps track of connections currently in queue */
private final BlockingQueue<PooledConnection> queue;

private final AtomicBoolean isTerminating = new AtomicBoolean( false );

/** Keeps track of acquired connections */
private final Set<PooledConnection> acquiredConnections =
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());
Expand All @@ -54,14 +57,45 @@ public BlockingPooledConnectionQueue( int capacity )
*/
public boolean offer( PooledConnection pooledConnection )
{
acquiredConnections.remove( pooledConnection );
boolean offer = queue.offer( pooledConnection );
if ( offer )
{
acquiredConnections.remove( pooledConnection );
// not added back to the queue, dispose of the connection
if (!offer) {
pooledConnection.dispose();
}
if (isTerminating.get()) {
PooledConnection poll = queue.poll();
if (poll != null)
{
poll.dispose();
}
}
return offer;
}

/**
* Acquire connection or create a new one if the queue is empty
* @param supplier used to create a new connection if queue is empty
* @return a PooledConnection instance
*/
public PooledConnection acquire( Supplier<PooledConnection> supplier )
{

PooledConnection poll = queue.poll();
if ( poll == null )
{
poll = supplier.get();
}
acquiredConnections.add( poll );

if (isTerminating.get()) {
acquiredConnections.remove( poll );
poll.dispose();
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
}
return poll;
}

public List<PooledConnection> toList()
{
return new ArrayList<>( queue );
Expand All @@ -88,34 +122,21 @@ public boolean contains( PooledConnection pooledConnection )
*/
public void terminate()
{
while ( !queue.isEmpty() )
if (isTerminating.compareAndSet( false, true ))
{
PooledConnection conn = queue.poll();
if ( conn != null )
while ( !queue.isEmpty() )
{
//close the underlying connection without adding it back to the queue
conn.dispose();
PooledConnection conn = queue.poll();
if ( conn != null )
{
//close the underlying connection without adding it back to the queue
conn.dispose();
}
}
for ( PooledConnection pooledConnection : acquiredConnections )
{
pooledConnection.dispose();
}
}
for ( PooledConnection pooledConnection : acquiredConnections )
{
pooledConnection.dispose();
}
}

/**
* Acquire connection or create a new one if the queue is empty
* @param supplier used to create a new connection if queue is empty
* @return a PooledConnection instance
*/
public PooledConnection acquire( Supplier<PooledConnection> supplier )
{
PooledConnection poll = queue.poll();
if ( poll == null )
{
poll = supplier.get();
}
acquiredConnections.add( poll );
return poll;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,21 @@
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
{
private final BlockingPooledConnectionQueue connections;
private final AtomicBoolean driverStopped;
private final Function<PooledConnection, Boolean> validConnection;

PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, AtomicBoolean driverStopped,
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
Function<PooledConnection, Boolean> validConnection)
{
this.connections = connections;
this.driverStopped = driverStopped;
this.validConnection = validConnection;
}

@Override
public void accept( PooledConnection pooledConnection )
{
if( driverStopped.get() )
if ( validConnection.apply( pooledConnection ) )
{
// if the driver already closed, then no need to try to return to pool, just directly close this connection
pooledConnection.dispose();
}
else if ( validConnection.apply( pooledConnection ) )
{
boolean released = connections.offer( pooledConnection );
if( !released )
{
// if the connection could be put back to the pool, then we let the pool to manage it.
// Otherwise, we close the connection directly here.
pooledConnection.dispose();
}
else if ( driverStopped.get() )
{
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
// then the loop where the driver is closing all available connections might not observe our newly
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
// which connection we get back, because other threads might be in the same situation as ours. It only
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
// *a* connection in response.
connections.terminate();
}
connections.offer( pooledConnection );
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class SocketConnectionPool implements ConnectionPool
private final Logging logging;

/** Shutdown flag */
private final AtomicBoolean stopped = new AtomicBoolean( false );

public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
PoolSettings poolSettings, Logging logging )
Expand Down Expand Up @@ -108,18 +107,14 @@ private static Map<String,Value> tokenAsMap( AuthToken token )
@Override
public Connection acquire( final BoltServerAddress address )
{
if ( stopped.get() )
{
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
}
final BlockingPooledConnectionQueue connections = pool( address );
Supplier<PooledConnection> supplier = new Supplier<PooledConnection>()
{
@Override
public PooledConnection get()
{
return new PooledConnection( connect( address ), new
PooledConnectionReleaseConsumer( connections, stopped,
PooledConnectionReleaseConsumer( connections,
new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock );

}
Expand Down Expand Up @@ -166,12 +161,6 @@ public boolean hasAddress( BoltServerAddress address )
@Override
public void close()
{
if ( !stopped.compareAndSet( false, true ) )
{
// already closed or some other thread already started close
return;
}

for ( BlockingPooledConnectionQueue pool : pools.values() )
{
pool.terminate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable
new PooledConnectionValidator( pool( true ), poolSettings );

PooledConnectionReleaseConsumer consumer =
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator);
new PooledConnectionReleaseConsumer( queue, validator);
consumer.accept( conn );

verify( queue, never() ).offer( conn );
Expand All @@ -102,7 +102,7 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
BlockingPooledConnectionQueue
queue = mock( BlockingPooledConnectionQueue.class );
PooledConnectionReleaseConsumer consumer =
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator );
new PooledConnectionReleaseConsumer( queue,validator );
consumer.accept( conn );

verify( queue ).offer( conn );
Expand All @@ -121,7 +121,7 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable
BlockingPooledConnectionQueue
queue = mock( BlockingPooledConnectionQueue.class );
PooledConnectionReleaseConsumer consumer =
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
new PooledConnectionReleaseConsumer( queue, validator );
consumer.accept( conn );

verify( queue, never() ).offer( conn );
Expand Down Expand Up @@ -174,7 +174,7 @@ private void assertUnrecoverable( Neo4jException exception )
BlockingPooledConnectionQueue
queue = mock( BlockingPooledConnectionQueue.class );
PooledConnectionReleaseConsumer consumer =
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
new PooledConnectionReleaseConsumer( queue, validator );
consumer.accept( conn );

verify( queue, never() ).offer( conn );
Expand Down Expand Up @@ -204,7 +204,7 @@ private void assertRecoverable( Neo4jException exception )
BlockingPooledConnectionQueue
queue = mock( BlockingPooledConnectionQueue.class );
PooledConnectionReleaseConsumer consumer =
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
new PooledConnectionReleaseConsumer( queue, validator );
consumer.accept( conn );

verify( queue ).offer( conn );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public void shouldDisposeConnectionIfNotValidConnection() throws Throwable
final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( false ), INVALID_CONNECTION );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, INVALID_CONNECTION );


PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
Expand Down Expand Up @@ -99,8 +98,7 @@ public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throw
final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( false ), VALID_CONNECTION );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION );

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
{
Expand Down Expand Up @@ -131,8 +129,7 @@ public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws T
final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( false ), VALID_CONNECTION);
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM );
PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
Expand Down Expand Up @@ -164,13 +161,12 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable
// session.close() -> well, close the connection directly without putting back to the pool

// Given
final BlockingPooledConnectionQueue
pool = new BlockingPooledConnectionQueue(1);
final BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue(1);
pool.terminate();
final boolean[] flags = {false};

Connection conn = mock( Connection.class );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
new AtomicBoolean( true ), VALID_CONNECTION);
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
{
Expand All @@ -193,25 +189,14 @@ public void dispose()
public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool() throws Throwable
{
// Given
final AtomicBoolean stopped = new AtomicBoolean( false );
final BlockingPooledConnectionQueue
pool = new BlockingPooledConnectionQueue(1){
public boolean offer(PooledConnection conn)
{
stopped.set( true );
// some clean work to close all connection in pool
boolean offer = super.offer( conn );
assertThat ( this.size(), equalTo( 1 ) );
// we successfully put the connection back to the pool
return offer;
}
};
pool = new BlockingPooledConnectionQueue(1);
pool.terminate();
final boolean[] flags = {false};

Connection conn = mock( Connection.class );

PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
stopped , VALID_CONNECTION);
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);

PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
{
Expand Down

0 comments on commit f7c4c18

Please sign in to comment.