From 8dcc02a40124131ab1d71d4411c4413ed427f94e Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 14 Sep 2021 17:47:47 +0100 Subject: [PATCH] Fix reactive transaction function resource cleanup logic This update fixes an issue when transaction function could fail if transaction has been explicitly committed or an explicit commit has failed. --- .../internal/reactive/InternalRxSession.java | 6 +-- .../reactive/InternalRxTransaction.java | 25 +++++----- .../reactive/InternalRxSessionTest.java | 11 +++-- .../reactive/InternalRxTransactionTest.java | 46 ++++++++++++++++++- .../backend/messages/requests/StartTest.java | 7 --- 5 files changed, 65 insertions(+), 30 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 69073d6051..110892c026 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -81,12 +81,12 @@ public Publisher beginTransaction( TransactionConfig config ) }, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) ); } - private Publisher beginTransaction( AccessMode mode, TransactionConfig config ) + private Publisher beginTransaction( AccessMode mode, TransactionConfig config ) { return createSingleItemPublisher( () -> { - CompletableFuture txFuture = new CompletableFuture<>(); + CompletableFuture txFuture = new CompletableFuture<>(); session.beginTransactionAsync( mode, config ).whenComplete( ( tx, completionError ) -> { @@ -130,7 +130,7 @@ public Publisher writeTransaction( RxTransactionWork Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, - RxTransaction::commit, ( tx, error ) -> tx.rollback(), null ); + InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null ); return session.retryLogic().retryRx( repeatableWork ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index 763c306288..c1a9267336 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -18,11 +18,11 @@ */ package org.neo4j.driver.internal.reactive; -import org.neo4j.driver.Query; import org.reactivestreams.Publisher; import java.util.concurrent.CompletableFuture; +import org.neo4j.driver.Query; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.util.Futures; @@ -30,6 +30,7 @@ import org.neo4j.driver.reactive.RxTransaction; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; +import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction { @@ -67,26 +68,22 @@ public RxResult run(Query query) @Override public Publisher commit() { - return close( true ); + return createEmptyPublisher( tx::commitAsync ); } @Override public Publisher rollback() { - return close( false ); + return createEmptyPublisher( tx::rollbackAsync ); } - private Publisher close( boolean commit ) + Publisher commitIfOpen() { - return createEmptyPublisher( () -> { - if ( commit ) - { - return tx.commitAsync(); - } - else - { - return tx.rollbackAsync(); - } - } ); + return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() ); + } + + Publisher close() + { + return createEmptyPublisher( tx::closeAsync ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index 1531863818..3f320231ab 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.neo4j.driver.Query; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -35,11 +34,12 @@ import java.util.stream.Stream; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; import org.neo4j.driver.internal.InternalRecord; -import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.NetworkSession; +import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursorImpl; import org.neo4j.driver.internal.util.FixedRetryLogic; @@ -199,6 +199,7 @@ void shouldDelegateRunTx( Function> runTx ) throws T // Given NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.isOpen() ).thenReturn( true ); when( tx.commitAsync() ).thenReturn( completedWithNull() ); when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); @@ -222,6 +223,7 @@ void shouldRetryOnError() throws Throwable int retryCount = 2; NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.isOpen() ).thenReturn( true ); when( tx.commitAsync() ).thenReturn( completedWithNull() ); when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); @@ -239,7 +241,7 @@ void shouldRetryOnError() throws Throwable // Then verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ); - verify( tx, times( retryCount + 1 ) ).rollbackAsync(); + verify( tx, times( retryCount + 1 ) ).closeAsync(); } @Test @@ -249,6 +251,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable int retryCount = 2; NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.isOpen() ).thenReturn( true ); when( tx.commitAsync() ).thenReturn( completedWithNull() ); when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); @@ -273,7 +276,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable // Then verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ); - verify( tx, times( retryCount ) ).rollbackAsync(); + verify( tx, times( retryCount ) ).closeAsync(); verify( tx ).commitAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index 8a3959bb25..1accde96db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -18,10 +18,9 @@ */ package org.neo4j.driver.internal.reactive; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.neo4j.driver.Query; import org.reactivestreams.Publisher; import reactor.test.StepVerifier; @@ -30,6 +29,7 @@ import java.util.function.Function; import java.util.stream.Stream; +import org.neo4j.driver.Query; import org.neo4j.driver.Value; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.async.UnmanagedTransaction; @@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.Values.parameters; @@ -137,4 +138,45 @@ void shouldMarkTxIfFailedToRun( Function runReturnOne ) assertThat( t.getCause(), equalTo( error ) ); verify( tx ).markTerminated( error ); } + + @Test + void shouldCommitWhenOpen() + { + UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.isOpen() ).thenReturn( true ); + when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); + + InternalRxTransaction rxTx = new InternalRxTransaction( tx ); + Publisher publisher = rxTx.commitIfOpen(); + StepVerifier.create( publisher ).verifyComplete(); + + verify( tx ).commitAsync(); + } + + @Test + void shouldNotCommitWhenNotOpen() + { + UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.isOpen() ).thenReturn( false ); + when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); + + InternalRxTransaction rxTx = new InternalRxTransaction( tx ); + Publisher publisher = rxTx.commitIfOpen(); + StepVerifier.create( publisher ).verifyComplete(); + + verify( tx, never() ).commitAsync(); + } + + @Test + void shouldDelegateClose() + { + UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); + when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() ); + + InternalRxTransaction rxTx = new InternalRxTransaction( tx ); + Publisher publisher = rxTx.close(); + StepVerifier.create( publisher ).verifyComplete(); + + verify( tx ).closeAsync(); + } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 0468bde969..9a84d8c871 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -66,12 +66,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$", "Does not support partially consumed state" ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( - "^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$", - "Rollback failures following commit failure" ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( - "^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$", - "Rollback failures following commit failure" ); skipMessage = "Requires investigation"; REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage ); @@ -81,7 +75,6 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );