Skip to content

Commit

Permalink
Added couple unit tests
Browse files Browse the repository at this point in the history
 * to verify that created event loop threads are the same as default
   Netty event loop threads

 * to verify that it's possible to execute blocking operations in
   `ForkJoinPool.commonPool()` when chaining async stages
  • Loading branch information
lutovich committed Nov 23, 2017
1 parent 12f6c3a commit cc29ca7
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ public class EventLoopGroupFactoryTest
@After
public void tearDown()
{
if ( eventLoopGroup != null )
{
eventLoopGroup.shutdownGracefully().syncUninterruptibly();
}
shutdown( eventLoopGroup );
}

@Test
Expand Down Expand Up @@ -91,4 +88,46 @@ public void shouldAssertNotInEventLoopThread() throws Exception
assertThat( e.getCause(), is( blockingOperationInEventLoopError() ) );
}
}

/**
* Test verifies that our event loop group uses same kind of thread as Netty does by default.
* It's needed because default Netty setup has good performance.
*/
@Test
public void shouldUseSameThreadClassAsNioEventLoopGroupDoesByDefault() throws Exception
{
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup( 1 );
eventLoopGroup = EventLoopGroupFactory.newEventLoopGroup( 1 );
try
{
Thread defaultThread = getThread( nioEventLoopGroup );
Thread driverThread = getThread( eventLoopGroup );

assertEquals( defaultThread.getClass(), driverThread.getClass().getSuperclass() );
assertEquals( defaultThread.getPriority(), driverThread.getPriority() );
}
finally
{
shutdown( nioEventLoopGroup );
}
}

private static Thread getThread( EventLoopGroup eventLoopGroup ) throws Exception
{
return eventLoopGroup.next().submit( Thread::currentThread ).get( 10, SECONDS );
}

private static void shutdown( EventLoopGroup group )
{
if ( group != null )
{
try
{
group.shutdownGracefully().syncUninterruptibly();
}
catch ( Throwable ignore )
{
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,22 @@ public void shouldFailToExecuteBlockingRunChainedWithAsyncRun()
}
}

@Test
public void shouldAllowBlockingOperationInCommonPoolWhenChaining()
{
CompletionStage<Node> nodeStage = session.runAsync( "RETURN 42 AS value" )
.thenCompose( StatementResultCursor::singleAsync )
// move execution to ForkJoinPool.commonPool()
.thenApplyAsync( record -> session.run( "CREATE (n:Node {value: $value}) RETURN n", record ) )
.thenApply( StatementResult::single )
.thenApply( record -> record.get( 0 ).asNode() );

Node node = await( nodeStage );

assertEquals( 42, node.get( "value" ).asInt() );
assertEquals( 1, countNodesByLabel( "Node" ) );
}

private Future<List<CompletionStage<Record>>> runNestedQueries( StatementResultCursor inputCursor )
{
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,28 @@ public void shouldFailToExecuteBlockingRunChainedWithAsyncTransaction()
}
}

@Test
public void shouldAllowUsingBlockingApiInCommonPoolWhenChaining()
{
CompletionStage<Transaction> txStage = session.beginTransactionAsync()
// move execution to ForkJoinPool.commonPool()
.thenApplyAsync( tx ->
{
tx.run( "UNWIND [1,1,2] AS x CREATE (:Node {id: x})" );
tx.run( "CREATE (:Node {id: 42})" );
tx.success();
tx.close();
return tx;
} );

Transaction tx = await( txStage );

assertFalse( tx.isOpen() );
assertEquals( 2, countNodes( 1 ) );
assertEquals( 1, countNodes( 2 ) );
assertEquals( 1, countNodes( 42 ) );
}

private int countNodes( Object id )
{
StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) );
Expand Down

0 comments on commit cc29ca7

Please sign in to comment.