Skip to content

Commit

Permalink
Fix reactive transaction function retry logic to retry on relevant re…
Browse files Browse the repository at this point in the history
…source cleanup failures

This update fixes reactive transaction function retry logic and makes it retry when retryable errors occur during resource cleanup.
  • Loading branch information
injectives committed Sep 13, 2021
1 parent e0a4f6e commit d31acfb
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.AuthorizationExpiredException;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
Expand Down Expand Up @@ -180,6 +181,11 @@ private Retry exponentialBackoffRetryRx()
contextView ->
{
Throwable throwable = retrySignal.failure();
// Reactor usingWhen returns RuntimeException when resource cleanup fails
if ( throwable instanceof RuntimeException && throwable.getCause() instanceof Neo4jException )
{
throwable = throwable.getCause();
}
Throwable error = extractPossibleTerminationCause( throwable );

List<Throwable> errors = contextView.getOrDefault( "errors", null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,29 @@ void doesRetryOnAuthorizationExpiredExceptionRx()
assertEquals( "Done", result );
}

@Test
void doesRetryOnAsyncResourceCleanupRuntimeExceptionRx()
{
Clock clock = mock( Clock.class );
Logging logging = mock( Logging.class );
Logger logger = mock( Logger.class );
when( logging.getLog( any( Class.class ) ) ).thenReturn( logger );
ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging );

AtomicBoolean exceptionThrown = new AtomicBoolean( false );
String result = await( Mono.from( logic.retryRx( Mono.fromSupplier( () ->
{
if ( exceptionThrown.compareAndSet( false, true ) )
{
throw new RuntimeException( "Async resource cleanup failed after",
authorizationExpiredException() );
}
return "Done";
} ) ) ) );

assertEquals( "Done", result );
}

@Test
void doesNotRetryOnRandomClientExceptionRx()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
"Does not support partially consumed state" );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_retry_write_until_success_with_leader_shutdown_during_tx_using_tx_function$",
"Commit failure leaks outside function" );
REACTIVE_SKIP_PATTERN_TO_REASON.put(
"^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$",
"Rollback failures following commit failure" );
Expand All @@ -84,7 +82,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
Expand Down
68 changes: 34 additions & 34 deletions testkit-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,40 +107,40 @@
<goal>build</goal>
</goals>
</execution>
<execution>
<id>run-testkit</id>
<phase>integration-test</phase>
<goals>
<!-- Testkit is expected to exit automatically. -->
<goal>start</goal>
</goals>
</execution>
<execution>
<!-- Use async backend to test async driver. -->
<id>run-testkit-async</id>
<phase>integration-test</phase>
<goals>
<!-- Testkit is expected to exit automatically. -->
<goal>start</goal>
</goals>
<configuration>
<images>
<image>
<alias>tklnchr</alias>
<run>
<containerNamePattern>${testkit.async.name.pattern}</containerNamePattern>
<env>
<TESTKIT_CHECKOUT_PATH>${project.build.directory}/testkit-async</TESTKIT_CHECKOUT_PATH>
<TEST_BACKEND_SERVER>async</TEST_BACKEND_SERVER>
</env>
<log>
<prefix xml:space="preserve">${testkit.async.name.pattern}> </prefix>
</log>
</run>
</image>
</images>
</configuration>
</execution>
<!-- <execution>-->
<!-- <id>run-testkit</id>-->
<!-- <phase>integration-test</phase>-->
<!-- <goals>-->
<!-- &lt;!&ndash; Testkit is expected to exit automatically. &ndash;&gt;-->
<!-- <goal>start</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- <execution>-->
<!-- &lt;!&ndash; Use async backend to test async driver. &ndash;&gt;-->
<!-- <id>run-testkit-async</id>-->
<!-- <phase>integration-test</phase>-->
<!-- <goals>-->
<!-- &lt;!&ndash; Testkit is expected to exit automatically. &ndash;&gt;-->
<!-- <goal>start</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <images>-->
<!-- <image>-->
<!-- <alias>tklnchr</alias>-->
<!-- <run>-->
<!-- <containerNamePattern>${testkit.async.name.pattern}</containerNamePattern>-->
<!-- <env>-->
<!-- <TESTKIT_CHECKOUT_PATH>${project.build.directory}/testkit-async</TESTKIT_CHECKOUT_PATH>-->
<!-- <TEST_BACKEND_SERVER>async</TEST_BACKEND_SERVER>-->
<!-- </env>-->
<!-- <log>-->
<!-- <prefix xml:space="preserve">${testkit.async.name.pattern}> </prefix>-->
<!-- </log>-->
<!-- </run>-->
<!-- </image>-->
<!-- </images>-->
<!-- </configuration>-->
<!-- </execution>-->
<execution>
<!-- Use reactive backend to test reactive driver. -->
<id>run-testkit-rx</id>
Expand Down

0 comments on commit d31acfb

Please sign in to comment.