Skip to content

Commit

Permalink
Add reactive backend support (#998)
Browse files Browse the repository at this point in the history
This update brings reactive backend support.

Its scope is limited to providing partial transparent support for existing test cases. More updates are expected in future PRs.
  • Loading branch information
injectives committed Sep 6, 2021
1 parent 9c1f119 commit 8414154
Show file tree
Hide file tree
Showing 31 changed files with 662 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,24 @@ public class Runner
{
public static void main( String[] args ) throws InterruptedException
{
boolean asyncMode = args.length > 0 && args[0].equals( "async" );
TestkitRequestProcessorHandler.BackendMode backendMode;
String modeArg = args.length > 0 ? args[0] : null;
if ( "async".equals( modeArg ) )
{
backendMode = TestkitRequestProcessorHandler.BackendMode.ASYNC;
}
else if ( "reactive".equals( modeArg ) )
{
backendMode = TestkitRequestProcessorHandler.BackendMode.REACTIVE;
}
else
{
backendMode = TestkitRequestProcessorHandler.BackendMode.SYNC;
}

EventLoopGroup group = new NioEventLoopGroup();
try

{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group( group )
Expand All @@ -50,7 +65,7 @@ protected void initChannel( SocketChannel channel )
channel.pipeline().addLast( new TestkitMessageInboundHandler() );
channel.pipeline().addLast( new TestkitMessageOutboundHandler() );
channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() );
channel.pipeline().addLast( new TestkitRequestProcessorHandler( asyncMode ) );
channel.pipeline().addLast( new TestkitRequestProcessorHandler( backendMode ) );
}
} );
ChannelFuture server = bootstrap.bind().sync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend;

import lombok.Getter;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;

public class RxBlockingSubscriber<T> implements Subscriber<T>
{
@Getter
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
private CompletableFuture<CompletableFuture<T>> nextSignalConsumerFuture;

public void setNextSignalConsumer( CompletableFuture<T> nextSignalConsumer )
{
nextSignalConsumerFuture.complete( nextSignalConsumer );
}

@Override
public void onSubscribe( Subscription s )
{
nextSignalConsumerFuture = new CompletableFuture<>();
subscriptionFuture.complete( s );
}

@Override
public void onNext( T t )
{
blockUntilNextSignalConsumer().complete( t );
}

@Override
public void onError( Throwable t )
{
blockUntilNextSignalConsumer().completeExceptionally( t );
}

@Override
public void onComplete()
{
blockUntilNextSignalConsumer().complete( null );
}

private CompletableFuture<T> blockUntilNextSignalConsumer()
{
CompletableFuture<T> nextSignalConsumer;
try
{
nextSignalConsumer = nextSignalConsumerFuture.get();
}
catch ( Throwable throwable )
{
throw new RuntimeException( "Failed waiting for next signal consumer", throwable );
}
nextSignalConsumerFuture = new CompletableFuture<>();
return nextSignalConsumer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend;

import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.reactive.RxSession;

@Getter
@Setter
public class RxSessionState
{
public RxSession session;
public CompletableFuture<Void> txWorkFuture;

public RxSessionState( RxSession session )
{
this.session = session;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.Getter;
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -30,12 +31,15 @@
import java.util.function.Consumer;

import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.async.AsyncTransaction;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;

@Getter
public class TestkitState
Expand All @@ -46,12 +50,17 @@ public class TestkitState
private final Map<String,RoutingTableRegistry> routingTableRegistry = new HashMap<>();
private final Map<String,SessionState> sessionStates = new HashMap<>();
private final Map<String,AsyncSessionState> asyncSessionStates = new HashMap<>();
private final Map<String,RxSessionState> rxSessionStates = new HashMap<>();
private final Map<String,Result> results = new HashMap<>();
private final Map<String,ResultCursor> resultCursors = new HashMap<>();
private final Map<String,RxResult> rxResults = new HashMap<>();
private final Map<String,RxBlockingSubscriber<Record>> rxResultIdToRecordSubscriber = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,Transaction> transactions = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,AsyncTransaction> asyncTransactions = new HashMap<>();
@Getter( AccessLevel.NONE )
private final Map<String,RxTransaction> rxTransactions = new HashMap<>();
private final Map<String,Neo4jException> errors = new HashMap<>();
@Getter( AccessLevel.NONE )
private final AtomicInteger idGenerator = new AtomicInteger( 0 );
Expand Down Expand Up @@ -101,4 +110,20 @@ public CompletableFuture<AsyncTransaction> getAsyncTransaction( String id )
}
return CompletableFuture.completedFuture( asyncTransactions.get( id ) );
}

public String addRxTransaction( RxTransaction transaction )
{
String id = newId();
this.rxTransactions.put( id, transaction );
return id;
}

public Mono<RxTransaction> getRxTransaction( String id )
{
if ( !this.rxTransactions.containsKey( id ) )
{
return Mono.error( new RuntimeException( TRANSACTION_NOT_FOUND_MESSAGE ) );
}
return Mono.just( rxTransactions.get( id ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

import org.neo4j.driver.exceptions.Neo4jException;
Expand All @@ -39,15 +41,24 @@
public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter
{
private final TestkitState testkitState = new TestkitState( this::writeAndFlush );
private final BiFunction<TestkitRequest, TestkitState, CompletionStage<TestkitResponse>> processorImpl;
private final BiFunction<TestkitRequest,TestkitState,CompletionStage<TestkitResponse>> processorImpl;
// Some requests require multiple threads
private final Executor requestExecutorService = Executors.newFixedThreadPool( 10 );
private Channel channel;

public TestkitRequestProcessorHandler( boolean asyncMode )
public TestkitRequestProcessorHandler( BackendMode backendMode )
{
if (asyncMode) {
processorImpl = (request, state) -> request.processAsync( state );
} else {
switch ( backendMode )
{
case ASYNC:
processorImpl = TestkitRequestProcessorHandler::wrapSyncRequest;
break;
case REACTIVE:
processorImpl = ( request, state ) -> request.processRx( state ).toFuture();
break;
default:
processorImpl = TestkitRequest::processAsync;
break;
}
}

Expand All @@ -62,20 +73,29 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
public void channelRead( ChannelHandlerContext ctx, Object msg )
{
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
CompletableFuture.supplyAsync( () -> (TestkitRequest) msg )
.thenCompose( request -> processorImpl.apply( request, testkitState ) )
.thenApply( response ->
{
if ( response != null )
{
ctx.writeAndFlush( response );
}
return null;
} ).exceptionally( throwable ->
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
return null;
} );
requestExecutorService.execute( () ->
{
try
{
TestkitRequest request = (TestkitRequest) msg;
CompletionStage<TestkitResponse> responseStage = processorImpl.apply( request, testkitState );
responseStage.whenComplete( ( response, throwable ) ->
{
if ( throwable != null )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
else if ( response != null )
{
ctx.writeAndFlush( response );
}
} );
}
catch ( Throwable throwable )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
} );
}

private static CompletionStage<TestkitResponse> wrapSyncRequest( TestkitRequest testkitRequest, TestkitState testkitState )
Expand Down Expand Up @@ -145,4 +165,11 @@ private void writeAndFlush( TestkitResponse response )
}
channel.writeAndFlush( response );
}

public enum BackendMode
{
SYNC,
ASYNC,
REACTIVE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.MultiDBSupport;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletionStage;

Expand All @@ -48,6 +49,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
.thenApply( this::createResponse );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.fromCompletionStage( processAsync( testkitState ) );
}

private MultiDBSupport createResponse( boolean available )
{
return MultiDBSupport.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.Driver;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletionStage;

Expand All @@ -47,6 +48,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
.thenApply( ignored -> createResponse() );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.fromCompletionStage( processAsync( testkitState ) );
}

private Driver createResponse()
{
return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.FeatureList;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -62,6 +63,12 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
return CompletableFuture.completedFuture( createResponse( COMMON_FEATURES ) );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.just( createResponse( COMMON_FEATURES ) );
}

private FeatureList createResponse( Set<String> features )
{
return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.RoutingTable;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -78,7 +79,13 @@ public TestkitResponse process( TestkitState testkitState )
@Override
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
{
return CompletableFuture.completedFuture( process( testkitState ) ) ;
return CompletableFuture.completedFuture( process( testkitState ) );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
return Mono.just( process( testkitState ) );
}

@Setter
Expand Down
Loading

0 comments on commit 8414154

Please sign in to comment.