Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reactive transaction function resource cleanup logic #1013

Merged
merged 1 commit into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public Publisher<RxTransaction> beginTransaction( TransactionConfig config )
}, () -> new IllegalStateException( "Unexpected condition, begin transaction call has completed successfully with transaction being null" ) );
}

private Publisher<RxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
private Publisher<InternalRxTransaction> beginTransaction( AccessMode mode, TransactionConfig config )
{
return createSingleItemPublisher(
() ->
{
CompletableFuture<RxTransaction> txFuture = new CompletableFuture<>();
CompletableFuture<InternalRxTransaction> txFuture = new CompletableFuture<>();
session.beginTransactionAsync( mode, config ).whenComplete(
( tx, completionError ) ->
{
Expand Down Expand Up @@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
{
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null );
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), null );
return session.retryLogic().retryRx( repeatableWork );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
*/
package org.neo4j.driver.internal.reactive;

import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;

import java.util.concurrent.CompletableFuture;

import org.neo4j.driver.Query;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
{
Expand Down Expand Up @@ -67,26 +68,22 @@ public RxResult run(Query query)
@Override
public <T> Publisher<T> commit()
{
return close( true );
return createEmptyPublisher( tx::commitAsync );
}

@Override
public <T> Publisher<T> rollback()
{
return close( false );
return createEmptyPublisher( tx::rollbackAsync );
}

private <T> Publisher<T> close( boolean commit )
Publisher<Void> commitIfOpen()
{
return createEmptyPublisher( () -> {
if ( commit )
{
return tx.commitAsync();
}
else
{
return tx.rollbackAsync();
}
} );
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
}

Publisher<Void> close()
{
return createEmptyPublisher( tx::closeAsync );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -35,11 +34,12 @@
import java.util.stream.Stream;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.cursor.RxResultCursorImpl;
import org.neo4j.driver.internal.util.FixedRetryLogic;
Expand Down Expand Up @@ -199,6 +199,7 @@ void shouldDelegateRunTx( Function<RxSession,Publisher<String>> runTx ) throws T
// Given
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -222,6 +223,7 @@ void shouldRetryOnError() throws Throwable
int retryCount = 2;
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -239,7 +241,7 @@ void shouldRetryOnError() throws Throwable

// Then
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
verify( tx, times( retryCount + 1 ) ).rollbackAsync();
verify( tx, times( retryCount + 1 ) ).closeAsync();
}

@Test
Expand All @@ -249,6 +251,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable
int retryCount = 2;
NetworkSession session = mock( NetworkSession.class );
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( completedWithNull() );
when( tx.rollbackAsync() ).thenReturn( completedWithNull() );

Expand All @@ -273,7 +276,7 @@ void shouldObtainResultIfRetrySucceed() throws Throwable

// Then
verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) );
verify( tx, times( retryCount ) ).rollbackAsync();
verify( tx, times( retryCount ) ).closeAsync();
verify( tx ).commitAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
*/
package org.neo4j.driver.internal.reactive;

import org.junit.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.neo4j.driver.Query;
import org.reactivestreams.Publisher;
import reactor.test.StepVerifier;

Expand All @@ -30,6 +29,7 @@
import java.util.function.Function;
import java.util.stream.Stream;

import org.neo4j.driver.Query;
import org.neo4j.driver.Value;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
Expand All @@ -48,6 +48,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.Values.parameters;
Expand Down Expand Up @@ -137,4 +138,45 @@ void shouldMarkTxIfFailedToRun( Function<RxTransaction, RxResult> runReturnOne )
assertThat( t.getCause(), equalTo( error ) );
verify( tx ).markTerminated( error );
}

@Test
void shouldCommitWhenOpen()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( true );
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
StepVerifier.create( publisher ).verifyComplete();

verify( tx ).commitAsync();
}

@Test
void shouldNotCommitWhenNotOpen()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.isOpen() ).thenReturn( false );
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
StepVerifier.create( publisher ).verifyComplete();

verify( tx, never() ).commitAsync();
}

@Test
void shouldDelegateClose()
{
UnmanagedTransaction tx = mock( UnmanagedTransaction.class );
when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.close();
StepVerifier.create( publisher ).verifyComplete();

verify( tx ).closeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_discard_on_session_close_unfinished_result$",
"Does not support partially consumed state" );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
REACTIVE_SKIP_PATTERN_TO_REASON.put(
"^.*\\.Routing[^.]+\\.test_should_fail_when_reading_from_unexpectedly_interrupting_readers_on_run_using_tx_function$",
"Rollback failures following commit failure" );
REACTIVE_SKIP_PATTERN_TO_REASON.put(
"^.*\\.Routing[^.]+\\.test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function$",
"Rollback failures following commit failure" );
skipMessage = "Requires investigation";
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
Expand All @@ -82,7 +76,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestOptimizations\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDirectConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.Routing[^.]+\\.test_should_successfully_acquire_rt_when_router_ip_changes$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
Expand Down