Skip to content

Commit

Permalink
Fixed couple flaky tests
Browse files Browse the repository at this point in the history
Tests that asserted blocking operations are prohibited in even loop
threads were flaky. They assumed lambdas in `CompletionStage` chain
are always executed by event loop threads. However, this is not true.
There is no guarantee which thread executes the callback.

This commit adds additional check to only perform blocking operation
when in event loop thread. Exceptions are expected in this case.
  • Loading branch information
lutovich committed Dec 15, 2017
1 parent 3eb4c9d commit f2f55c2
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,25 @@ public static EventLoopGroup newEventLoopGroup()
*/
public static void assertNotInEventLoopThread() throws IllegalStateException
{
if ( Thread.currentThread() instanceof DriverThread )
if ( isEventLoopThread( Thread.currentThread() ) )
{
throw new IllegalStateException(
"Blocking operation can't be executed in IO thread because it might result in a deadlock. " +
"Please do not use blocking API when chaining futures returned by async API methods." );
}
}

/**
* Check if given thread is an event loop IO thread.
*
* @param thread the thread to check.
* @return {@code true} when given thread belongs to the event loop, {@code false} otherwise.
*/
public static boolean isEventLoopThread( Thread thread )
{
return thread instanceof DriverThread;
}

/**
* Same as {@link NioEventLoopGroup} but uses a different {@link ThreadFactory} that produces threads of
* {@link DriverThread} class. Such threads can be recognized by {@link #assertNotInEventLoopThread()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.util.Iterables.count;
import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError;
Expand Down Expand Up @@ -77,7 +79,7 @@ public void shouldAssertNotInEventLoopThread() throws Exception
EventLoopGroupFactory.assertNotInEventLoopThread();

// submit assertion to the event loop thread, it should fail there
Future<?> assertFuture = eventLoopGroup.next().submit( EventLoopGroupFactory::assertNotInEventLoopThread );
Future<?> assertFuture = eventLoopGroup.submit( EventLoopGroupFactory::assertNotInEventLoopThread );
try
{
assertFuture.get( 30, SECONDS );
Expand All @@ -89,6 +91,17 @@ public void shouldAssertNotInEventLoopThread() throws Exception
}
}

@Test
public void shouldCheckIfEventLoopThread() throws Exception
{
eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 );

Thread eventLoopThread = getThread( eventLoopGroup );
assertTrue( EventLoopGroupFactory.isEventLoopThread( eventLoopThread ) );

assertFalse( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) );
}

/**
* Test verifies that our event loop group uses same kind of thread as Netty does by default.
* It's needed because default Netty setup has good performance.
Expand All @@ -114,7 +127,7 @@ public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Ex

private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception
{
return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS );
return eventLoopGroup.submit( Thread::currentThread ).get( 10, SECONDS );
}

private static void shutdown( EventLoopGroup group )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.async.EventLoopGroupFactory;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Record;
Expand Down Expand Up @@ -1045,47 +1046,50 @@ public void shouldBePossibleToMixRunAsyncAndBlockingSessionClose()
@Test
public void shouldFailToExecuteBlockingRunInAsyncTransactionFunction()
{
TransactionWork<CompletionStage<List<Record>>> completionStageTransactionWork = tx ->
TransactionWork<CompletionStage<Void>> completionStageTransactionWork = tx ->
{
StatementResult result = tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
List<Record> records = new ArrayList<>();
while ( result.hasNext() )
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
{
records.add( result.next() );
try
{
tx.run( "UNWIND range(1, 10000) AS x CREATE (n:AsyncNode {x: x}) RETURN n" );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
}

return completedFuture( records );
return completedFuture( null );
};

CompletionStage<List<Record>> result = session.readTransactionAsync( completionStageTransactionWork );

try
{
await( result );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
CompletionStage<Void> result = session.readTransactionAsync( completionStageTransactionWork );
assertNull( await( result ) );
}

@Test
public void shouldFailToExecuteBlockingRunChainedWithAsyncRun()
{
CompletionStage<StatementResult> result = session.runAsync( "RETURN 1" )
CompletionStage<Void> result = session.runAsync( "RETURN 1" )
.thenCompose( StatementResultCursor::singleAsync )
.thenApply( record -> session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) ) );
.thenApply( record ->
{
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
{
try
{
session.run( "RETURN $x", parameters( "x", record.get( 0 ).asInt() ) );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
}
return null;
} );

try
{
await( result );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
assertNull( await( result ) );
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
Expand Down Expand Up @@ -1239,23 +1240,25 @@ public void shouldPropagateFailureFromSummary()
@Test
public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
{
assumeDatabaseSupportsBookmarks();

session.writeTransaction( tx -> tx.run( "CREATE ()" ) );
assertNotNull( session.lastBookmark() );

CompletionStage<StatementResult> result = session.beginTransactionAsync()
.thenApply( tx -> tx.run( "CREATE ()" ) );
CompletionStage<Void> result = session.beginTransactionAsync()
.thenApply( tx ->
{
if ( EventLoopGroupFactory.isEventLoopThread( Thread.currentThread() ) )
{
try
{
tx.run( "CREATE ()" );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
}
return null;
} );

try
{
await( result );
fail( "Exception expected" );
}
catch ( IllegalStateException e )
{
assertThat( e, is( blockingOperationInEventLoopError() ) );
}
assertNull( await( result ) );
}

@Test
Expand Down

0 comments on commit f2f55c2

Please sign in to comment.