diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index 83f0f7a862..efc291933b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -146,7 +146,7 @@ private void executeWork(CompletableFuture resultFuture, UnmanagedTransac Throwable error = Futures.completionExceptionCause( completionError ); if ( error != null ) { - rollbackTxAfterFailedTransactionWork( tx, resultFuture, error ); + closeTxAfterFailedTransactionWork( tx, resultFuture, error ); } else { @@ -174,43 +174,33 @@ private CompletionStage safeExecuteWork(UnmanagedTransaction tx, AsyncTra } } - private void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture resultFuture, Throwable error ) + private void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture resultFuture, Throwable error ) { - if ( tx.isOpen() ) - { - tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> { - if ( rollbackError != null ) + tx.closeAsync().whenComplete( + ( ignored, rollbackError ) -> { - error.addSuppressed( rollbackError ); - } - resultFuture.completeExceptionally( error ); - } ); - } - else - { - resultFuture.completeExceptionally( error ); - } + if ( rollbackError != null ) + { + error.addSuppressed( rollbackError ); + } + resultFuture.completeExceptionally( error ); + } ); } private void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture resultFuture, T result ) { - if ( tx.isOpen() ) - { - tx.commitAsync().whenComplete( ( ignore, completionError ) -> { - Throwable commitError = Futures.completionExceptionCause( completionError ); - if ( commitError != null ) + tx.closeAsync( true ).whenComplete( + ( ignored, completionError ) -> { - resultFuture.completeExceptionally( commitError ); - } - else - { - resultFuture.complete( result ); - } - } ); - } - else - { - resultFuture.complete( result ); - } + Throwable commitError = Futures.completionExceptionCause( completionError ); + if ( commitError != null ) + { + resultFuture.completeExceptionally( commitError ); + } + else + { + resultFuture.complete( result ); + } + } ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 03fdb0e7ca..fbd7a985c7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -134,7 +134,12 @@ else if ( beginError instanceof ConnectionReadTimeoutException ) public CompletionStage closeAsync() { - return closeAsync( false, true ); + return closeAsync( false ); + } + + public CompletionStage closeAsync( boolean commit ) + { + return closeAsync( commit, true ); } public CompletionStage commitAsync() diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 41f70d0a1d..222b64562d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -130,7 +130,7 @@ public Publisher writeTransaction( RxTransactionWork Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, - InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close ); + tx -> tx.close( true ), ( tx, error ) -> tx.close(), InternalRxTransaction::close ); return session.retryLogic().retryRx( repeatableWork ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index c1a9267336..b4212ae963 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -30,7 +30,6 @@ import org.neo4j.driver.reactive.RxTransaction; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; -import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction { @@ -77,13 +76,13 @@ public Publisher rollback() return createEmptyPublisher( tx::rollbackAsync ); } - Publisher commitIfOpen() + Publisher close() { - return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() ); + return close( false ); } - Publisher close() + Publisher close( boolean commit ) { - return createEmptyPublisher( tx::closeAsync ); + return createEmptyPublisher( () -> tx.closeAsync( commit ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index 1accde96db..b2ae93245a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -147,7 +147,7 @@ void shouldCommitWhenOpen() when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); - Publisher publisher = rxTx.commitIfOpen(); + Publisher publisher = rxTx.close( true ); StepVerifier.create( publisher ).verifyComplete(); verify( tx ).commitAsync(); @@ -161,7 +161,7 @@ void shouldNotCommitWhenNotOpen() when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); - Publisher publisher = rxTx.commitIfOpen(); + Publisher publisher = rxTx.close( true ); StepVerifier.create( publisher ).verifyComplete(); verify( tx, never() ).commitAsync();