Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Resolver and DomainNameResolver to async Testkit backend #992

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package neo4j.org.testkit.backend;

import lombok.Getter;
import neo4j.org.testkit.backend.messages.requests.TestkitCallbackResult;
import neo4j.org.testkit.backend.messages.responses.TestkitCallback;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -35,7 +37,6 @@
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
import org.neo4j.driver.net.ServerAddress;

@Getter
public class TestkitState
Expand All @@ -52,15 +53,24 @@ public class TestkitState
private int idGenerator = 0;
private final Consumer<TestkitResponse> responseWriter;
private final Supplier<Boolean> processor;
private final Map<String,Set<ServerAddress>> idToServerAddresses = new HashMap<>();
private final Map<String,InetAddress[]> idToResolvedAddresses = new HashMap<>();
private final Map<String,CompletableFuture<TestkitCallbackResult>> callbackIdToFuture = new HashMap<>();

public TestkitState( Consumer<TestkitResponse> responseWriter, Supplier<Boolean> processor )
{
this.responseWriter = responseWriter;
this.processor = processor;
}

public CompletionStage<TestkitCallbackResult> dispatchTestkitCallback( TestkitCallback response )
{
CompletableFuture<TestkitCallbackResult> future = new CompletableFuture<>();
callbackIdToFuture.put( response.getCallbackId(), future );
responseWriter.accept( response );
// This is required for sync backend, but should be removed during migration to Netty implementation.
processor.get();
return future;
}

public String newId()
{
return String.valueOf( idGenerator++ );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import neo4j.org.testkit.backend.messages.responses.DriverError;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.neo4j.driver.exceptions.Neo4jException;
Expand All @@ -49,20 +50,25 @@ public void channelRegistered( ChannelHandlerContext ctx ) throws Exception
public void channelRead( ChannelHandlerContext ctx, Object msg )
{
TestkitRequest testkitRequest = (TestkitRequest) msg;
try
{
testkitRequest.processAsync( testkitState )
.thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) )
.exceptionally( throwable ->
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
return null;
} );
}
catch ( Throwable throwable )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
// Processing is done in a separate thread to avoid blocking EventLoop because some testing logic, like resolvers support, is blocking.
CompletableFuture.runAsync(
() ->
{
try
{
testkitRequest.processAsync( testkitState )
.thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) )
.exceptionally( throwable ->
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
return null;
} );
}
catch ( Throwable throwable )
{
ctx.writeAndFlush( createErrorResponse( throwable ) );
}
} );
}

private TestkitResponse createErrorResponse( Throwable throwable )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,26 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Setter
@Getter
@NoArgsConstructor
public class DomainNameResolutionCompleted implements TestkitRequest
public class DomainNameResolutionCompleted implements TestkitCallbackResult
{
private DomainNameResolutionCompletedBody data;

@Override
public TestkitResponse process( TestkitState testkitState )
public String getCallbackId()
{
testkitState.getIdToResolvedAddresses().put(
data.getRequestId(),
data.getAddresses()
.stream()
.map(
addr ->
{
try
{
return InetAddress.getByName( addr );
}
catch ( UnknownHostException e )
{
throw new RuntimeException( e );
}
} )
.toArray( InetAddress[]::new ) );
return null;
}

@Override
public CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
{
throw new UnsupportedOperationException();
return data.getRequestId();
}

@Setter
@Getter
@NoArgsConstructor
private static class DomainNameResolutionCompletedBody
public static class DomainNameResolutionCompletedBody
{
private String requestId;
private List<String> addresses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@
import neo4j.org.testkit.backend.messages.responses.ResolverResolutionRequired;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.neo4j.driver.AuthToken;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DefaultDomainNameResolver;
import org.neo4j.driver.internal.DomainNameResolver;
import org.neo4j.driver.internal.DriverFactory;
Expand Down Expand Up @@ -128,9 +133,20 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState )
ResolverResolutionRequired.builder()
.data( body )
.build();
testkitState.getResponseWriter().accept( response );
testkitState.getProcessor().get();
return testkitState.getIdToServerAddresses().remove( callbackId );
CompletionStage<TestkitCallbackResult> c = testkitState.dispatchTestkitCallback( response );
ResolverResolutionCompleted resolutionCompleted;
try
{
resolutionCompleted = (ResolverResolutionCompleted) c.toCompletableFuture().get();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
return resolutionCompleted.getData().getAddresses()
.stream()
.map( BoltServerAddress::new )
.collect( Collectors.toCollection( LinkedHashSet::new ) );
};
}

Expand All @@ -144,13 +160,37 @@ private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState
.id( callbackId )
.name( address )
.build();
DomainNameResolutionRequired response =
DomainNameResolutionRequired callback =
DomainNameResolutionRequired.builder()
.data( body )
.build();
testkitState.getResponseWriter().accept( response );
testkitState.getProcessor().get();
return testkitState.getIdToResolvedAddresses().remove( callbackId );

CompletionStage<TestkitCallbackResult> callbackStage = testkitState.dispatchTestkitCallback( callback );
DomainNameResolutionCompleted resolutionCompleted;
try
{
resolutionCompleted = (DomainNameResolutionCompleted) callbackStage.toCompletableFuture().get();
}
catch ( Exception e )
{
throw new RuntimeException( "Unexpected failure during Testkit callback", e );
}

return resolutionCompleted.getData().getAddresses()
.stream()
.map(
addr ->
{
try
{
return InetAddress.getByName( addr );
}
catch ( UnknownHostException e )
{
throw new RuntimeException( e );
}
} )
.toArray( InetAddress[]::new );
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,20 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import org.neo4j.driver.internal.BoltServerAddress;

@Setter
@Getter
@NoArgsConstructor
public class ResolverResolutionCompleted implements TestkitRequest
public class ResolverResolutionCompleted implements TestkitCallbackResult
{
private ResolverResolutionCompletedBody data;

@Override
public TestkitResponse process( TestkitState testkitState )
{
testkitState.getIdToServerAddresses().put( data.getRequestId(), data.getAddresses().stream().map( BoltServerAddress::new )
.collect( Collectors.toCollection( LinkedHashSet::new ) ) );
return null;
}

@Override
public CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
public String getCallbackId()
{
throw new UnsupportedOperationException();
return data.getRequestId();
}

@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@ public class StartTest implements TestkitRequest

static
{
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_fail_when_driver_closed_using_session_run$", "Does not throw error" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_read_successfully_on_empty_discovery_result_using_session_run$", "Resolver not implemented" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_request_rt_from_all_initial_routers_until_successful", "Resolver not implemented" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors", "Resolver not implemented" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_successfully_acquire_rt_when_router_ip_changes$", "Resolver not implemented" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_use_resolver_during_rediscovery_when_existing_routers_fail$", "Resolver not implemented" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0", "Does not error as expected" );
ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0$", "Does not error as expected" );
}

private StartTestBody data;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend.messages.requests;

import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.messages.responses.TestkitCallback;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* This request is sent by Testkit in response to previously sent {@link TestkitCallback}.
*/
public interface TestkitCallbackResult extends TestkitRequest
{
String getCallbackId();

@Override
default TestkitResponse process( TestkitState testkitState )
{
testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this );
return null;
}

@Override
default CompletionStage<Optional<TestkitResponse>> processAsync( TestkitState testkitState )
{
testkitState.getCallbackIdToFuture().get( getCallbackId() ).complete( this );
return CompletableFuture.completedFuture( Optional.empty() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Setter
@Getter
@Builder
public class DomainNameResolutionRequired implements TestkitResponse
public class DomainNameResolutionRequired implements TestkitCallback
{
private DomainNameResolutionRequiredBody data;

Expand All @@ -35,6 +35,12 @@ public String testkitName()
return "DomainNameResolutionRequired";
}

@Override
public String getCallbackId()
{
return data.getId();
}

@Setter
@Getter
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
@Setter
@Getter
@Builder
public class ResolverResolutionRequired implements TestkitResponse
public class ResolverResolutionRequired implements TestkitCallback
{
private ResolverResolutionRequiredBody data;

Expand All @@ -35,6 +35,12 @@ public String testkitName()
return "ResolverResolutionRequired";
}

@Override
public String getCallbackId()
{
return data.getId();
}

@Setter
@Getter
@Builder
Expand Down
Loading