Skip to content

Commit

Permalink
Fix reactive transaction function resource cleanup logic (neo4j#1009)
Browse files Browse the repository at this point in the history
This update fixes an issue when transaction function could fail if transaction has been explicitly committed or an explicit commit has failed.
  • Loading branch information
injectives committed Sep 16, 2021
1 parent 44fc1fd commit 5df9d9b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
}

private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
private Publisher<InternalRxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
{
return createSingleItemPublisher(
() ->
{
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
CompletableFuture<InternalRxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( mode, config ).whenComplete(
( tx, completionError ) ->
{
Expand Down Expand Up @@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
{
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null );
return session.retryLogic().retryRx( repeatableWork );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
*/
package org.neo4j.driver.internal.reactive;

import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.Query;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
{
Expand Down Expand Up @@ -67,26 +68,22 @@ public RxResult run(Query query)
@Override
public <T> Publisher<T> commit()
{
return close( true );
return createEmptyPublisher( tx::commitAsync );
}

@Override
public <T> Publisher<T> rollback()
{
return close( false );
return createEmptyPublisher( tx::rollbackAsync );
}

private <T> Publisher<T> close( boolean commit )
Publisher<Void> commitIfOpen()
{
return createEmptyPublisher( () -> {
if ( commit )
{
return tx.commitAsync();
}
else
{
return tx.rollbackAsync();
}
} );
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
}

Publisher<Void> close()
{
return createEmptyPublisher( tx::closeAsync );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -35,11 +34,12 @@
import java.util.stream.Stream;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
import org.neo4j.driver.internal.util.FixedRetryLogic;
Expand Down Expand Up @@ -199,6 +199,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
// Given
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -222,6 +223,7 @@ void shouldRetryOnError() throws Throwable
int retryCount = 2;
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -239,7 +241,7 @@ void shouldRetryOnError() throws Throwable

// Then
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
verify( tx, times( retryCount + 1 ) ).rollbackAsync();
verify( tx, times( retryCount + 1 ) ).closeAsync();
}

@Test
Expand All @@ -249,6 +251,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
int retryCount = 2;
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -273,7 +276,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable

// Then
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
verify( tx, times( retryCount ) ).rollbackAsync();
verify( tx, times( retryCount ) ).closeAsync();
verify( tx ).commitAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
*/
package org.neo4j.driver.internal.reactive;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import reactor.test.StepVerifier;

Expand All @@ -30,6 +29,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import org.neo4j.driver.Query;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
Expand All @@ -48,6 +48,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.Values.parameters;
Expand Down Expand Up @@ -137,4 +138,45 @@ void shouldMarkTxIfFailedToRun( Function<RxTransaction, RxResult> runReturnOne )
assertThat( t.getCause(), equalTo( error ) );
verify( tx ).markTerminated( error );
}

@Test
void shouldCommitWhenOpen()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
StepVerifier.create( publisher ).verifyComplete();

verify( tx ).commitAsync();
}

@Test
void shouldNotCommitWhenNotOpen()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( false );
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
StepVerifier.create( publisher ).verifyComplete();

verify( tx, never() ).commitAsync();
}

@Test
void shouldDelegateClose()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.close();
StepVerifier.create( publisher ).verifyComplete();

verify( tx ).closeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
"Does not support partially consumed state" );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
REACTIVE_SKIP_PATTERN_TO_REASON.put(
"^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$",
"Rollback failures following commit failure" );
REACTIVE_SKIP_PATTERN_TO_REASON.put(
"^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$",
"Rollback failures following commit failure" );
skipMessage = "Requires investigation";
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
Expand All @@ -82,7 +76,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
Expand Down

0 comments on commit 5df9d9b

Please sign in to comment.