diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 62217f4f1c..30125127ca 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -30,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -45,6 +46,7 @@ import org.neo4j.driver.AuthToken; import org.neo4j.driver.AuthTokens; import org.neo4j.driver.Config; +import org.neo4j.driver.Logging; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.DomainNameResolver; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 7b8c1dcf34..24cde32b59 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -20,13 +20,11 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; -import java.time.Duration; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.CustomDriverError; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; @@ -34,6 +32,7 @@ import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; +import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import neo4j.org.testkit.backend.messages.responses.Transaction; import org.neo4j.driver.Session; @@ -45,33 +44,15 @@ @Setter @Getter -public class SessionBeginTransaction implements TestkitRequest { +public class SessionBeginTransaction extends WithTxConfig { private SessionBeginTransactionBody data; - private void configureTimeout(TransactionConfig.Builder builder) { - if (data.getTimeoutPresent()) { - try { - if (data.getTimeout() != null) { - builder.withTimeout(Duration.ofMillis(data.getTimeout())); - } else { - builder.withDefaultTimeout(); - } - } catch (IllegalArgumentException e) { - throw new CustomDriverError(e); - } - } - } - @Override public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - configureTimeout(builder); - - org.neo4j.driver.Transaction transaction = session.beginTransaction(builder.build()); + org.neo4j.driver.Transaction transaction = session.beginTransaction(getTxConfig()); return transaction(testkitState.addTransactionHolder(new TransactionHolder(sessionHolder, transaction))); } @@ -80,11 +61,8 @@ public CompletionStage processAsync(TestkitState testkitState) return testkitState.getAsyncSessionHolder(data.getSessionId()).thenCompose(sessionHolder -> { AsyncSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - - configureTimeout(builder); - return session.beginTransactionAsync(builder.build()) + return session.beginTransactionAsync(getTxConfig()) .thenApply(tx -> transaction( testkitState.addAsyncTransactionHolder(new AsyncTransactionHolder(sessionHolder, tx)))); }); @@ -96,11 +74,8 @@ public Mono processRx(TestkitState testkitState) { return testkitState.getRxSessionHolder(data.getSessionId()).flatMap(sessionHolder -> { RxSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - configureTimeout(builder); - - return Mono.fromDirect(session.beginTransaction(builder.build())) + return Mono.fromDirect(session.beginTransaction(getTxConfig())) .map(tx -> transaction( testkitState.addRxTransactionHolder(new RxTransactionHolder(sessionHolder, tx)))); }); @@ -111,11 +86,8 @@ public Mono processReactive(TestkitState testkitState) { return testkitState.getReactiveSessionHolder(data.getSessionId()).flatMap(sessionHolder -> { ReactiveSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - - configureTimeout(builder); - return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(builder.build()))) + return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(getTxConfig()))) .map(tx -> transaction(testkitState.addReactiveTransactionHolder( new ReactiveTransactionHolder(sessionHolder, tx)))); }); @@ -126,11 +98,8 @@ public Mono processReactiveStreams(TestkitState testkitState) { return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> { var session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - - configureTimeout(builder); - return Mono.fromDirect(session.beginTransaction(builder.build())) + return Mono.fromDirect(session.beginTransaction(getTxConfig())) .map(tx -> transaction(testkitState.addReactiveTransactionStreamsHolder( new ReactiveTransactionStreamsHolder(sessionHolder, tx)))); }); @@ -144,9 +113,12 @@ private Transaction transaction(String txId) { @Getter @Setter - public static class SessionBeginTransactionBody { + public static class SessionBeginTransactionBody implements WithTxConfig.ITxConfigBody { private String sessionId; + + @JsonDeserialize(using = TestkitCypherParamDeserializer.class) private Map txMeta; + private Integer timeout; private Boolean timeoutPresent = false; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index 2d13c08e81..ff106c5662 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -21,6 +21,8 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -35,6 +37,7 @@ import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; +import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -49,7 +52,7 @@ @Setter @Getter -public class SessionReadTransaction implements TestkitRequest { +public class SessionReadTransaction extends WithTxConfig { private SessionReadTransactionBody data; @Override @@ -57,7 +60,7 @@ public class SessionReadTransaction implements TestkitRequest { public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - session.readTransaction(handle(testkitState, sessionHolder)); + session.readTransaction(handle(testkitState, sessionHolder), getTxConfig()); return retryableDone(); } @@ -78,7 +81,7 @@ public CompletionStage processAsync(TestkitState testkitState) return txWorkFuture; }; - return session.readTransactionAsync(workWrapper); + return session.readTransactionAsync(workWrapper, getTxConfig()); }) .thenApply(nothing -> retryableDone()); } @@ -97,7 +100,7 @@ public Mono processRx(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper, getTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -117,7 +120,7 @@ public Mono processReactive(TestkitState testkitState) { }; return Mono.fromDirect( - flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper))); + flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper, getTxConfig()))); }) .then(Mono.just(retryableDone())); } @@ -137,7 +140,7 @@ public Mono processReactiveStreams(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper, getTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -177,7 +180,18 @@ private RetryableDone retryableDone() { @Setter @Getter - public static class SessionReadTransactionBody { + public static class SessionReadTransactionBody implements WithTxConfig.ITxConfigBody { private String sessionId; + + @JsonDeserialize(using = TestkitCypherParamDeserializer.class) + private Map txMeta; + + private Integer timeout; + private Boolean timeoutPresent = false; + + public void setTimeout(Integer timeout) { + this.timeout = timeout; + timeoutPresent = true; + } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 2e0eb237c4..3d4eb8385b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -21,14 +21,12 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.CustomDriverError; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; @@ -41,7 +39,6 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import org.neo4j.driver.Query; import org.neo4j.driver.Session; -import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.RxResult; @@ -50,23 +47,9 @@ @Setter @Getter -public class SessionRun implements TestkitRequest { +public class SessionRun extends WithTxConfig { private SessionRunBody data; - private void configureTimeout(TransactionConfig.Builder builder) { - if (data.getTimeoutPresent()) { - try { - if (data.getTimeout() != null) { - builder.withTimeout(Duration.ofMillis(data.getTimeout())); - } else { - builder.withDefaultTimeout(); - } - } catch (IllegalArgumentException e) { - throw new CustomDriverError(e); - } - } - } - @Override public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); @@ -74,10 +57,7 @@ public TestkitResponse process(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - org.neo4j.driver.Result result = session.run(query, transactionConfig.build()); + org.neo4j.driver.Result result = session.run(query, getTxConfig()); String id = testkitState.addResultHolder(new ResultHolder(sessionHolder, result)); return createResponse(id, result.keys()); @@ -90,11 +70,8 @@ public CompletionStage processAsync(TestkitState testkitState) Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return session.runAsync(query, transactionConfig.build()).thenApply(resultCursor -> { + return session.runAsync(query, getTxConfig()).thenApply(resultCursor -> { String id = testkitState.addAsyncResultHolder(new ResultCursorHolder(sessionHolder, resultCursor)); return createResponse(id, resultCursor.keys()); }); @@ -109,11 +86,8 @@ public Mono processRx(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - RxResult result = session.run(query, transactionConfig.build()); + RxResult result = session.run(query, getTxConfig()); String id = testkitState.addRxResultHolder(new RxResultHolder(sessionHolder, result)); // The keys() method causes RUN message exchange. @@ -129,11 +103,8 @@ public Mono processReactive(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return Mono.fromDirect(flowPublisherToFlux(session.run(query, transactionConfig.build()))) + return Mono.fromDirect(flowPublisherToFlux(session.run(query, getTxConfig()))) .map(result -> { String id = testkitState.addReactiveResultHolder(new ReactiveResultHolder(sessionHolder, result)); @@ -149,16 +120,12 @@ public Mono processReactiveStreams(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return Mono.fromDirect(session.run(query, transactionConfig.build())) - .map(result -> { - String id = testkitState.addReactiveResultStreamsHolder( - new ReactiveResultStreamsHolder(sessionHolder, result)); - return createResponse(id, result.keys()); - }); + return Mono.fromDirect(session.run(query, getTxConfig())).map(result -> { + String id = testkitState.addReactiveResultStreamsHolder( + new ReactiveResultStreamsHolder(sessionHolder, result)); + return createResponse(id, result.keys()); + }); }); } @@ -170,13 +137,16 @@ private Result createResponse(String resultId, List keys) { @Setter @Getter - public static class SessionRunBody { + public static class SessionRunBody implements ITxConfigBody { @JsonDeserialize(using = TestkitCypherParamDeserializer.class) private Map params; private String sessionId; private String cypher; + + @JsonDeserialize(using = TestkitCypherParamDeserializer.class) private Map txMeta; + private Integer timeout; private Boolean timeoutPresent = false; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 88abf48685..85bfc30c7c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -21,6 +21,7 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -36,6 +37,7 @@ import neo4j.org.testkit.backend.holder.RxTransactionHolder; import neo4j.org.testkit.backend.holder.SessionHolder; import neo4j.org.testkit.backend.holder.TransactionHolder; +import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; import neo4j.org.testkit.backend.messages.responses.RetryableDone; import neo4j.org.testkit.backend.messages.responses.RetryableTry; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -50,7 +52,7 @@ @Setter @Getter -public class SessionWriteTransaction implements TestkitRequest { +public class SessionWriteTransaction extends WithTxConfig { private SessionWriteTransactionBody data; @Override @@ -58,7 +60,7 @@ public class SessionWriteTransaction implements TestkitRequest { public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - session.writeTransaction(handle(testkitState, sessionHolder)); + session.writeTransaction(handle(testkitState, sessionHolder), getTxConfig()); return retryableDone(); } @@ -79,7 +81,7 @@ public CompletionStage processAsync(TestkitState testkitState) return tryResult; }; - return session.writeTransactionAsync(workWrapper); + return session.writeTransactionAsync(workWrapper, getTxConfig()); }) .thenApply(nothing -> retryableDone()); } @@ -98,7 +100,7 @@ public Mono processRx(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().writeTransaction(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().writeTransaction(workWrapper, getTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -118,7 +120,7 @@ public Mono processReactive(TestkitState testkitState) { }; return Mono.fromDirect( - flowPublisherToFlux(sessionHolder.getSession().executeWrite(workWrapper))); + flowPublisherToFlux(sessionHolder.getSession().executeWrite(workWrapper, getTxConfig()))); }) .then(Mono.just(retryableDone())); } @@ -138,7 +140,7 @@ public Mono processReactiveStreams(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper, getTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -178,9 +180,18 @@ private RetryableDone retryableDone() { @Setter @Getter - public static class SessionWriteTransactionBody { + public static class SessionWriteTransactionBody implements ITxConfigBody { private String sessionId; - private Map txMeta; - private String timeout; + + @JsonDeserialize(using = TestkitCypherParamDeserializer.class) + private Map txMeta; + + private Integer timeout; + private Boolean timeoutPresent = false; + + public void setTimeout(Integer timeout) { + this.timeout = timeout; + timeoutPresent = true; + } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/WithTxConfig.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/WithTxConfig.java new file mode 100644 index 0000000000..8a3ee66645 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/WithTxConfig.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import neo4j.org.testkit.backend.CustomDriverError; +import org.neo4j.driver.TransactionConfig; + +public abstract class WithTxConfig implements TestkitRequest { + public abstract ITxConfigBody getData(); + + protected TransactionConfig.Builder configureTxTimeout(TransactionConfig.Builder builder) { + if (!getData().getTimeoutPresent()) return builder; + try { + Optional.ofNullable(getData().getTimeout()) + .ifPresentOrElse( + (timeout) -> builder.withTimeout(Duration.ofMillis(timeout)), builder::withDefaultTimeout); + } catch (IllegalArgumentException e) { + throw new CustomDriverError(e); + } + return builder; + } + + protected TransactionConfig.Builder configureTxMetadata(TransactionConfig.Builder builder) { + Optional.ofNullable(getData().getTxMeta()).ifPresent(builder::withMetadata); + return builder; + } + + protected TransactionConfig.Builder configureTx(TransactionConfig.Builder builder) { + return configureTxMetadata(configureTxTimeout(builder)); + } + + protected TransactionConfig getTxConfig() { + return configureTx(TransactionConfig.builder()).build(); + } + + public interface ITxConfigBody { + Boolean getTimeoutPresent(); + + Integer getTimeout(); + + Map getTxMeta(); + } +}