From 0d27cb88adf566f09a320981a1478ee805de8531 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Fri, 20 Aug 2021 14:15:19 +0100 Subject: [PATCH] Add support for Resolver and DomainNameResolver to async Testkit backend --- .../org/testkit/backend/TestkitState.java | 20 +++++-- .../TestkitRequestProcessorHandler.java | 34 +++++++----- .../DomainNameResolutionCompleted.java | 37 ++----------- .../backend/messages/requests/NewDriver.java | 54 ++++++++++++++++--- .../requests/ResolverResolutionCompleted.java | 22 ++------ .../backend/messages/requests/StartTest.java | 8 +-- .../requests/TestkitCallbackResult.java | 49 +++++++++++++++++ .../DomainNameResolutionRequired.java | 8 ++- .../responses/ResolverResolutionRequired.java | 8 ++- .../messages/responses/TestkitCallback.java | 30 +++++++++++ 10 files changed, 183 insertions(+), 87 deletions(-) create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/TestkitCallback.java diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 2d14850180..07850519e4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -19,12 +19,14 @@ package neo4j.org.testkit.backend; import lombok.Getter; +import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult; +import neo4j.org.testkit.backend.messages.responses.TestkitCallback; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.net.InetAddress; import java.util.HashMap; import java.util.Map; -import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Supplier; @@ -35,7 +37,6 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; -import org.neo4j.driver.net.ServerAddress; @Getter public class TestkitState @@ -52,8 +53,7 @@ public class TestkitState private int idGenerator = 0; private final Consumer responseWriter; private final Supplier processor; - private final Map> idToServerAddresses = new HashMap<>(); - private final Map idToResolvedAddresses = new HashMap<>(); + private final Map> callbackIdToFuture = new HashMap<>(); public TestkitState( Consumer responseWriter, Supplier processor ) { @@ -61,6 +61,16 @@ public TestkitState( Consumer responseWriter, Supplier this.processor = processor; } + public CompletionStage dispatchTestkitCallback( TestkitCallback response ) + { + CompletableFuture future = new CompletableFuture<>(); + callbackIdToFuture.put( response.getCallbackId(), future ); + responseWriter.accept( response ); + // This is required for sync backend, but should be removed during migration to Netty implementation. + processor.get(); + return future; + } + public String newId() { return String.valueOf( idGenerator++ ); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java index ccac59c4fb..2a9e188148 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -27,6 +27,7 @@ import neo4j.org.testkit.backend.messages.responses.DriverError; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.neo4j.driver.exceptions.Neo4jException; @@ -49,20 +50,25 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception public void channelRead( ChannelHandlerContext ctx, Object msg ) { TestkitRequest testkitRequest = (TestkitRequest) msg; - try - { - testkitRequest.processAsync( testkitState ) - .thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) ) - .exceptionally( throwable -> - { - ctx.writeAndFlush( createErrorResponse( throwable ) ); - return null; - } ); - } - catch ( Throwable throwable ) - { - ctx.writeAndFlush( createErrorResponse( throwable ) ); - } + // Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking. + CompletableFuture.runAsync( + () -> + { + try + { + testkitRequest.processAsync( testkitState ) + .thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) ) + .exceptionally( throwable -> + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + return null; + } ); + } + catch ( Throwable throwable ) + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + } + } ); } private TestkitResponse createErrorResponse( Throwable throwable ) diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java index cde9a111b3..7878fa749a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java @@ -21,55 +21,26 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import neo4j.org.testkit.backend.TestkitState; -import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletionStage; @Setter @Getter @NoArgsConstructor -public class DomainNameResolutionCompleted implements TestkitRequest +public class DomainNameResolutionCompleted implements TestkitCallbackResult { private DomainNameResolutionCompletedBody data; @Override - public TestkitResponse process( TestkitState testkitState ) + public String getCallbackId() { - testkitState.getIdToResolvedAddresses().put( - data.getRequestId(), - data.getAddresses() - .stream() - .map( - addr -> - { - try - { - return InetAddress.getByName( addr ); - } - catch ( UnknownHostException e ) - { - throw new RuntimeException( e ); - } - } ) - .toArray( InetAddress[]::new ) ); - return null; - } - - @Override - public CompletionStage> processAsync( TestkitState testkitState ) - { - throw new UnsupportedOperationException(); + return data.getRequestId(); } @Setter @Getter @NoArgsConstructor - private static class DomainNameResolutionCompletedBody + public static class DomainNameResolutionCompletedBody { private String requestId; private List addresses; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 86ab421d19..51890a6f74 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -30,15 +30,20 @@ import neo4j.org.testkit.backend.messages.responses.ResolverResolutionRequired; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; +import java.util.LinkedHashSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.neo4j.driver.AuthToken; import org.neo4j.driver.AuthTokens; import org.neo4j.driver.Config; +import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.DriverFactory; @@ -128,9 +133,20 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState ) ResolverResolutionRequired.builder() .data( body ) .build(); - testkitState.getResponseWriter().accept( response ); - testkitState.getProcessor().get(); - return testkitState.getIdToServerAddresses().remove( callbackId ); + CompletionStage c = testkitState.dispatchTestkitCallback( response ); + ResolverResolutionCompleted resolutionCompleted; + try + { + resolutionCompleted = (ResolverResolutionCompleted) c.toCompletableFuture().get(); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } + return resolutionCompleted.getData().getAddresses() + .stream() + .map( BoltServerAddress::new ) + .collect( Collectors.toCollection( LinkedHashSet::new ) ); }; } @@ -144,13 +160,37 @@ private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState .id( callbackId ) .name( address ) .build(); - DomainNameResolutionRequired response = + DomainNameResolutionRequired callback = DomainNameResolutionRequired.builder() .data( body ) .build(); - testkitState.getResponseWriter().accept( response ); - testkitState.getProcessor().get(); - return testkitState.getIdToResolvedAddresses().remove( callbackId ); + + CompletionStage callbackStage = testkitState.dispatchTestkitCallback( callback ); + DomainNameResolutionCompleted resolutionCompleted; + try + { + resolutionCompleted = (DomainNameResolutionCompleted) callbackStage.toCompletableFuture().get(); + } + catch ( Exception e ) + { + throw new RuntimeException( "Unexpected failure during Testkit callback", e ); + } + + return resolutionCompleted.getData().getAddresses() + .stream() + .map( + addr -> + { + try + { + return InetAddress.getByName( addr ); + } + catch ( UnknownHostException e ) + { + throw new RuntimeException( e ); + } + } ) + .toArray( InetAddress[]::new ); }; } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java index 86810f63f2..2a64e22911 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java @@ -21,36 +21,20 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import neo4j.org.testkit.backend.TestkitState; -import neo4j.org.testkit.backend.messages.responses.TestkitResponse; -import java.util.LinkedHashSet; import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletionStage; -import java.util.stream.Collectors; - -import org.neo4j.driver.internal.BoltServerAddress; @Setter @Getter @NoArgsConstructor -public class ResolverResolutionCompleted implements TestkitRequest +public class ResolverResolutionCompleted implements TestkitCallbackResult { private ResolverResolutionCompletedBody data; @Override - public TestkitResponse process( TestkitState testkitState ) - { - testkitState.getIdToServerAddresses().put( data.getRequestId(), data.getAddresses().stream().map( BoltServerAddress::new ) - .collect( Collectors.toCollection( LinkedHashSet::new ) ) ); - return null; - } - - @Override - public CompletionStage> processAsync( TestkitState testkitState ) + public String getCallbackId() { - throw new UnsupportedOperationException(); + return data.getRequestId(); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index af45836492..dea1551ff1 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -41,13 +41,7 @@ public class StartTest implements TestkitRequest static { - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_fail_when_driver_closed_using_session_run$", "Does not throw error" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_read_successfully_on_empty_discovery_result_using_session_run$", "Resolver not implemented" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_request_rt_from_all_initial_routers_until_successful", "Resolver not implemented" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors", "Resolver not implemented" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_successfully_acquire_rt_when_router_ip_changes$", "Resolver not implemented" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_use_resolver_during_rediscovery_when_existing_routers_fail$", "Resolver not implemented" ); - ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0", "Does not error as expected" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0$", "Does not error as expected" ); } private StartTestBody data; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java new file mode 100644 index 0000000000..7a4cfbba6a --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitCallbackResult.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.responses.TestkitCallback; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +/** + * This request is sent by Testkit in response to previously sent {@link TestkitCallback}. + */ +public interface TestkitCallbackResult extends TestkitRequest +{ + String getCallbackId(); + + @Override + default TestkitResponse process( TestkitState testkitState ) + { + testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this ); + return null; + } + + @Override + default CompletionStage> processAsync( TestkitState testkitState ) + { + testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this ); + return CompletableFuture.completedFuture( Optional.empty() ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java index 3f803cc021..06fc58d674 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java @@ -25,7 +25,7 @@ @Setter @Getter @Builder -public class DomainNameResolutionRequired implements TestkitResponse +public class DomainNameResolutionRequired implements TestkitCallback { private DomainNameResolutionRequiredBody data; @@ -35,6 +35,12 @@ public String testkitName() return "DomainNameResolutionRequired"; } + @Override + public String getCallbackId() + { + return data.getId(); + } + @Setter @Getter @Builder diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ResolverResolutionRequired.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ResolverResolutionRequired.java index 685e3a3742..d39eda8c5b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ResolverResolutionRequired.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/ResolverResolutionRequired.java @@ -25,7 +25,7 @@ @Setter @Getter @Builder -public class ResolverResolutionRequired implements TestkitResponse +public class ResolverResolutionRequired implements TestkitCallback { private ResolverResolutionRequiredBody data; @@ -35,6 +35,12 @@ public String testkitName() return "ResolverResolutionRequired"; } + @Override + public String getCallbackId() + { + return data.getId(); + } + @Setter @Getter @Builder diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/TestkitCallback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/TestkitCallback.java new file mode 100644 index 0000000000..f43e37ccf8 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/TestkitCallback.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult; + +/** + * This is a special type of {@link TestkitResponse} that is typically sent during driver action processing to request some action or data from Testkit, which + * must respond with {@link TestkitCallbackResult}. + */ +public interface TestkitCallback extends TestkitResponse +{ + String getCallbackId(); +}