From 7a365b61f9b4b61ec9678ba81b35e6505503b8ad Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Thu, 24 Mar 2022 12:05:15 +0000 Subject: [PATCH] Introduce new managed transaction methods and deprecate existing ones This update introduces new methods for managed transactions, also known as transaction functions. Furthermore, it deprecates the existing methods. The new methods are: - `executeRead` (synchronous and reactive), `executeReadAsync` (asynchronous) - `executeWrite` (synchronous and reactive), `executeWriteAsync` (asynchronous) - `executeWriteWithoutResult` (synchronous) There are overloaded options that take transaction configuration. The new methods do not permit explicit management (commit, rollback, close) of managed transaction. The driver will attempt committing the transaction as long as the provided unit of work completes successfully. --- driver/clirr-ignored-differences.xml | 146 +++++++++++++++++ .../java/org/neo4j/driver/QueryRunner.java | 140 +--------------- .../main/java/org/neo4j/driver/Session.java | 153 +++++++++++++++--- .../org/neo4j/driver/SimpleQueryRunner.java | 148 +++++++++++++++++ .../org/neo4j/driver/TransactionCallback.java | 35 ++++ .../org/neo4j/driver/TransactionContext.java | 26 +++ .../org/neo4j/driver/TransactionWork.java | 7 +- .../org/neo4j/driver/async/AsyncSession.java | 87 +++++++++- .../async/AsyncTransactionCallback.java | 35 ++++ .../driver/async/AsyncTransactionContext.java | 26 +++ .../driver/async/AsyncTransactionWork.java | 7 +- .../DelegatingTransactionContext.java | 68 ++++++++ .../driver/internal/InternalSession.java | 13 ++ .../DelegatingAsyncTransactionContext.java | 69 ++++++++ .../internal/async/InternalAsyncSession.java | 13 ++ .../DelegatingRxTransactionContext.java | 68 ++++++++ .../internal/reactive/InternalRxSession.java | 15 +- .../org/neo4j/driver/reactive/RxSession.java | 102 ++++++++++-- .../reactive/RxTransactionCallback.java | 35 ++++ .../driver/reactive/RxTransactionContext.java | 26 +++ .../driver/reactive/RxTransactionWork.java | 7 +- .../DelegatingTransactionContextTest.java | 132 +++++++++++++++ .../driver/internal/InternalSessionTest.java | 128 +++++++++++++++ ...DelegatingAsyncTransactionContextTest.java | 134 +++++++++++++++ .../async/InternalAsyncSessionTest.java | 72 ++++++++- .../DelegatingRxTransactionContextTest.java | 132 +++++++++++++++ .../reactive/InternalRxSessionTest.java | 57 +++++++ .../neo4j/driver/util/SessionExtension.java | 17 +- 28 files changed, 1704 insertions(+), 194 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index ff46e0640a..66893cb60c 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -122,4 +122,150 @@ org.reactivestreams.Publisher isOpen() + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + diff --git a/driver/src/main/java/org/neo4j/driver/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/QueryRunner.java index a27f767107..7492cc1af9 100644 --- a/driver/src/main/java/org/neo4j/driver/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/QueryRunner.java @@ -18,147 +18,11 @@ */ package org.neo4j.driver; -import java.util.Map; - /** - * Common interface for components that can execute Neo4j queries. - * - *

Important notes on semantics

- *

- * queries run in the same {@link QueryRunner} are guaranteed - * to execute in order, meaning changes made by one query will be seen - * by all subsequent queries in the same {@link QueryRunner}. - *

- * However, to allow handling very large results, and to improve performance, - * result streams are retrieved lazily from the network. - * This means that when any of {@link #run(Query)} - * methods return a result, the query has only started executing - it may not - * have completed yet. Most of the time, you will not notice this, because the - * driver automatically waits for queries to complete at specific points to - * fulfill its contracts. - *

- * Specifically, the driver will ensure all outstanding queries are completed - * whenever you: + * An {@link AutoCloseable} extension of the {@link SimpleQueryRunner}. * - *

- *

- * As noted, most of the time, you will not need to consider this - your writes will - * always be durably stored as long as you either use the results, explicitly commit - * {@link Transaction transactions} or close the session you used using {@link Session#close()}. - *

- * While these semantics introduce some complexity, it gives the driver the ability - * to handle infinite result streams (like subscribing to events), significantly lowers - * the memory overhead for your application and improves performance. - * - * @see Session - * @see Transaction * @since 1.0 */ -public interface QueryRunner extends AutoCloseable +public interface QueryRunner extends SimpleQueryRunner, AutoCloseable { - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This particular method takes a {@link Value} as its input. This is useful - * if you want to take a map-like value that you've gotten from a prior result - * and send it back as parameters. - *

- * If you are creating parameters programmatically, {@link #run(String, Map)} - * might be more helpful, it converts your map to a {@link Value} for you. - * - *

Example

- *
-     * {@code
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       Values.parameters( "myNameParam", "Bob" ) );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. - * @return a stream of result values and associated metadata - */ - Result run(String query, Value parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Map} of parameters. The values in the map - * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for - * a list of allowed types. - * - *

Example

- *
-     * {@code
-     *
-     * Map parameters = new HashMap();
-     * parameters.put("myNameParam", "Bob");
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       parameters );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Map parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Record} of parameters, which can be useful - * if you want to use the output of one query as input for another. - * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Record parameters ); - - /** - * Run a query and return a result stream. - * - * @param query text of a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(String query ); - - /** - * Run a query and return a result stream. - *

Example

- *
-     * {@code
-     *
-     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
-     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
-     * }
-     * 
- * - * @param query a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(Query query); } diff --git a/driver/src/main/java/org/neo4j/driver/Session.java b/driver/src/main/java/org/neo4j/driver/Session.java index d4823b4969..da44ccb556 100644 --- a/driver/src/main/java/org/neo4j/driver/Session.java +++ b/driver/src/main/java/org/neo4j/driver/Session.java @@ -19,6 +19,7 @@ package org.neo4j.driver; import java.util.Map; +import java.util.function.Consumer; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.util.Resource; @@ -87,67 +88,169 @@ public interface Session extends Resource, QueryRunner * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback)}. */ + @Deprecated T readTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#READ read} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new read transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeRead( TransactionCallback callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#READ read} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback, TransactionConfig)}. */ + @Deprecated T readTransaction( TransactionWork work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeRead( TransactionCallback callback, TransactionConfig config ); + /** * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). * * @param work the {@link TransactionWork} to be applied to a new write transaction. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback)}. */ + @Deprecated T writeTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new write transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeWrite( TransactionCallback callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeWriteWithoutResult( Consumer contextConsumer ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new write transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback, TransactionConfig)}. */ + @Deprecated T writeTransaction( TransactionWork work, TransactionConfig config ); /** - * Run a query in a managed auto-commit transaction with the specified - * {@link TransactionConfig configuration}, and return a result stream. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeWrite( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query in a managed auto-commit transaction with the specified {@link TransactionConfig configuration}, and return a result stream. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a stream of result values and associated metadata. */ - Result run(String query, TransactionConfig config ); + Result run( String query, TransactionConfig config ); /** * Run a query with parameters in a managed auto-commit transaction with the diff --git a/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java new file mode 100644 index 0000000000..8e3101e766 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Map; + +/** + * Common interface for components that can execute Neo4j queries. + * + *

Important notes on semantics

+ *

+ * queries run in the same {@link QueryRunner} are guaranteed to execute in order, meaning changes made by one query will be seen by all subsequent queries in + * the same {@link QueryRunner}. + *

+ * However, to allow handling very large results, and to improve performance, result streams are retrieved lazily from the network. This means that when any of + * {@link #run(Query)} methods return a result, the query has only started executing - it may not have completed yet. Most of the time, you will not notice + * this, because the driver automatically waits for queries to complete at specific points to fulfill its contracts. + *

+ * Specifically, the driver will ensure all outstanding queries are completed whenever you: + * + *

    + *
  • Read from or discard a result, for instance via + * {@link Result#next()} or {@link Result#consume()}
  • + *
  • Explicitly commit/rollback a transaction using blocking {@link Transaction#close()}
  • + *
  • Close a session using blocking {@link Session#close()}
  • + *
+ *

+ * As noted, most of the time, you will not need to consider this - your writes will + * always be durably stored as long as you either use the results, explicitly commit + * {@link Transaction transactions} or close the session you utilised using {@link Session#close()}. + *

+ * While these semantics introduce some complexity, it gives the driver the ability + * to handle infinite result streams (like subscribing to events), significantly lowers + * the memory overhead for your application and improves performance. + * + * @see Session + * @see Transaction + * @since 5.0 + */ +public interface SimpleQueryRunner +{ + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful if you want to take a map-like value that you've gotten from a prior result and + * send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #run(String, Map)} might be more helpful, it converts your map to a {@link Value} for you. + * + *

Example

+ *
+     * {@code
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       Values.parameters( "myNameParam", "Bob" ) );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return a stream of result values and associated metadata + */ + Result run( String query, Value parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + *

Example

+ *
+     * {@code
+     *
+     * Map parameters = new HashMap();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       parameters );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Map parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Record} of parameters, which can be useful if you want to use the output of one query as input for another. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Record parameters ); + + /** + * Run a query and return a result stream. + * + * @param query text of a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( String query ); + + /** + * Run a query and return a result stream. + *

Example

+ *
+     * {@code
+     *
+     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
+     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
+     * }
+     * 
+ * + * @param query a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( Query query ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionCallback.java b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java new file mode 100644 index 0000000000..b34b7bc0fe --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +/** + * Callback that executes operations against a given {@link TransactionContext}. + * + * @param the return type of this work. + */ +public interface TransactionCallback +{ + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( TransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionContext.java b/driver/src/main/java/org/neo4j/driver/TransactionContext.java new file mode 100644 index 0000000000..ad94347dff --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +/** + * A context for running queries within transaction. + */ +public interface TransactionContext extends SimpleQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionWork.java b/driver/src/main/java/org/neo4j/driver/TransactionWork.java index f78b0a7cc7..f868f6ad1b 100644 --- a/driver/src/main/java/org/neo4j/driver/TransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/TransactionWork.java @@ -19,12 +19,13 @@ package org.neo4j.driver; /** - * Callback that executes operations against a given {@link Transaction}. - * To be used with {@link Session#readTransaction(TransactionWork)} and - * {@link Session#writeTransaction(TransactionWork)} methods. + * Callback that executes operations against a given {@link Transaction}. To be used with {@link Session#readTransaction(TransactionWork)} and {@link + * Session#writeTransaction(TransactionWork)} methods. * * @param the return type of this work. + * @deprecated superseded by {@link TransactionCallback}. */ +@Deprecated public interface TransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java index a925612066..8a1f3b467f 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java @@ -25,11 +25,12 @@ import java.util.function.Function; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; +import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; -import org.neo4j.driver.Bookmark; /** * Provides a context of work for database interactions. @@ -133,9 +134,31 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeReadAsync( AsyncTransactionCallback> callback ) + { + return executeReadAsync( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#READ read} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -158,9 +181,29 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction. *

@@ -181,9 +224,31 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeWriteAsync( AsyncTransactionCallback> callback ) + { + return executeWriteAsync( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -206,9 +271,29 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + /** * Run a query asynchronously in an auto-commit transaction with the specified {@link TransactionConfig configuration} and return a * {@link CompletionStage} with a result cursor. diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java new file mode 100644 index 0000000000..17300558fe --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +/** + * Callback that executes operations against a given {@link AsyncTransactionContext}. + * + * @param the return type of this work. + */ +public interface AsyncTransactionCallback +{ + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( AsyncTransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java new file mode 100644 index 0000000000..1667cffdcd --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +/** + * A context for running queries within transaction. + */ +public interface AsyncTransactionContext extends AsyncQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java index fc9a29cb91..64c14d832c 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java @@ -19,13 +19,14 @@ package org.neo4j.driver.async; /** - * Callback that executes operations against a given {@link AsyncTransaction}. - * To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} and - * {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods. + * Callback that executes operations against a given {@link AsyncTransaction}. To be used with {@link AsyncSession#readTransactionAsync(AsyncTransactionWork)} + * and {@link AsyncSession#writeTransactionAsync(AsyncTransactionWork)} (AsyncTransactionWork)} methods. * * @param the return type of this work. * @since 4.0 + * @deprecated superseded by {@link AsyncTransactionCallback}. */ +@Deprecated public interface AsyncTransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java new file mode 100644 index 0000000000..874d6e36f1 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.Value; + +final class DelegatingTransactionContext implements TransactionContext +{ + private final Transaction delegate; + + public DelegatingTransactionContext( Transaction delegate ) + { + this.delegate = delegate; + } + + @Override + public Result run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query ) + { + return delegate.run( query ); + } + + @Override + public Result run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index e11133ddee..33008f71c4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.ResultCursor; @@ -112,6 +113,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.READ, work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -124,6 +131,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.WRITE, work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java new file mode 100644 index 0000000000..f422df85f6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionContext; +import org.neo4j.driver.async.ResultCursor; + +final class DelegatingAsyncTransactionContext implements AsyncTransactionContext +{ + private final AsyncTransaction delegate; + + public DelegatingAsyncTransactionContext( AsyncTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public CompletionStage runAsync( String query, Value parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Map parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Record parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query ) + { + return delegate.runAsync( query ); + } + + @Override + public CompletionStage runAsync( Query query ) + { + return delegate.runAsync( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index efc291933b..23c12fded6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -28,6 +28,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.internal.util.Futures; @@ -99,6 +100,12 @@ public CompletionStage readTransactionAsync( AsyncTransactionWork CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return readTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public CompletionStage writeTransactionAsync( AsyncTransactionWork> work ) { @@ -111,6 +118,12 @@ public CompletionStage writeTransactionAsync( AsyncTransactionWork CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return writeTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java new file mode 100644 index 0000000000..d78116230b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionContext; + +final class DelegatingRxTransactionContext implements RxTransactionContext +{ + private final RxTransaction delegate; + + public DelegatingRxTransactionContext( RxTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public RxResult run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query ) + { + return delegate.run( query ); + } + + @Override + public RxResult run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 1907ace63f..0403a736b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -35,6 +35,7 @@ import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import org.neo4j.driver.reactive.RxTransactionWork; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; @@ -115,6 +116,12 @@ public Publisher readTransaction( RxTransactionWork Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + @Override public Publisher writeTransaction( RxTransactionWork> work ) { @@ -127,6 +134,12 @@ public Publisher writeTransaction( RxTransactionWork Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + private Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, @@ -135,7 +148,7 @@ private Publisher runTransaction( AccessMode mode, RxTransactionWork the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeRead(RxTransactionCallback)}. * */ + @Deprecated Publisher readTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeRead( RxTransactionCallback> callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -104,10 +128,29 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeRead(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher readTransaction( RxTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ); + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction.

@@ -125,10 +168,31 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeWrite( RxTransactionCallback> callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -148,20 +212,38 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work, TransactionConfig config ); /** - * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. - * The query is not executed when the reactive result is returned. - * Instead, the publishers in the result will actually start the execution of the query. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. The query is not + * executed when the reactive result is returned. Instead, the publishers in the result will actually start the execution of the query. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run(String query, TransactionConfig config ); + RxResult run( String query, TransactionConfig config ); /** * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java new file mode 100644 index 0000000000..d763bb68b9 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +/** + * Callback that executes operations against a given {@link RxTransactionContext}. + * + * @param the return type of this work. + */ +public interface RxTransactionCallback +{ + /** + * Executes all given operations against the same transaction context. + * + * @param context the transaction context to use. + * @return result object or {@code null} if none. + */ + T execute( RxTransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java new file mode 100644 index 0000000000..6d5e531467 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +/** + * A context for running queries within transaction. + */ +public interface RxTransactionContext extends RxQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index ef68a8118f..2fe89b7e1a 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -19,13 +19,14 @@ package org.neo4j.driver.reactive; /** - * Callback that executes operations against a given {@link RxTransaction}. - * To be used with {@link RxSession#readTransaction(RxTransactionWork)} and - * {@link RxSession#writeTransaction(RxTransactionWork)} methods. + * Callback that executes operations against a given {@link RxTransaction}. To be used with {@link RxSession#readTransaction(RxTransactionWork)} and {@link + * RxSession#writeTransaction(RxTransactionWork)} methods. * * @param the return type of this work. * @since 4.0 + * @deprecated superseded by {@link RxTransactionCallback}. */ +@Deprecated public interface RxTransactionWork { /** diff --git a/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java new file mode 100644 index 0000000000..146a2b238d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/DelegatingTransactionContextTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.Value; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingTransactionContextTest +{ + Transaction transaction; + DelegatingTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( Transaction.class ); + context = new DelegatingTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + Result expected = mock( Result.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + Result expected = mock( Result.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + Result expected = mock( Result.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + Result actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java new file mode 100644 index 0000000000..d3f1d4ecf1 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import org.neo4j.driver.Session; +import org.neo4j.driver.TransactionCallback; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.internal.async.NetworkSession; +import org.neo4j.driver.internal.retry.RetryLogic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class InternalSessionTest +{ + NetworkSession networkSession; + Session session; + + @BeforeEach + void beforeEach() + { + networkSession = mock( NetworkSession.class ); + session = new InternalSession( networkSession ); + } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) + { + // GIVEN + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( logic.retry( any() ) ).willReturn( expected ); + given( networkSession.retryLogic() ).willReturn( logic ); + TransactionCallback tc = ( ignored ) -> expected; + Consumer consumer = ( ignored ) -> + { + }; + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + String actual = null; + if ( executeVariation.readOnly ) + { + actual = executeVariation.explicitTxConfig ? session.executeRead( tc, config ) : session.executeRead( tc ); + } + else + { + if ( executeVariation.hasResult ) + { + actual = executeVariation.explicitTxConfig ? session.executeWrite( tc, config ) : session.executeWrite( tc ); + } + else + { + if ( executeVariation.explicitTxConfig ) + { + session.executeWriteWithoutResult( consumer, config ); + } + else + { + session.executeWriteWithoutResult( consumer ); + } + } + } + + // THEN + if ( executeVariation.hasResult ) + { + assertEquals( expected, actual ); + } + then( networkSession ).should().retryLogic(); + then( logic ).should().retry( any() ); + } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false, false ), + new ExecuteVariation( false, false, true ), + new ExecuteVariation( false, true, false ), + new ExecuteVariation( false, true, true ), + new ExecuteVariation( true, false, true ), + new ExecuteVariation( true, true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + private final boolean hasResult; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig, boolean hasResult ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + this.hasResult = hasResult; + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java new file mode 100644 index 0000000000..4c1609526e --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContextTest.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.ResultCursor; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingAsyncTransactionContextTest +{ + AsyncTransaction transaction; + DelegatingAsyncTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( AsyncTransaction.class ); + context = new DelegatingAsyncTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query, params ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + CompletionStage expected = CompletableFuture.completedFuture( null ); + given( transaction.runAsync( query ) ).willReturn( expected ); + + // WHEN + CompletionStage actual = context.runAsync( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().runAsync( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java index 627ea6750a..36e22112af 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java @@ -24,7 +24,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -35,6 +39,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ServiceUnavailableException; @@ -59,6 +64,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -132,7 +139,7 @@ private static Stream>> allRunTxM @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldFlushOnRun( Function> runReturnOne ) throws Throwable + void shouldFlushOnRun( Function> runReturnOne ) { setupSuccessfulRunAndPull( connection ); @@ -143,7 +150,7 @@ void shouldFlushOnRun( Function> runR @ParameterizedTest @MethodSource( "allBeginTxMethods" ) - void shouldDelegateBeginTx( Function> beginTx ) throws Throwable + void shouldDelegateBeginTx( Function> beginTx ) { AsyncTransaction tx = await( beginTx.apply( asyncSession ) ); @@ -153,7 +160,7 @@ void shouldDelegateBeginTx( Function> runTx ) throws Throwable + void txRunShouldBeginAndCommitTx( Function> runTx ) { String string = await( runTx.apply( asyncSession ) ); @@ -224,21 +231,48 @@ void writeTxRetriedUntilFailureWhenTxCloseThrows() testTxIsRetriedUntilFailureWhenCommitFails( WRITE ); } - @Test - void shouldCloseSession() throws Throwable + void shouldCloseSession() { - await ( asyncSession.closeAsync() ); + await( asyncSession.closeAsync() ); assertFalse( this.session.isOpen() ); } @Test - void shouldReturnBookmark() throws Throwable + void shouldReturnBookmark() { session = newSession( connectionProvider, InternalBookmark.parse( "Bookmark1" ) ); asyncSession = new InternalAsyncSession( session ); - assertThat( asyncSession.lastBookmark(), equalTo( session.lastBookmark() )); + assertThat( asyncSession.lastBookmark(), equalTo( session.lastBookmark() ) ); + } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) throws ExecutionException, InterruptedException + { + // GIVEN + NetworkSession networkSession = mock( NetworkSession.class ); + AsyncSession session = new InternalAsyncSession( networkSession ); + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( networkSession.retryLogic() ).willReturn( logic ); + AsyncTransactionCallback> tc = ( ignored ) -> CompletableFuture.completedFuture( expected ); + given( logic.retryAsync( any() ) ).willReturn( tc.execute( null ) ); + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + CompletionStage actual = executeVariation.readOnly ? + ( + executeVariation.explicitTxConfig ? session.executeReadAsync( tc, config ) : session.executeReadAsync( tc ) + ) : ( + executeVariation.explicitTxConfig ? session.executeWriteAsync( tc, config ) : session.executeWriteAsync( tc ) + ); + + // THEN + assertEquals( expected, actual.toCompletableFuture().get() ); + then( networkSession ).should().retryLogic(); + then( logic ).should().retryAsync( any() ); } private void testTxRollbackWhenThrows( AccessMode transactionMode ) @@ -391,4 +425,26 @@ public CompletionStage execute( AsyncTransaction tx ) return completedFuture( result ); } } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false ), + new ExecuteVariation( false, true ), + new ExecuteVariation( true, false ), + new ExecuteVariation( true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java new file mode 100644 index 0000000000..267d2388e6 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContextTest.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +public class DelegatingRxTransactionContextTest +{ + RxTransaction transaction; + DelegatingRxTransactionContext context; + + @BeforeEach + void beforeEach() + { + transaction = mock( RxTransaction.class ); + context = new DelegatingRxTransactionContext( transaction ); + } + + @Test + void shouldDelegateRunWithValueParams() + { + // GIVEN + String query = "something"; + Value params = mock( Value.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithMapParams() + { + // GIVEN + String query = "something"; + Map params = Collections.emptyMap(); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRunWithRecordParams() + { + // GIVEN + String query = "something"; + Record params = mock( Record.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query, params ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query, params ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query, params ); + } + + @Test + void shouldDelegateRun() + { + // GIVEN + String query = "something"; + RxResult expected = mock( RxResult.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } + + @Test + void shouldDelegateRunWithQueryType() + { + // GIVEN + Query query = mock( Query.class ); + RxResult expected = mock( RxResult.class ); + given( transaction.run( query ) ).willReturn( expected ); + + // WHEN + RxResult actual = context.run( query ); + + // THEN + assertEquals( expected, actual ); + then( transaction ).should().run( query ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index c0ef7e7483..682ec8c6de 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -26,6 +26,8 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -42,20 +44,25 @@ import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursorImpl; +import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.util.FixedRetryLogic; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -309,4 +316,54 @@ void shouldDelegateClose() verify( session ).closeAsync(); verifyNoMoreInteractions( session ); } + + @ParameterizedTest + @MethodSource( "executeVariations" ) + void shouldDelegateExecuteReadToRetryLogic( ExecuteVariation executeVariation ) + { + // GIVEN + NetworkSession networkSession = mock( NetworkSession.class ); + RxSession session = new InternalRxSession( networkSession ); + RetryLogic logic = mock( RetryLogic.class ); + String expected = ""; + given( networkSession.retryLogic() ).willReturn( logic ); + RxTransactionCallback> tc = ( ignored ) -> Mono.justOrEmpty( expected ); + given( logic.retryRx( any() ) ).willReturn( tc.execute( null ) ); + TransactionConfig config = TransactionConfig.builder().build(); + + // WHEN + Publisher actual = executeVariation.readOnly ? + ( + executeVariation.explicitTxConfig ? session.executeRead( tc, config ) : session.executeRead( tc ) + ) : ( + executeVariation.explicitTxConfig ? session.executeWrite( tc, config ) : session.executeWrite( tc ) + ); + + // THEN + assertEquals( expected, Mono.from( actual ).block() ); + then( networkSession ).should().retryLogic(); + then( logic ).should().retryRx( any() ); + } + + static List executeVariations() + { + return Arrays.asList( + new ExecuteVariation( false, false ), + new ExecuteVariation( false, true ), + new ExecuteVariation( true, false ), + new ExecuteVariation( true, true ) + ); + } + + private static class ExecuteVariation + { + private final boolean readOnly; + private final boolean explicitTxConfig; + + private ExecuteVariation( boolean readOnly, boolean explicitTxConfig ) + { + this.readOnly = readOnly; + this.explicitTxConfig = explicitTxConfig; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java index e1c85057ea..f0d94b968d 100644 --- a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java +++ b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java @@ -30,6 +30,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.Value; @@ -94,6 +95,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return realSession.readTransaction( work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeRead( callback, config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -106,6 +113,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return realSession.writeTransaction( work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeWrite( callback, config ); + } + @Override public Bookmark lastBookmark() { @@ -113,9 +126,9 @@ public Bookmark lastBookmark() } @Override - public Result run(String query, Map parameters) + public Result run( String query, Map parameters ) { - return realSession.run(query, parameters); + return realSession.run( query, parameters ); } @Override