Skip to content

Commit

Permalink
Backport UnmanagedTransaction stage and completion improvements (#1069)
Browse files Browse the repository at this point in the history
* Make UnmanagedTransaction return ongoing tx completion stage (#1057)

This update ensures that `UnmanagedTransaction` returns existing on-going tx completion stage when a similar request is made. For instance, if it was requested to be rolled back and then requested to be closed, both invocations should get the same on-going stage. In addition, it should not accept conflicting actions, like committing and rolling back at the same time.

In addition, it makes sure that cancellation on reactive transaction function results in rollback.

* Call close with the appropriate flag to commit or rollback on UnmanagedTransaction where possible to avoid double state acquisition (#1065)

* Call close with the appropriate flag to commit or rollback on UnmanagedTransaction where possible to avoid double state acquisition

Calling `close` instead of separate `isOpen` and `commitAsync` requires less lock acquisitions and is safer.

* Update tests
  • Loading branch information
injectives authored Nov 11, 2021
1 parent 86220ae commit 2912037
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private <T> void executeWork(CompletableFuture<T> resultFuture, UnmanagedTransac
Throwable error = Futures.completionExceptionCause( completionError );
if ( error != null )
{
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
closeTxAfterFailedTransactionWork( tx, resultFuture, error );
}
else
{
Expand Down Expand Up @@ -174,43 +174,33 @@ private <T> CompletionStage<T> safeExecuteWork(UnmanagedTransaction tx, AsyncTra
}
}

private <T> void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
private <T> void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture<T> resultFuture, Throwable error )
{
if ( tx.isOpen() )
{
tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> {
if ( rollbackError != null )
tx.closeAsync().whenComplete(
( ignored, rollbackError ) ->
{
error.addSuppressed( rollbackError );
}
resultFuture.completeExceptionally( error );
} );
}
else
{
resultFuture.completeExceptionally( error );
}
if ( rollbackError != null )
{
error.addSuppressed( rollbackError );
}
resultFuture.completeExceptionally( error );
} );
}

private <T> void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture<T> resultFuture, T result )
{
if ( tx.isOpen() )
{
tx.commitAsync().whenComplete( ( ignore, completionError ) -> {
Throwable commitError = Futures.completionExceptionCause( completionError );
if ( commitError != null )
tx.closeAsync( true ).whenComplete(
( ignored, completionError ) ->
{
resultFuture.completeExceptionally( commitError );
}
else
{
resultFuture.complete( result );
}
} );
}
else
{
resultFuture.complete( result );
}
Throwable commitError = Futures.completionExceptionCause( completionError );
if ( commitError != null )
{
resultFuture.completeExceptionally( commitError );
}
else
{
resultFuture.complete( result );
}
} );
}
}
Loading

0 comments on commit 2912037

Please sign in to comment.