Skip to content

Commit

Permalink
Call close with the appropriate flag to commit or rollback on Unmanag…
Browse files Browse the repository at this point in the history
…edTransaction where possible to avoid double state acquisition

Calling `close` instead of separate `isOpen` and `commitAsync` requires less lock acquisitions and is safer.
  • Loading branch information
injectives committed Nov 10, 2021
1 parent 8929bd4 commit 65b0a34
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private <T> void executeWork(CompletableFuture<T> resultFuture, UnmanagedTransac
Throwable error = Futures.completionExceptionCause( completionError );
if ( error != null )
{
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
closeTxAfterFailedTransactionWork( tx, resultFuture, error );
}
else
{
Expand Down Expand Up @@ -174,43 +174,33 @@ private <T> CompletionStage<T> safeExecuteWork(UnmanagedTransaction tx, AsyncTra
}
}

private <T> void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
private <T> void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
{
if ( tx.isOpen() )
{
tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> {
if ( rollbackError != null )
tx.closeAsync().whenComplete(
( ignored, rollbackError ) ->
{
error.addSuppressed( rollbackError );
}
resultFuture.completeExceptionally( error );
} );
}
else
{
resultFuture.completeExceptionally( error );
}
if ( rollbackError != null )
{
error.addSuppressed( rollbackError );
}
resultFuture.completeExceptionally( error );
} );
}

private <T> void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, T result )
{
if ( tx.isOpen() )
{
tx.commitAsync().whenComplete( ( ignore, completionError ) -> {
Throwable commitError = Futures.completionExceptionCause( completionError );
if ( commitError != null )
tx.closeAsync( true ).whenComplete(
( ignored, completionError ) ->
{
resultFuture.completeExceptionally( commitError );
}
else
{
resultFuture.complete( result );
}
} );
}
else
{
resultFuture.complete( result );
}
Throwable commitError = Futures.completionExceptionCause( completionError );
if ( commitError != null )
{
resultFuture.completeExceptionally( commitError );
}
else
{
resultFuture.complete( result );
}
} );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ else if ( beginError instanceof ConnectionReadTimeoutException )

public CompletionStage<Void> closeAsync()
{
return closeAsync( false, true );
return closeAsync( false );
}

public CompletionStage<Void> closeAsync( boolean commit )
{
return closeAsync( commit, true );
}

public CompletionStage<Void> commitAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public <T> Publisher<T> writeTransaction( RxTransactionWork<? extends Publisher<
private <T> Publisher<T> runTransaction( AccessMode mode, RxTransactionWork<? extends Publisher<T>> work, TransactionConfig config )
{
Flux<T> repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute,
InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close );
tx -> tx.close( true ), ( tx, error ) -> tx.close(), InternalRxTransaction::close );
return session.retryLogic().retryRx( repeatableWork );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.neo4j.driver.reactive.RxTransaction;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction
{
Expand Down Expand Up @@ -77,13 +76,13 @@ public <T> Publisher<T> rollback()
return createEmptyPublisher( tx::rollbackAsync );
}

Publisher<Void> commitIfOpen()
Publisher<Void> close()
{
return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() );
return close( false );
}

Publisher<Void> close()
Publisher<Void> close( boolean commit )
{
return createEmptyPublisher( tx::closeAsync );
return createEmptyPublisher( () -> tx.closeAsync( commit ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void shouldCommitWhenOpen()
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
Publisher<Void> publisher = rxTx.close( true );
StepVerifier.create( publisher ).verifyComplete();

verify( tx ).commitAsync();
Expand All @@ -161,7 +161,7 @@ void shouldNotCommitWhenNotOpen()
when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() );

InternalRxTransaction rxTx = new InternalRxTransaction( tx );
Publisher<Void> publisher = rxTx.commitIfOpen();
Publisher<Void> publisher = rxTx.close( true );
StepVerifier.create( publisher ).verifyComplete();

verify( tx, never() ).commitAsync();
Expand Down

0 comments on commit 65b0a34

Please sign in to comment.