From b1bd259503d2ef203f58717d56737b95d05e73df Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Thu, 24 Mar 2022 12:05:15 +0000 Subject: [PATCH] Introduce new managed transaction methods and deprecate existing methods This update introduces new methods for managed transactions, also known as transaction functions. Furthermore, it deprecates the existing methods. The new methods are: - `executeRead` - `executeReadWithoutResult` - `executeWrite` - `executeWriteWithoutResult` There are overloaded options that take transaction configuration. The new methods are available in all supported API styles (synchronous, asynchronous and reactive). The new methods do not permit explicit management (commit, rollback, close) of managed transaction. The driver will attempt committing the transaction as long as the provided unit of work completes successfully. --- driver/clirr-ignored-differences.xml | 146 +++++++++++++ .../java/org/neo4j/driver/QueryRunner.java | 140 +------------ .../main/java/org/neo4j/driver/Session.java | 192 ++++++++++++++--- .../org/neo4j/driver/SimpleQueryRunner.java | 148 +++++++++++++ .../org/neo4j/driver/TransactionCallback.java | 24 +++ .../org/neo4j/driver/TransactionContext.java | 23 ++ .../org/neo4j/driver/TransactionWork.java | 1 + .../org/neo4j/driver/async/AsyncSession.java | 182 +++++++++++++++- .../async/AsyncTransactionCallback.java | 24 +++ .../driver/async/AsyncTransactionContext.java | 23 ++ .../driver/async/AsyncTransactionWork.java | 1 + .../DelegatingTransactionContext.java | 68 ++++++ .../driver/internal/InternalSession.java | 13 ++ .../DelegatingAsyncTransactionContext.java | 69 ++++++ .../internal/async/InternalAsyncSession.java | 13 ++ .../DelegatingRxTransactionContext.java | 68 ++++++ .../internal/reactive/InternalRxSession.java | 15 +- .../org/neo4j/driver/reactive/RxSession.java | 197 +++++++++++++++++- .../reactive/RxTransactionCallback.java | 24 +++ .../driver/reactive/RxTransactionContext.java | 23 ++ .../driver/reactive/RxTransactionWork.java | 1 + .../neo4j/driver/util/SessionExtension.java | 17 +- 22 files changed, 1235 insertions(+), 177 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/TransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java create mode 100644 driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java diff --git a/driver/clirr-ignored-differences.xml b/driver/clirr-ignored-differences.xml index ff46e0640a..66893cb60c 100644 --- a/driver/clirr-ignored-differences.xml +++ b/driver/clirr-ignored-differences.xml @@ -122,4 +122,150 @@ org.reactivestreams.Publisher isOpen() + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeRead(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback) + + + + org/neo4j/driver/Session + 7012 + java.lang.Object executeWrite(org.neo4j.driver.TransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/Session + 7012 + void executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeReadWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer) + + + + org/neo4j/driver/async/AsyncSession + 7012 + java.util.concurrent.CompletionStage executeWriteWithoutResultAsync(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeRead(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeReadWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWrite(org.neo4j.driver.reactive.RxTransactionCallback, org.neo4j.driver.TransactionConfig) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer) + + + + org/neo4j/driver/reactive/RxSession + 7012 + org.reactivestreams.Publisher executeWriteWithoutResult(java.util.function.Consumer, org.neo4j.driver.TransactionConfig) + + diff --git a/driver/src/main/java/org/neo4j/driver/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/QueryRunner.java index a27f767107..7492cc1af9 100644 --- a/driver/src/main/java/org/neo4j/driver/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/QueryRunner.java @@ -18,147 +18,11 @@ */ package org.neo4j.driver; -import java.util.Map; - /** - * Common interface for components that can execute Neo4j queries. - * - *

Important notes on semantics

- *

- * queries run in the same {@link QueryRunner} are guaranteed - * to execute in order, meaning changes made by one query will be seen - * by all subsequent queries in the same {@link QueryRunner}. - *

- * However, to allow handling very large results, and to improve performance, - * result streams are retrieved lazily from the network. - * This means that when any of {@link #run(Query)} - * methods return a result, the query has only started executing - it may not - * have completed yet. Most of the time, you will not notice this, because the - * driver automatically waits for queries to complete at specific points to - * fulfill its contracts. - *

- * Specifically, the driver will ensure all outstanding queries are completed - * whenever you: + * An {@link AutoCloseable} extension of the {@link SimpleQueryRunner}. * - *

- *

- * As noted, most of the time, you will not need to consider this - your writes will - * always be durably stored as long as you either use the results, explicitly commit - * {@link Transaction transactions} or close the session you used using {@link Session#close()}. - *

- * While these semantics introduce some complexity, it gives the driver the ability - * to handle infinite result streams (like subscribing to events), significantly lowers - * the memory overhead for your application and improves performance. - * - * @see Session - * @see Transaction * @since 1.0 */ -public interface QueryRunner extends AutoCloseable +public interface QueryRunner extends SimpleQueryRunner, AutoCloseable { - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This particular method takes a {@link Value} as its input. This is useful - * if you want to take a map-like value that you've gotten from a prior result - * and send it back as parameters. - *

- * If you are creating parameters programmatically, {@link #run(String, Map)} - * might be more helpful, it converts your map to a {@link Value} for you. - * - *

Example

- *
-     * {@code
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       Values.parameters( "myNameParam", "Bob" ) );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. - * @return a stream of result values and associated metadata - */ - Result run(String query, Value parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Map} of parameters. The values in the map - * must be values that can be converted to Neo4j types. See {@link Values#parameters(Object...)} for - * a list of allowed types. - * - *

Example

- *
-     * {@code
-     *
-     * Map parameters = new HashMap();
-     * parameters.put("myNameParam", "Bob");
-     *
-     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
-     *                                       parameters );
-     * }
-     * 
- * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Map parameters ); - - /** - * Run a query and return a result stream. - *

- * This method takes a set of parameters that will be injected into the - * query by Neo4j. Using parameters is highly encouraged, it helps avoid - * dangerous cypher injection attacks and improves database performance as - * Neo4j can re-use query plans more often. - *

- * This version of run takes a {@link Record} of parameters, which can be useful - * if you want to use the output of one query as input for another. - * - * @param query text of a Neo4j query - * @param parameters input data for the query - * @return a stream of result values and associated metadata - */ - Result run(String query, Record parameters ); - - /** - * Run a query and return a result stream. - * - * @param query text of a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(String query ); - - /** - * Run a query and return a result stream. - *

Example

- *
-     * {@code
-     *
-     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
-     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
-     * }
-     * 
- * - * @param query a Neo4j query - * @return a stream of result values and associated metadata - */ - Result run(Query query); } diff --git a/driver/src/main/java/org/neo4j/driver/Session.java b/driver/src/main/java/org/neo4j/driver/Session.java index d4823b4969..225835b69e 100644 --- a/driver/src/main/java/org/neo4j/driver/Session.java +++ b/driver/src/main/java/org/neo4j/driver/Session.java @@ -19,6 +19,7 @@ package org.neo4j.driver; import java.util.Map; +import java.util.function.Consumer; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.util.Resource; @@ -87,67 +88,208 @@ public interface Session extends Resource, QueryRunner * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback)}. */ + @Deprecated T readTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#READ read} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new read transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeRead( TransactionCallback callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeReadWithoutResult( Consumer contextConsumer ) + { + executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#READ read} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new read transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeRead(TransactionCallback, TransactionConfig)}. */ + @Deprecated T readTransaction( TransactionWork work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeRead( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeReadWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). * * @param work the {@link TransactionWork} to be applied to a new write transaction. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback)}. */ + @Deprecated T writeTransaction( TransactionWork work ); /** - * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction - * with the specified {@link TransactionConfig configuration}. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. *

- * This transaction will automatically be committed unless an exception is - * thrown during query execution or by the user code. + * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. *

- * Managed transactions should not generally be explicitly committed (via - * {@link Transaction#commit()}). + * The provided unit of work should not return {@link Result} object. * - * @param work the {@link TransactionWork} to be applied to a new write transaction. + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + default T executeWrite( TransactionCallback callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + */ + default void executeWriteWithoutResult( Consumer contextConsumer ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + + /** + * Execute a unit of work in a managed {@link AccessMode#WRITE write} transaction with the specified {@link TransactionConfig configuration}. + *

+ * This transaction will automatically be committed unless an exception is thrown during query execution or by the user code. + *

+ * Managed transactions should not generally be explicitly committed (via {@link Transaction#commit()}). + * + * @param work the {@link TransactionWork} to be applied to a new write transaction. * @param config configuration for all transactions started to execute the unit of work. - * @param the return type of the given unit of work. + * @param the return type of the given unit of work. * @return a result as returned by the given unit of work. + * @deprecated superseded by {@link #executeWrite(TransactionCallback, TransactionConfig)}. */ + @Deprecated T writeTransaction( TransactionWork work, TransactionConfig config ); /** - * Run a query in a managed auto-commit transaction with the specified - * {@link TransactionConfig configuration}, and return a result stream. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config the transaction configuration for the managed transaction. + * @param the return type of the given unit of work. + * @return a result as returned by the given unit of work. + */ + T executeWrite( TransactionCallback callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config the transaction configuration for the managed transaction. + */ + default void executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query in a managed auto-commit transaction with the specified {@link TransactionConfig configuration}, and return a result stream. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a stream of result values and associated metadata. */ - Result run(String query, TransactionConfig config ); + Result run( String query, TransactionConfig config ); /** * Run a query with parameters in a managed auto-commit transaction with the diff --git a/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java new file mode 100644 index 0000000000..8e3101e766 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/SimpleQueryRunner.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Map; + +/** + * Common interface for components that can execute Neo4j queries. + * + *

Important notes on semantics

+ *

+ * queries run in the same {@link QueryRunner} are guaranteed to execute in order, meaning changes made by one query will be seen by all subsequent queries in + * the same {@link QueryRunner}. + *

+ * However, to allow handling very large results, and to improve performance, result streams are retrieved lazily from the network. This means that when any of + * {@link #run(Query)} methods return a result, the query has only started executing - it may not have completed yet. Most of the time, you will not notice + * this, because the driver automatically waits for queries to complete at specific points to fulfill its contracts. + *

+ * Specifically, the driver will ensure all outstanding queries are completed whenever you: + * + *

    + *
  • Read from or discard a result, for instance via + * {@link Result#next()} or {@link Result#consume()}
  • + *
  • Explicitly commit/rollback a transaction using blocking {@link Transaction#close()}
  • + *
  • Close a session using blocking {@link Session#close()}
  • + *
+ *

+ * As noted, most of the time, you will not need to consider this - your writes will + * always be durably stored as long as you either use the results, explicitly commit + * {@link Transaction transactions} or close the session you utilised using {@link Session#close()}. + *

+ * While these semantics introduce some complexity, it gives the driver the ability + * to handle infinite result streams (like subscribing to events), significantly lowers + * the memory overhead for your application and improves performance. + * + * @see Session + * @see Transaction + * @since 5.0 + */ +public interface SimpleQueryRunner +{ + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This particular method takes a {@link Value} as its input. This is useful if you want to take a map-like value that you've gotten from a prior result and + * send it back as parameters. + *

+ * If you are creating parameters programmatically, {@link #run(String, Map)} might be more helpful, it converts your map to a {@link Value} for you. + * + *

Example

+ *
+     * {@code
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       Values.parameters( "myNameParam", "Bob" ) );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input parameters, should be a map Value, see {@link Values#parameters(Object...)}. + * @return a stream of result values and associated metadata + */ + Result run( String query, Value parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Map} of parameters. The values in the map must be values that can be converted to Neo4j types. See {@link + * Values#parameters(Object...)} for a list of allowed types. + * + *

Example

+ *
+     * {@code
+     *
+     * Map parameters = new HashMap();
+     * parameters.put("myNameParam", "Bob");
+     *
+     * Result result = session.run( "MATCH (n) WHERE n.name = $myNameParam RETURN (n)",
+     *                                       parameters );
+     * }
+     * 
+ * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Map parameters ); + + /** + * Run a query and return a result stream. + *

+ * This method takes a set of parameters that will be injected into the query by Neo4j. Using parameters is highly encouraged, it helps avoid dangerous + * cypher injection attacks and improves database performance as Neo4j can re-use query plans more often. + *

+ * This version of run takes a {@link Record} of parameters, which can be useful if you want to use the output of one query as input for another. + * + * @param query text of a Neo4j query + * @param parameters input data for the query + * @return a stream of result values and associated metadata + */ + Result run( String query, Record parameters ); + + /** + * Run a query and return a result stream. + * + * @param query text of a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( String query ); + + /** + * Run a query and return a result stream. + *

Example

+ *
+     * {@code
+     *
+     * Query query = new Query( "MATCH (n) WHERE n.name = $myNameParam RETURN n.age" );
+     * Result result = session.run( query.withParameters( Values.parameters( "myNameParam", "Bob" )  ) );
+     * }
+     * 
+ * + * @param query a Neo4j query + * @return a stream of result values and associated metadata + */ + Result run( Query query ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionCallback.java b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java new file mode 100644 index 0000000000..1378957831 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +public interface TransactionCallback +{ + T execute( TransactionContext context ); +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionContext.java b/driver/src/main/java/org/neo4j/driver/TransactionContext.java new file mode 100644 index 0000000000..42d7e06746 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/TransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +public interface TransactionContext extends SimpleQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/TransactionWork.java b/driver/src/main/java/org/neo4j/driver/TransactionWork.java index f78b0a7cc7..eba2913e24 100644 --- a/driver/src/main/java/org/neo4j/driver/TransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/TransactionWork.java @@ -25,6 +25,7 @@ * * @param the return type of this work. */ +@Deprecated public interface TransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java index a925612066..e391f54e2c 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncSession.java @@ -22,14 +22,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; +import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; -import org.neo4j.driver.Bookmark; /** * Provides a context of work for database interactions. @@ -133,9 +135,54 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeReadAsync( AsyncTransactionCallback> callback ) + { + return executeReadAsync( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeReadWithoutResultAsync( Consumer contextConsumer ) + { + return executeReadAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#READ read} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -158,9 +205,53 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeReadAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage readTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeReadWithoutResultAsync( Consumer contextConsumer, TransactionConfig config ) + { + return executeReadAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction. *

@@ -181,9 +272,54 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + default CompletionStage executeWriteAsync( AsyncTransactionCallback> callback ) + { + return executeWriteAsync( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeWriteWithoutResultAsync( Consumer contextConsumer ) + { + return executeWriteAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of asynchronous work in a {@link AccessMode#WRITE write} asynchronous transaction with * the specified {@link TransactionConfig configuration}. @@ -206,9 +342,53 @@ public interface AsyncSession extends AsyncQueryRunner * @param the return type of the given unit of work. * @return a {@link CompletionStage completion stage} completed with the same result as returned by the given * unit of work. Stage can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeWriteAsync(AsyncTransactionCallback, TransactionConfig)}. */ + @Deprecated CompletionStage writeTransactionAsync( AsyncTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a completion stage that completes successfully with the result of the unit of work on success or completes exceptionally otherwise. + */ + CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the consumer representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a completion stage that completes successfully on success or exceptionally otherwise. + */ + default CompletionStage executeWriteWithoutResultAsync( Consumer contextConsumer, TransactionConfig config ) + { + return executeWriteAsync( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Run a query asynchronously in an auto-commit transaction with the specified {@link TransactionConfig configuration} and return a * {@link CompletionStage} with a result cursor. diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java new file mode 100644 index 0000000000..b93f214ff2 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +public interface AsyncTransactionCallback +{ + T execute( AsyncTransactionContext tx ); +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java new file mode 100644 index 0000000000..f9acb7d4c6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.async; + +public interface AsyncTransactionContext extends AsyncQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java index fc9a29cb91..4197922b91 100644 --- a/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/async/AsyncTransactionWork.java @@ -26,6 +26,7 @@ * @param the return type of this work. * @since 4.0 */ +@Deprecated public interface AsyncTransactionWork { /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java new file mode 100644 index 0000000000..874d6e36f1 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DelegatingTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionContext; +import org.neo4j.driver.Value; + +final class DelegatingTransactionContext implements TransactionContext +{ + private final Transaction delegate; + + public DelegatingTransactionContext( Transaction delegate ) + { + this.delegate = delegate; + } + + @Override + public Result run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public Result run( String query ) + { + return delegate.run( query ); + } + + @Override + public Result run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index e11133ddee..33008f71c4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -26,6 +26,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.async.ResultCursor; @@ -112,6 +113,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.READ, work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -124,6 +131,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return transaction( AccessMode.WRITE, work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java new file mode 100644 index 0000000000..f422df85f6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/DelegatingAsyncTransactionContext.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async; + +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionContext; +import org.neo4j.driver.async.ResultCursor; + +final class DelegatingAsyncTransactionContext implements AsyncTransactionContext +{ + private final AsyncTransaction delegate; + + public DelegatingAsyncTransactionContext( AsyncTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public CompletionStage runAsync( String query, Value parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Map parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query, Record parameters ) + { + return delegate.runAsync( query, parameters ); + } + + @Override + public CompletionStage runAsync( String query ) + { + return delegate.runAsync( query ); + } + + @Override + public CompletionStage runAsync( Query query ) + { + return delegate.runAsync( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index efc291933b..23c12fded6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -28,6 +28,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.AsyncTransactionCallback; import org.neo4j.driver.async.AsyncTransactionWork; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.internal.util.Futures; @@ -99,6 +100,12 @@ public CompletionStage readTransactionAsync( AsyncTransactionWork CompletionStage executeReadAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return readTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public CompletionStage writeTransactionAsync( AsyncTransactionWork> work ) { @@ -111,6 +118,12 @@ public CompletionStage writeTransactionAsync( AsyncTransactionWork CompletionStage executeWriteAsync( AsyncTransactionCallback> callback, TransactionConfig config ) + { + return writeTransactionAsync( tx -> callback.execute( new DelegatingAsyncTransactionContext( tx ) ), config ); + } + @Override public Bookmark lastBookmark() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java new file mode 100644 index 0000000000..d78116230b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingRxTransactionContext.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.reactive; + +import java.util.Map; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionContext; + +final class DelegatingRxTransactionContext implements RxTransactionContext +{ + private final RxTransaction delegate; + + public DelegatingRxTransactionContext( RxTransaction delegate ) + { + this.delegate = delegate; + } + + @Override + public RxResult run( String query, Value parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Map parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query, Record parameters ) + { + return delegate.run( query, parameters ); + } + + @Override + public RxResult run( String query ) + { + return delegate.run( query ); + } + + @Override + public RxResult run( Query query ) + { + return delegate.run( query ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 1907ace63f..0403a736b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -35,6 +35,7 @@ import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.reactive.RxTransactionCallback; import org.neo4j.driver.reactive.RxTransactionWork; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; @@ -115,6 +116,12 @@ public Publisher readTransaction( RxTransactionWork Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ) + { + return readTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + @Override public Publisher writeTransaction( RxTransactionWork> work ) { @@ -127,6 +134,12 @@ public Publisher writeTransaction( RxTransactionWork Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ) + { + return writeTransaction( tx -> callback.execute( new DelegatingRxTransactionContext( tx ) ), config ); + } + private Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, @@ -135,7 +148,7 @@ private Publisher runTransaction( AccessMode mode, RxTransactionWork the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. + * @deprecated superseded by {@link #executeRead(RxTransactionCallback)}. * */ + @Deprecated Publisher readTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeRead( RxTransactionCallback> callback ) + { + return executeRead( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeReadWithoutResult( Consumer contextConsumer ) + { + return executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#READ read} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -104,10 +152,53 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeRead(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher readTransaction( RxTransactionWork> work, TransactionConfig config ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeRead( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#READ read} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeReadWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + return executeRead( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction.

@@ -125,10 +216,54 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work ); + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param callback the callback representing the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + default Publisher executeWrite( RxTransactionCallback> callback ) + { + return executeWrite( callback, TransactionConfig.empty() ); + } + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeWriteWithoutResult( Consumer contextConsumer ) + { + return executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + } ); + } + /** * Execute given unit of reactive work in a {@link AccessMode#WRITE write} reactive transaction with * the specified {@link TransactionConfig configuration}. @@ -148,20 +283,62 @@ public interface RxSession extends RxQueryRunner * @param the return type of the given unit of work. * @return a {@link Publisher publisher} completed with the same result as returned by the given unit of work. * publisher can be completed exceptionally if given work or commit fails. - * + * @deprecated superseded by {@link #executeWrite(RxTransactionCallback, TransactionConfig)}. */ + @Deprecated Publisher writeTransaction( RxTransactionWork> work, TransactionConfig config ); /** - * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. - * The query is not executed when the reactive result is returned. - * Instead, the publishers in the result will actually start the execution of the query. + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. * - * @param query text of a Neo4j query. + * @param callback the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @param the return type of the given unit of work. + * @return a publisher that emits the result of the unit of work and success signals on success or error otherwise. + */ + Publisher executeWrite( RxTransactionCallback> callback, TransactionConfig config ); + + /** + * Execute a unit of work as a single managed transaction with {@link AccessMode#WRITE write} access mode and retry behaviour. + *

+ * The driver will attempt committing the transaction when the provided unit of work completes successfully. A user initiated failure of the unit of work + * will result in rollback attempt. + *

+ * The provided unit of work should not return {@link Result} object. + *

+ * It is prohibited to block the thread completing the returned {@link CompletionStage}. Please avoid blocking operations or hand processing over to a + * different thread. + * + * @param contextConsumer the callback representing the unit of work. + * @param config configuration for all transactions started to execute the unit of work. + * @return a publisher that emits success signal on success or error otherwise. + */ + default Publisher executeWriteWithoutResult( Consumer contextConsumer, TransactionConfig config ) + { + return executeWrite( tc -> + { + contextConsumer.accept( tc ); + return null; + }, config ); + } + + /** + * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. The query is not + * executed when the reactive result is returned. Instead, the publishers in the result will actually start the execution of the query. + * + * @param query text of a Neo4j query. * @param config configuration for the new transaction. * @return a reactive result. */ - RxResult run(String query, TransactionConfig config ); + RxResult run( String query, TransactionConfig config ); /** * Run a query with parameters in an auto-commit transaction with specified {@link TransactionConfig} and return a reactive result stream. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java new file mode 100644 index 0000000000..e26ad842c7 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionCallback.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +public interface RxTransactionCallback +{ + T execute( RxTransactionContext tx ); +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java new file mode 100644 index 0000000000..6e5d762407 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionContext.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.reactive; + +public interface RxTransactionContext extends RxQueryRunner +{ +} diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index ef68a8118f..e7b4980853 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -26,6 +26,7 @@ * @param the return type of this work. * @since 4.0 */ +@Deprecated public interface RxTransactionWork { /** diff --git a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java index e1c85057ea..f0d94b968d 100644 --- a/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java +++ b/driver/src/test/java/org/neo4j/driver/util/SessionExtension.java @@ -30,6 +30,7 @@ import org.neo4j.driver.Result; import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; +import org.neo4j.driver.TransactionCallback; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.TransactionWork; import org.neo4j.driver.Value; @@ -94,6 +95,12 @@ public T readTransaction( TransactionWork work, TransactionConfig config return realSession.readTransaction( work, config ); } + @Override + public T executeRead( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeRead( callback, config ); + } + @Override public T writeTransaction( TransactionWork work ) { @@ -106,6 +113,12 @@ public T writeTransaction( TransactionWork work, TransactionConfig config return realSession.writeTransaction( work, config ); } + @Override + public T executeWrite( TransactionCallback callback, TransactionConfig config ) + { + return realSession.executeWrite( callback, config ); + } + @Override public Bookmark lastBookmark() { @@ -113,9 +126,9 @@ public Bookmark lastBookmark() } @Override - public Result run(String query, Map parameters) + public Result run( String query, Map parameters ) { - return realSession.run(query, parameters); + return realSession.run( query, parameters ); } @Override