Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Use new typed session API.
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-simons committed Oct 26, 2022
1 parent 637870c commit 6d47ae3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 17 deletions.
31 changes: 15 additions & 16 deletions neo4j-http/src/main/java/org/neo4j/http/db/DefaultNeo4jAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.reactive.RxQueryRunner;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactivestreams.ReactiveQueryRunner;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
import org.neo4j.driver.summary.ResultSummary;
import org.neo4j.http.config.ApplicationProperties;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -60,15 +61,13 @@ class DefaultNeo4jAdapter implements Neo4jAdapter {
}

@Override
@SuppressWarnings({"deprecation", "RedundantSuppression"})
public Flux<Record> stream(Neo4jPrincipal principal, String database, Query query) {

return queryEvaluator.getExecutionRequirements(principal, query.text())
.flatMapMany(requirements -> this.execute0(principal, database, requirements, q -> Flux.from(q.run(query).records())));
.flatMapMany(requirements -> this.execute0(principal, database, requirements, q -> Mono.fromDirect(q.run(query)).flatMapMany(ReactiveResult::records)));
}

@Override
@SuppressWarnings({"deprecation", "RedundantSuppression"})
public Mono<ResultContainer> run(Neo4jPrincipal principal, String database, AnnotatedQuery query, AnnotatedQuery... additionalQueries) {

Flux<AnnotatedQuery> queries = Flux.just(query);
Expand All @@ -83,9 +82,10 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) {
.flatMap(q -> Mono.fromDirect(this.execute0(principal, database, q.getT2(), runner -> {
var annotatedQuery = q.getT1();
var rxResult = runner.run(annotatedQuery.value());
return Mono.fromDirect(rxResult.keys())
.zipWith(Flux.from(rxResult.records()).collectList())
.flatMap(v -> Mono.just(v).zipWith(Mono.fromDirect(rxResult.consume()), (t, s) -> Tuples.of(t.getT1(), t.getT2(), s)))
return Mono.fromDirect(rxResult)
.flatMap(reactiveResult -> Mono.just(reactiveResult.keys())
.zipWith(Flux.from(reactiveResult.records()).collectList())
.flatMap(v -> Mono.just(v).zipWith(Mono.fromDirect(reactiveResult.consume()), (t, s) -> Tuples.of(t.getT1(), t.getT2(), s))))
.map(content -> new ResultAndSummary(EagerResult.success(content, annotatedQuery.includeStats(), annotatedQuery.resultDataContents(), driver.defaultTypeSystem()), content.getT3()));
}))).onErrorResume(Neo4jException.class, e -> Mono.just(new ResultAndSummary(EagerResult.error(e), null)))
)
Expand All @@ -99,8 +99,7 @@ record ResultAndSummary(EagerResult result, ResultSummary summary) {
});
}

@SuppressWarnings("deprecation")
<T> Publisher<T> execute0(Neo4jPrincipal principal, String database, QueryEvaluator.ExecutionRequirements requirements, Function<RxQueryRunner, Publisher<T>> query) {
<T> Publisher<T> execute0(Neo4jPrincipal principal, String database, QueryEvaluator.ExecutionRequirements requirements, Function<ReactiveQueryRunner, Publisher<T>> query) {

var sessionSupplier = queryEvaluator.isEnterpriseEdition().
flatMap(v -> {
Expand All @@ -110,23 +109,23 @@ <T> Publisher<T> execute0(Neo4jPrincipal principal, String database, QueryEvalua
.withDatabase(database)
.withDefaultAccessMode(requirements.target() == QueryEvaluator.Target.WRITERS ? AccessMode.WRITE : AccessMode.READ)
.build();
return Mono.fromCallable(() -> driver.rxSession(sessionConfig));
return Mono.fromCallable(() -> driver.reactiveSession(ReactiveSession.class, sessionConfig));
});

Flux<T> flow;
if (requirements.transactionMode() == QueryEvaluator.TransactionMode.IMPLICIT) {
flow = Flux.usingWhen(sessionSupplier, query, RxSession::close);
flow = Flux.usingWhen(sessionSupplier, query, ReactiveSession::close);
} else {
flow = switch (requirements.target()) {
case WRITERS -> Flux.usingWhen(
sessionSupplier,
session -> session.writeTransaction(query::apply),
RxSession::close
session -> session.executeWrite(query::apply),
ReactiveSession::close
);
case READERS -> Flux.usingWhen(
sessionSupplier,
session -> session.readTransaction(query::apply),
RxSession::close
session -> session.executeRead(query::apply),
ReactiveSession::close
);
};
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
<maven.compiler.release>${java.version}</maven.compiler.release>
<maven.version>3.8.6</maven.version>
<!-- Different name than the one managed by boot to proper manage it centrally and be able to override it -->
<neo4j-driver.version>5.1.0</neo4j-driver.version>
<neo4j-driver.version>5.0-SNAPSHOT</neo4j-driver.version>
<neo4j.version>4.4.12</neo4j.version>
<project.build.docs>${project.build.directory}/docs</project.build.docs>
<project.build.docs.branch>main</project.build.docs.branch>
Expand Down

0 comments on commit 6d47ae3

Please sign in to comment.