From 6d47ae3c5cba1a1db840afa5ad365e49df256062 Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Wed, 26 Oct 2022 17:07:29 +0200 Subject: [PATCH] Use new typed session API. --- .../neo4j/http/db/DefaultNeo4jAdapter.java | 31 +++++++++---------- pom.xml | 2 +- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/neo4j-http/src/main/java/org/neo4j/http/db/DefaultNeo4jAdapter.java b/neo4j-http/src/main/java/org/neo4j/http/db/DefaultNeo4jAdapter.java index 13784d7..fb0f7e3 100644 --- a/neo4j-http/src/main/java/org/neo4j/http/db/DefaultNeo4jAdapter.java +++ b/neo4j-http/src/main/java/org/neo4j/http/db/DefaultNeo4jAdapter.java @@ -25,8 +25,9 @@ import org.neo4j.driver.Record; import org.neo4j.driver.SessionConfig; import org.neo4j.driver.exceptions.Neo4jException; -import org.neo4j.driver.reactive.RxQueryRunner; -import org.neo4j.driver.reactive.RxSession; +import org.neo4j.driver.reactivestreams.ReactiveQueryRunner; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.summary.ResultSummary; import org.neo4j.http.config.ApplicationProperties; import org.reactivestreams.Publisher; @@ -60,15 +61,13 @@ class DefaultNeo4jAdapter implements Neo4jAdapter { } @Override - @SuppressWarnings({"deprecation", "RedundantSuppression"}) public Flux stream(Neo4jPrincipal principal, String database, Query query) { return queryEvaluator.getExecutionRequirements(principal, query.text()) - .flatMapMany(requirements -> this.execute0(principal, database, requirements, q -> Flux.from(q.run(query).records()))); + .flatMapMany(requirements -> this.execute0(principal, database, requirements, q -> Mono.fromDirect(q.run(query)).flatMapMany(ReactiveResult::records))); } @Override - @SuppressWarnings({"deprecation", "RedundantSuppression"}) public Mono run(Neo4jPrincipal principal, String database, AnnotatedQuery query, AnnotatedQuery... additionalQueries) { Flux queries = Flux.just(query); @@ -83,9 +82,10 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) { .flatMap(q -> Mono.fromDirect(this.execute0(principal, database, q.getT2(), runner -> { var annotatedQuery = q.getT1(); var rxResult = runner.run(annotatedQuery.value()); - return Mono.fromDirect(rxResult.keys()) - .zipWith(Flux.from(rxResult.records()).collectList()) - .flatMap(v -> Mono.just(v).zipWith(Mono.fromDirect(rxResult.consume()), (t, s) -> Tuples.of(t.getT1(), t.getT2(), s))) + return Mono.fromDirect(rxResult) + .flatMap(reactiveResult -> Mono.just(reactiveResult.keys()) + .zipWith(Flux.from(reactiveResult.records()).collectList()) + .flatMap(v -> Mono.just(v).zipWith(Mono.fromDirect(reactiveResult.consume()), (t, s) -> Tuples.of(t.getT1(), t.getT2(), s)))) .map(content -> new ResultAndSummary(EagerResult.success(content, annotatedQuery.includeStats(), annotatedQuery.resultDataContents(), driver.defaultTypeSystem()), content.getT3())); }))).onErrorResume(Neo4jException.class, e -> Mono.just(new ResultAndSummary(EagerResult.error(e), null))) ) @@ -99,8 +99,7 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) { }); } - @SuppressWarnings("deprecation") - Publisher execute0(Neo4jPrincipal principal, String database, QueryEvaluator.ExecutionRequirements requirements, Function> query) { + Publisher execute0(Neo4jPrincipal principal, String database, QueryEvaluator.ExecutionRequirements requirements, Function> query) { var sessionSupplier = queryEvaluator.isEnterpriseEdition(). flatMap(v -> { @@ -110,23 +109,23 @@ Publisher execute0(Neo4jPrincipal principal, String database, QueryEvalua .withDatabase(database) .withDefaultAccessMode(requirements.target() == QueryEvaluator.Target.WRITERS ? AccessMode.WRITE : AccessMode.READ) .build(); - return Mono.fromCallable(() -> driver.rxSession(sessionConfig)); + return Mono.fromCallable(() -> driver.reactiveSession(ReactiveSession.class, sessionConfig)); }); Flux flow; if (requirements.transactionMode() == QueryEvaluator.TransactionMode.IMPLICIT) { - flow = Flux.usingWhen(sessionSupplier, query, RxSession::close); + flow = Flux.usingWhen(sessionSupplier, query, ReactiveSession::close); } else { flow = switch (requirements.target()) { case WRITERS -> Flux.usingWhen( sessionSupplier, - session -> session.writeTransaction(query::apply), - RxSession::close + session -> session.executeWrite(query::apply), + ReactiveSession::close ); case READERS -> Flux.usingWhen( sessionSupplier, - session -> session.readTransaction(query::apply), - RxSession::close + session -> session.executeRead(query::apply), + ReactiveSession::close ); }; } diff --git a/pom.xml b/pom.xml index 481695b..8e59876 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,7 @@ ${java.version} 3.8.6 - 5.1.0 + 5.0-SNAPSHOT 4.4.12 ${project.build.directory}/docs main