Skip to content

Commit

Permalink
Introduce new Reactive Streams session
Browse files Browse the repository at this point in the history
This update introduces support for reactive session with Reactive Streams types. While it is similar to the deprecated `RxSession`, it includes the improvements introduced with the `ReactiveSession` that uses Flow API types.

Sample session creation:
```
var session = driver.reactiveSession(org.neo4j.driver.reactivestreams.ReactiveSession.class);
```.
  • Loading branch information
injectives committed Oct 25, 2022
1 parent 93c7b95 commit 0ca83f8
Show file tree
Hide file tree
Showing 65 changed files with 1,718 additions and 39 deletions.
12 changes: 12 additions & 0 deletions driver/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,16 @@
<method>org.neo4j.driver.types.TypeSystem getDefault()</method>
</difference>

<difference>
<className>org/neo4j/driver/Driver</className>
<differenceType>7012</differenceType>
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class)</method>
</difference>

<difference>
<className>org/neo4j/driver/Driver</className>
<differenceType>7012</differenceType>
<method>org.neo4j.driver.BaseReactiveSession reactiveSession(java.lang.Class, org.neo4j.driver.SessionConfig)</method>
</difference>

</differences>
1 change: 1 addition & 0 deletions driver/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
exports org.neo4j.driver;
exports org.neo4j.driver.async;
exports org.neo4j.driver.reactive;
exports org.neo4j.driver.reactivestreams;
exports org.neo4j.driver.types;
exports org.neo4j.driver.summary;
exports org.neo4j.driver.net;
Expand Down
24 changes: 24 additions & 0 deletions driver/src/main/java/org/neo4j/driver/BaseReactiveSession.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver;

/**
* A base interface for reactive sessions, used by {@link Driver#reactiveSession(Class)} and {@link Driver#reactiveSession(Class, SessionConfig)}.
*/
public interface BaseReactiveSession {}
44 changes: 41 additions & 3 deletions driver/src/main/java/org/neo4j/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public interface Driver extends AutoCloseable {
*
* @return a new {@link Session} object.
*/
Session session();
default Session session() {
return session(SessionConfig.defaultConfig());
}

/**
* Create a new {@link Session} with a specified {@link SessionConfig session configuration}.
Expand Down Expand Up @@ -133,7 +135,41 @@ default ReactiveSession reactiveSession() {
* @param sessionConfig used to customize the session.
* @return a new {@link ReactiveSession} object.
*/
ReactiveSession reactiveSession(SessionConfig sessionConfig);
default ReactiveSession reactiveSession(SessionConfig sessionConfig) {
return reactiveSession(ReactiveSession.class, sessionConfig);
}

/**
* Create a new reactive session of supported type with default {@link SessionConfig session configuration}.
* <p>
* Supported types are:
* <ul>
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
* </ul>
*
* @param sessionClass session type class
* @return session instance
* @param <T> session type
*/
default <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass) {
return reactiveSession(sessionClass, SessionConfig.defaultConfig());
}

/**
* Create a new reactive session of supported type with a specified {@link SessionConfig session configuration}.
* <p>
* Supported types are:
* <ul>
* <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
* <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams API</li>
* </ul>
*
* @param sessionClass session type class
* @return session instance
* @param <T> session type
*/
<T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig);

/**
* Create a new general purpose {@link AsyncSession} with default {@link SessionConfig session configuration}. The {@link AsyncSession} provides an
Expand All @@ -143,7 +179,9 @@ default ReactiveSession reactiveSession() {
*
* @return a new {@link AsyncSession} object.
*/
AsyncSession asyncSession();
default AsyncSession asyncSession() {
return asyncSession(SessionConfig.defaultConfig());
}

/**
* Create a new {@link AsyncSession} with a specified {@link SessionConfig session configuration}.
Expand Down
28 changes: 14 additions & 14 deletions driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package org.neo4j.driver.internal;

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.driver.BaseReactiveSession;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
Expand All @@ -33,12 +35,10 @@
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
import org.neo4j.driver.internal.metrics.MetricsProvider;
import org.neo4j.driver.internal.reactive.InternalReactiveSession;
import org.neo4j.driver.internal.reactive.InternalRxSession;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.types.TypeSystem;

Expand All @@ -61,11 +61,6 @@ public class InternalDriver implements Driver {
this.log = logging.getLog(getClass());
}

@Override
public Session session() {
return new InternalSession(newSession(SessionConfig.defaultConfig()));
}

@Override
public Session session(SessionConfig sessionConfig) {
return new InternalSession(newSession(sessionConfig));
Expand All @@ -77,14 +72,19 @@ public RxSession rxSession(SessionConfig sessionConfig) {
return new InternalRxSession(newSession(sessionConfig));
}

@SuppressWarnings({"deprecation", "unchecked"})
@Override
public ReactiveSession reactiveSession(SessionConfig sessionConfig) {
return new InternalReactiveSession(newSession(sessionConfig));
}

@Override
public AsyncSession asyncSession() {
return new InternalAsyncSession(newSession(SessionConfig.defaultConfig()));
public <T extends BaseReactiveSession> T reactiveSession(Class<T> sessionClass, SessionConfig sessionConfig) {
requireNonNull(sessionClass, "sessionClass must not be null");
requireNonNull(sessionClass, "sessionConfig must not be null");
if (org.neo4j.driver.reactive.ReactiveSession.class.isAssignableFrom(sessionClass)) {
return (T) new org.neo4j.driver.internal.reactive.InternalReactiveSession(newSession(sessionConfig));
} else if (org.neo4j.driver.reactivestreams.ReactiveSession.class.isAssignableFrom(sessionClass)) {
return (T) new org.neo4j.driver.internal.reactivestreams.InternalReactiveSession(newSession(sessionConfig));
} else {
throw new IllegalArgumentException(
String.format("Unsupported session type '%s'", sessionClass.getCanonicalName()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

abstract class AbstractReactiveSession<S> {
public abstract class AbstractReactiveSession<S> {
protected final NetworkSession session;

public AbstractReactiveSession(NetworkSession session) {
Expand All @@ -45,15 +45,15 @@ public AbstractReactiveSession(NetworkSession session) {
this.session = session;
}

abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);
protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);

abstract Publisher<Void> closeTransaction(S transaction, boolean commit);
protected abstract Publisher<Void> closeTransaction(S transaction, boolean commit);

Publisher<S> doBeginTransaction(TransactionConfig config) {
return doBeginTransaction(config, null);
}

Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
protected Publisher<S> doBeginTransaction(TransactionConfig config, String txType) {
return createSingleItemPublisher(
() -> {
CompletableFuture<S> txFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -87,7 +87,7 @@ Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config) {
"Unexpected condition, begin transaction call has completed successfully with transaction being null"));
}

<T> Publisher<T> runTransaction(
protected <T> Publisher<T> runTransaction(
AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config) {
Flux<T> repeatableWork = Flux.usingWhen(
beginTransaction(mode, config),
Expand Down Expand Up @@ -119,7 +119,7 @@ public Set<Bookmark> lastBookmarks() {
return session.lastBookmarks();
}

<T> Publisher<T> doClose() {
protected <T> Publisher<T> doClose() {
return createEmptyPublisher(session::closeAsync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,30 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

abstract class AbstractReactiveTransaction {
public abstract class AbstractReactiveTransaction {
protected final UnmanagedTransaction tx;

protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
this.tx = tx;
}

<T> Publisher<T> doCommit() {
protected <T> Publisher<T> doCommit() {
return createEmptyPublisher(tx::commitAsync);
}

<T> Publisher<T> doRollback() {
protected <T> Publisher<T> doRollback() {
return createEmptyPublisher(tx::rollbackAsync);
}

Publisher<Void> doClose() {
protected Publisher<Void> doClose() {
return close(false);
}

Publisher<Boolean> doIsOpen() {
protected Publisher<Boolean> doIsOpen() {
return Mono.just(tx.isOpen());
}

Publisher<Void> close(boolean commit) {
public Publisher<Void> close(boolean commit) {
return createEmptyPublisher(() -> tx.closeAsync(commit));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public InternalReactiveSession(NetworkSession session) {
}

@Override
ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
protected ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
return new InternalReactiveTransaction(unmanagedTransaction);
}

@Override
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
protected org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
return ((InternalReactiveTransaction) transaction).close(commit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public InternalRxSession(NetworkSession session) {
}

@Override
RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
protected RxTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) {
return new InternalRxTransaction(unmanagedTransaction);
}

@Override
Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
protected Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
return ((InternalRxTransaction) transaction).close(commit);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.reactivestreams;

import java.util.Map;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.util.Extract;
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.reactivestreams.ReactiveQueryRunner;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
@Override
default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
try {
Query query = new Query(queryStr, parameters);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
}
}

@Override
default Publisher<ReactiveResult> run(String query, Map<String, Object> parameters) {
return run(query, parameters(parameters));
}

@Override
default Publisher<ReactiveResult> run(String query, Record parameters) {
return run(query, parameters(parameters));
}

@Override
default Publisher<ReactiveResult> run(String queryStr) {
try {
Query query = new Query(queryStr);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
}
}

static Value parameters(Record record) {
return record == null ? Values.EmptyMap : parameters(record.asMap());
}

static Value parameters(Map<String, Object> map) {
if (map == null || map.isEmpty()) {
return Values.EmptyMap;
}
return new MapValue(Extract.mapOfValues(map));
}
}
Loading

0 comments on commit 0ca83f8

Please sign in to comment.