Skip to content

Commit

Permalink
Add transaction close support to async and reactive APIs (#1119) (#1124)
Browse files Browse the repository at this point in the history
  • Loading branch information
injectives authored Jan 19, 2022
1 parent 2aac13d commit 47667e9
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 10 deletions.
12 changes: 12 additions & 0 deletions driver/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,16 @@
<method>org.neo4j.driver.Logger getLog(java.lang.Class)</method>
</difference>

<difference>
<className>org/neo4j/driver/async/AsyncTransaction</className>
<differenceType>7012</differenceType>
<method>java.util.concurrent.CompletionStage closeAsync()</method>
</difference>

<difference>
<className>org/neo4j/driver/reactive/RxTransaction</className>
<differenceType>7012</differenceType>
<method>org.reactivestreams.Publisher close()</method>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.concurrent.Executor;
import java.util.function.Function;

import org.neo4j.driver.Session;
import org.neo4j.driver.Query;
import org.neo4j.driver.QueryRunner;
import org.neo4j.driver.Session;

/**
* Logical container for an atomic unit of work.
Expand Down Expand Up @@ -90,4 +90,12 @@ public interface AsyncTransaction extends AsyncQueryRunner
* be completed exceptionally when rollback fails.
*/
CompletionStage<Void> rollbackAsync();

/**
* Close the transaction. If the transaction has been {@link #commitAsync() committed} or {@link #rollbackAsync() rolled back}, the close is optional and no
* operation is performed. Otherwise, the transaction will be rolled back by default by this method.
*
* @return new {@link CompletionStage} that gets completed with {@code null} when close is successful, otherwise it gets completed exceptionally.
*/
CompletionStage<Void> closeAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ public CompletionStage<Void> rollbackAsync()
}

@Override
public CompletionStage<ResultCursor> runAsync(Query query)
public CompletionStage<Void> closeAsync()
{
return tx.closeAsync();
}

@Override
public CompletionStage<ResultCursor> runAsync( Query query )
{
return tx.runAsync( query );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public <T> Publisher<T> rollback()
return createEmptyPublisher( tx::rollbackAsync );
}

Publisher<Void> close()
public Publisher<Void> close()
{
return close( false );
}
Expand Down
14 changes: 11 additions & 3 deletions driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ public interface RxTransaction extends RxQueryRunner
<T> Publisher<T> commit();

/**
* Rolls back the transaction.
* It completes without publishing anything if transaction is rolled back successfully.
* Otherwise, errors when there is any error to roll back.
* Rolls back the transaction. It completes without publishing anything if transaction is rolled back successfully. Otherwise, errors when there is any
* error to roll back.
*
* @param <T> makes it easier to be chained after other publishers.
* @return an empty publisher.
*/
<T> Publisher<T> rollback();

/**
* Close the transaction. If the transaction has been {@link #commit() committed} or {@link #rollback() rolled back}, the close is optional and no operation
* is performed. Otherwise, the transaction will be rolled back by default by this method.
*
* @return new {@link Publisher} that gets completed when close is successful, otherwise an error is signalled.
*/
Publisher<Void> close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public class GetFeatures implements TestkitRequest
"Temporary:GetConnectionPoolMetrics",
"Temporary:CypherPathAndRelationship",
"Temporary:FullSummary",
"Temporary:ResultKeys"
"Temporary:ResultKeys",
"Temporary:TransactionClose"
) );

private static final Set<String> SYNC_FEATURES = new HashSet<>( Arrays.asList(
"Feature:Bolt:3.0",
"Optimization:PullPipelining",
"Temporary:TransactionClose",
"Feature:API:Result.List",
"Optimization:ResultListFetchAll"
) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.holder.AbstractTransactionHolder;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import neo4j.org.testkit.backend.messages.responses.Transaction;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletionStage;

import org.neo4j.driver.async.AsyncTransaction;

@Setter
@Getter
public class TransactionClose implements TestkitRequest
Expand All @@ -43,13 +46,19 @@ public TestkitResponse process( TestkitState testkitState )
@Override
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
{
throw new UnsupportedOperationException();
return testkitState.getAsyncTransactionHolder( data.getTxId() )
.thenApply( AbstractTransactionHolder::getTransaction )
.thenCompose( AsyncTransaction::closeAsync )
.thenApply( ignored -> createResponse( data.getTxId() ) );
}

@Override
public Mono<TestkitResponse> processRx( TestkitState testkitState )
{
throw new UnsupportedOperationException( "Operation not supported" );
return testkitState.getRxTransactionHolder( data.getTxId() )
.map( AbstractTransactionHolder::getTransaction )
.flatMap( tx -> Mono.fromDirect( tx.close() ) )
.then( Mono.just( createResponse( data.getTxId() ) ) );
}

private Transaction createResponse( String txId )
Expand Down

0 comments on commit 47667e9

Please sign in to comment.