Skip to content

Commit

Permalink
TestKit backend: except txMeta as Cypher types
Browse files Browse the repository at this point in the history
  • Loading branch information
robsdedude committed Dec 7, 2022
1 parent 7519388 commit 697da00
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -45,6 +46,7 @@
import org.neo4j.driver.AuthToken;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DefaultDomainNameResolver;
import org.neo4j.driver.internal.DomainNameResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,19 @@

import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;

import java.time.Duration;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.CustomDriverError;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.holder.AsyncTransactionHolder;
import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder;
import neo4j.org.testkit.backend.holder.ReactiveTransactionStreamsHolder;
import neo4j.org.testkit.backend.holder.RxTransactionHolder;
import neo4j.org.testkit.backend.holder.SessionHolder;
import neo4j.org.testkit.backend.holder.TransactionHolder;
import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import neo4j.org.testkit.backend.messages.responses.Transaction;
import org.neo4j.driver.Session;
Expand All @@ -45,33 +44,15 @@

@Setter
@Getter
public class SessionBeginTransaction implements TestkitRequest {
public class SessionBeginTransaction extends WithTxConfig {
private SessionBeginTransactionBody data;

private void configureTimeout(TransactionConfig.Builder builder) {
if (data.getTimeoutPresent()) {
try {
if (data.getTimeout() != null) {
builder.withTimeout(Duration.ofMillis(data.getTimeout()));
} else {
builder.withDefaultTimeout();
}
} catch (IllegalArgumentException e) {
throw new CustomDriverError(e);
}
}
}

@Override
public TestkitResponse process(TestkitState testkitState) {
SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId());
Session session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

org.neo4j.driver.Transaction transaction = session.beginTransaction(builder.build());
org.neo4j.driver.Transaction transaction = session.beginTransaction(getTxConfig());
return transaction(testkitState.addTransactionHolder(new TransactionHolder(sessionHolder, transaction)));
}

Expand All @@ -80,11 +61,8 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
return testkitState.getAsyncSessionHolder(data.getSessionId()).thenCompose(sessionHolder -> {
AsyncSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return session.beginTransactionAsync(builder.build())
return session.beginTransactionAsync(getTxConfig())
.thenApply(tx -> transaction(
testkitState.addAsyncTransactionHolder(new AsyncTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -96,11 +74,8 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
return testkitState.getRxSessionHolder(data.getSessionId()).flatMap(sessionHolder -> {
RxSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(session.beginTransaction(builder.build()))
return Mono.fromDirect(session.beginTransaction(getTxConfig()))
.map(tx -> transaction(
testkitState.addRxTransactionHolder(new RxTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -111,11 +86,8 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
return testkitState.getReactiveSessionHolder(data.getSessionId()).flatMap(sessionHolder -> {
ReactiveSession session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(builder.build())))
return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(getTxConfig())))
.map(tx -> transaction(testkitState.addReactiveTransactionHolder(
new ReactiveTransactionHolder(sessionHolder, tx))));
});
Expand All @@ -126,11 +98,8 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> {
var session = sessionHolder.getSession();
TransactionConfig.Builder builder = TransactionConfig.builder();
Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata);

configureTimeout(builder);

return Mono.fromDirect(session.beginTransaction(builder.build()))
return Mono.fromDirect(session.beginTransaction(getTxConfig()))
.map(tx -> transaction(testkitState.addReactiveTransactionStreamsHolder(
new ReactiveTransactionStreamsHolder(sessionHolder, tx))));
});
Expand All @@ -144,9 +113,12 @@ private Transaction transaction(String txId) {

@Getter
@Setter
public static class SessionBeginTransactionBody {
public static class SessionBeginTransactionBody implements WithTxConfig.ITxConfigBody {
private String sessionId;

@JsonDeserialize(using = TestkitCypherParamDeserializer.class)
private Map<String, Object> txMeta;

private Integer timeout;
private Boolean timeoutPresent = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
Expand All @@ -35,6 +37,7 @@
import neo4j.org.testkit.backend.holder.RxTransactionHolder;
import neo4j.org.testkit.backend.holder.SessionHolder;
import neo4j.org.testkit.backend.holder.TransactionHolder;
import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer;
import neo4j.org.testkit.backend.messages.responses.RetryableDone;
import neo4j.org.testkit.backend.messages.responses.RetryableTry;
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
Expand All @@ -49,15 +52,15 @@

@Setter
@Getter
public class SessionReadTransaction implements TestkitRequest {
public class SessionReadTransaction extends WithTxConfig {
private SessionReadTransactionBody data;

@Override
@SuppressWarnings("deprecation")
public TestkitResponse process(TestkitState testkitState) {
SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId());
Session session = sessionHolder.getSession();
session.readTransaction(handle(testkitState, sessionHolder));
session.readTransaction(handle(testkitState, sessionHolder), getTxConfig());
return retryableDone();
}

Expand All @@ -78,7 +81,7 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
return txWorkFuture;
};

return session.readTransactionAsync(workWrapper);
return session.readTransactionAsync(workWrapper, getTxConfig());
})
.thenApply(nothing -> retryableDone());
}
Expand All @@ -97,7 +100,7 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
return Mono.fromCompletionStage(tryResult);
};

return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper));
return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper, getTxConfig()));
})
.then(Mono.just(retryableDone()));
}
Expand All @@ -117,7 +120,7 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
};

return Mono.fromDirect(
flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper)));
flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper, getTxConfig())));
})
.then(Mono.just(retryableDone()));
}
Expand All @@ -137,7 +140,7 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
return Mono.fromCompletionStage(tryResult);
};

return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper));
return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper, getTxConfig()));
})
.then(Mono.just(retryableDone()));
}
Expand Down Expand Up @@ -177,7 +180,18 @@ private RetryableDone retryableDone() {

@Setter
@Getter
public static class SessionReadTransactionBody {
public static class SessionReadTransactionBody implements WithTxConfig.ITxConfigBody {
private String sessionId;

@JsonDeserialize(using = TestkitCypherParamDeserializer.class)
private Map<String, Object> txMeta;

private Integer timeout;
private Boolean timeoutPresent = false;

public void setTimeout(Integer timeout) {
this.timeout = timeout;
timeoutPresent = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.CustomDriverError;
import neo4j.org.testkit.backend.TestkitState;
import neo4j.org.testkit.backend.holder.ReactiveResultHolder;
import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder;
Expand All @@ -41,7 +39,6 @@
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.async.AsyncSession;
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.RxResult;
Expand All @@ -50,34 +47,17 @@

@Setter
@Getter
public class SessionRun implements TestkitRequest {
public class SessionRun extends WithTxConfig {
private SessionRunBody data;

private void configureTimeout(TransactionConfig.Builder builder) {
if (data.getTimeoutPresent()) {
try {
if (data.getTimeout() != null) {
builder.withTimeout(Duration.ofMillis(data.getTimeout()));
} else {
builder.withDefaultTimeout();
}
} catch (IllegalArgumentException e) {
throw new CustomDriverError(e);
}
}
}

@Override
public TestkitResponse process(TestkitState testkitState) {
SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId());
Session session = sessionHolder.getSession();
Query query = Optional.ofNullable(data.params)
.map(params -> new Query(data.cypher, data.params))
.orElseGet(() -> new Query(data.cypher));
TransactionConfig.Builder transactionConfig = TransactionConfig.builder();
Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata);
configureTimeout(transactionConfig);
org.neo4j.driver.Result result = session.run(query, transactionConfig.build());
org.neo4j.driver.Result result = session.run(query, getTxConfig());
String id = testkitState.addResultHolder(new ResultHolder(sessionHolder, result));

return createResponse(id, result.keys());
Expand All @@ -90,11 +70,8 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
Query query = Optional.ofNullable(data.params)
.map(params -> new Query(data.cypher, data.params))
.orElseGet(() -> new Query(data.cypher));
TransactionConfig.Builder transactionConfig = TransactionConfig.builder();
Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata);
configureTimeout(transactionConfig);

return session.runAsync(query, transactionConfig.build()).thenApply(resultCursor -> {
return session.runAsync(query, getTxConfig()).thenApply(resultCursor -> {
String id = testkitState.addAsyncResultHolder(new ResultCursorHolder(sessionHolder, resultCursor));
return createResponse(id, resultCursor.keys());
});
Expand All @@ -109,11 +86,8 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
Query query = Optional.ofNullable(data.params)
.map(params -> new Query(data.cypher, data.params))
.orElseGet(() -> new Query(data.cypher));
TransactionConfig.Builder transactionConfig = TransactionConfig.builder();
Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata);
configureTimeout(transactionConfig);

RxResult result = session.run(query, transactionConfig.build());
RxResult result = session.run(query, getTxConfig());
String id = testkitState.addRxResultHolder(new RxResultHolder(sessionHolder, result));

// The keys() method causes RUN message exchange.
Expand All @@ -129,11 +103,8 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
Query query = Optional.ofNullable(data.params)
.map(params -> new Query(data.cypher, data.params))
.orElseGet(() -> new Query(data.cypher));
TransactionConfig.Builder transactionConfig = TransactionConfig.builder();
Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata);
configureTimeout(transactionConfig);

return Mono.fromDirect(flowPublisherToFlux(session.run(query, transactionConfig.build())))
return Mono.fromDirect(flowPublisherToFlux(session.run(query, getTxConfig())))
.map(result -> {
String id =
testkitState.addReactiveResultHolder(new ReactiveResultHolder(sessionHolder, result));
Expand All @@ -149,16 +120,12 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
Query query = Optional.ofNullable(data.params)
.map(params -> new Query(data.cypher, data.params))
.orElseGet(() -> new Query(data.cypher));
TransactionConfig.Builder transactionConfig = TransactionConfig.builder();
Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata);
configureTimeout(transactionConfig);

return Mono.fromDirect(session.run(query, transactionConfig.build()))
.map(result -> {
String id = testkitState.addReactiveResultStreamsHolder(
new ReactiveResultStreamsHolder(sessionHolder, result));
return createResponse(id, result.keys());
});
return Mono.fromDirect(session.run(query, getTxConfig())).map(result -> {
String id = testkitState.addReactiveResultStreamsHolder(
new ReactiveResultStreamsHolder(sessionHolder, result));
return createResponse(id, result.keys());
});
});
}

Expand All @@ -170,13 +137,16 @@ private Result createResponse(String resultId, List<String> keys) {

@Setter
@Getter
public static class SessionRunBody {
public static class SessionRunBody implements ITxConfigBody {
@JsonDeserialize(using = TestkitCypherParamDeserializer.class)
private Map<String, Object> params;

private String sessionId;
private String cypher;

@JsonDeserialize(using = TestkitCypherParamDeserializer.class)
private Map<String, Object> txMeta;

private Integer timeout;
private Boolean timeoutPresent = false;

Expand Down
Loading

0 comments on commit 697da00

Please sign in to comment.