diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index d48c56e400..9b77f32b46 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -31,4 +31,16 @@ org.neo4j.driver.Logger getLog(java.lang.Class) + + org/neo4j/driver/async/AsyncTransaction + 7012 + java.util.concurrent.CompletionStage closeAsync() + + + + org/neo4j/driver/reactive/RxTransaction + 7012 + org.reactivestreams.Publisher close() + + diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransaction.java index aeb9ea283a..7a0f333403 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransaction.java @@ -23,9 +23,9 @@ import java.util.concurrent.Executor; import java.util.function.Function; -import org.neo4j.driver.Session; import org.neo4j.driver.Query; import org.neo4j.driver.QueryRunner; +import org.neo4j.driver.Session; /** * Logical container for an atomic unit of work. @@ -90,4 +90,12 @@ public interface AsyncTransaction extends AsyncQueryRunner * be completed exceptionally when rollback fails. */ CompletionStage rollbackAsync(); + + /** + * Close the transaction. If the transaction has been {@link #commitAsync() committed} or {@link #rollbackAsync() rolled back}, the close is optional and no + * operation is performed. Otherwise, the transaction will be rolled back by default by this method. + * + * @return new {@link CompletionStage} that gets completed with {@code null} when close is successful, otherwise it gets completed exceptionally. + */ + CompletionStage closeAsync(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java index 91ec69eb90..5e7123b4db 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java @@ -45,7 +45,13 @@ public CompletionStage rollbackAsync() } @Override - public CompletionStage runAsync(Query query) + public CompletionStage closeAsync() + { + return tx.closeAsync(); + } + + @Override + public CompletionStage runAsync( Query query ) { return tx.runAsync( query ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index b4212ae963..70aee517bf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -76,7 +76,7 @@ public Publisher rollback() return createEmptyPublisher( tx::rollbackAsync ); } - Publisher close() + public Publisher close() { return close( false ); } diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java index eefcf1ff91..1c50e17dd7 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java @@ -41,11 +41,19 @@ public interface RxTransaction extends RxQueryRunner Publisher commit(); /** - * Rolls back the transaction. - * It completes without publishing anything if transaction is rolled back successfully. - * Otherwise, errors when there is any error to roll back. + * Rolls back the transaction. It completes without publishing anything if transaction is rolled back successfully. Otherwise, errors when there is any + * error to roll back. + * * @param makes it easier to be chained after other publishers. * @return an empty publisher. */ Publisher rollback(); + + /** + * Close the transaction. If the transaction has been {@link #commit() committed} or {@link #rollback() rolled back}, the close is optional and no operation + * is performed. Otherwise, the transaction will be rolled back by default by this method. + * + * @return new {@link Publisher} that gets completed when close is successful, otherwise an error is signalled. + */ + Publisher close(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index dbb8ae7544..bd242e27f7 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -59,13 +59,13 @@ public class GetFeatures implements TestkitRequest "Temporary:GetConnectionPoolMetrics", "Temporary:CypherPathAndRelationship", "Temporary:FullSummary", - "Temporary:ResultKeys" + "Temporary:ResultKeys", + "Temporary:TransactionClose" ) ); private static final Set SYNC_FEATURES = new HashSet<>( Arrays.asList( "Feature:Bolt:3.0", "Optimization:PullPipelining", - "Temporary:TransactionClose", "Feature:API:Result.List", "Optimization:ResultListFetchAll" ) ); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index b06602eb47..90d12e3213 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -21,12 +21,15 @@ import lombok.Getter; import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.holder.AbstractTransactionHolder; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; import reactor.core.publisher.Mono; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.async.AsyncTransaction; + @Setter @Getter public class TransactionClose implements TestkitRequest @@ -43,13 +46,19 @@ public TestkitResponse process( TestkitState testkitState ) @Override public CompletionStage processAsync( TestkitState testkitState ) { - throw new UnsupportedOperationException(); + return testkitState.getAsyncTransactionHolder( data.getTxId() ) + .thenApply( AbstractTransactionHolder::getTransaction ) + .thenCompose( AsyncTransaction::closeAsync ) + .thenApply( ignored -> createResponse( data.getTxId() ) ); } @Override public Mono processRx( TestkitState testkitState ) { - throw new UnsupportedOperationException( "Operation not supported" ); + return testkitState.getRxTransactionHolder( data.getTxId() ) + .map( AbstractTransactionHolder::getTransaction ) + .flatMap( tx -> Mono.fromDirect( tx.close() ) ) + .then( Mono.just( createResponse( data.getTxId() ) ) ); } private Transaction createResponse( String txId )