diff --git a/driver/src/main/java/org/neo4j/driver/BookmarkManager.java b/driver/src/main/java/org/neo4j/driver/BookmarkManager.java new file mode 100644 index 0000000000..b314c4eb71 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/BookmarkManager.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.io.Serializable; +import java.util.Set; + +/** + * Keeps track of database bookmarks and is used by the driver to ensure causal consistency between sessions and query executions. + *

+ * Please note that implementations of this interface MUST NOT block for extended periods of time. + *

+ * Implementations must avoid calling driver. + * + * @see org.neo4j.driver.Config.ConfigBuilder#withBookmarkManager(BookmarkManager) + */ +public interface BookmarkManager extends Serializable { + /** + * Updates database bookmarks by deleting the given previous bookmarks and adding the new bookmarks. + * + * @param database the database name, this might be an empty string when session has no database name configured and database discovery is unavailable + * @param previousBookmarks the previous bookmarks + * @param newBookmarks the new bookmarks + */ + void updateBookmarks(String database, Set previousBookmarks, Set newBookmarks); + + /** + * Gets an immutable set of bookmarks for a given database. + * + * @param database the database name + * @return the set of bookmarks or an empty set if the database name is unknown to the bookmark manager + */ + Set getBookmarks(String database); + + /** + * Gets an immutable set of bookmarks for all databases. + * + * @return the set of bookmarks or an empty set + */ + Set getAllBookmarks(); + + /** + * Deletes bookmarks for the given databases. + *

+ * This method should be called by driver users if data deletion is desired when bookmarks for the given databases are no longer needed. + * + * @param databases the set of database names + */ + void forget(Set databases); +} diff --git a/driver/src/main/java/org/neo4j/driver/BookmarkManagerConfig.java b/driver/src/main/java/org/neo4j/driver/BookmarkManagerConfig.java new file mode 100644 index 0000000000..50042d7bde --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/BookmarkManagerConfig.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; + +/** + * Bookmark configuration used to configure bookmark manager supplied by {@link BookmarkManagers#defaultManager(BookmarkManagerConfig)}. + */ +public final class BookmarkManagerConfig { + private final Map> initialBookmarks; + private final BiConsumer> updateListener; + private final BookmarkSupplier bookmarkSupplier; + + private BookmarkManagerConfig(BookmarkManagerConfigBuilder builder) { + this.initialBookmarks = builder.initialBookmarks; + this.updateListener = builder.updateListener; + this.bookmarkSupplier = builder.bookmarkSupplier; + } + + /** + * Creates a new {@link BookmarkManagerConfigBuilder} used to construct a configuration object. + * + * @return a bookmark manager configuration builder. + */ + public static BookmarkManagerConfigBuilder builder() { + return new BookmarkManagerConfigBuilder(); + } + + /** + * Returns the map of bookmarks used to initialise the bookmark manager. + * + * @return the map of bookmarks + */ + public Map> initialBookmarks() { + return initialBookmarks; + } + + /** + * Returns a bookmark update listener that will be notified when database bookmarks are updated. + * + * @return the update listener or {@code null} + */ + public BiConsumer> updateListener() { + return updateListener; + } + + /** + * Returns bookmark supplier that will be used by the bookmark manager when getting bookmarks. + * + * @return the bookmark supplier or {@code null} + */ + public BookmarkSupplier bookmarkSupplier() { + return bookmarkSupplier; + } + + /** + * Builder used to configure {@link BookmarkManagerConfig} which will be used to create a bookmark manager. + */ + public static class BookmarkManagerConfigBuilder { + private Map> initialBookmarks = Collections.emptyMap(); + private BiConsumer> updateListener; + private BookmarkSupplier bookmarkSupplier; + + private BookmarkManagerConfigBuilder() {} + + /** + * Provide a map of initial bookmarks to initialise the bookmark manager. + * + * @param databaseToBookmarks database to bookmarks map + * @return this builder + */ + public BookmarkManagerConfigBuilder withInitialBookmarks(Map> databaseToBookmarks) { + Objects.requireNonNull(databaseToBookmarks, "databaseToBookmarks must not be null"); + this.initialBookmarks = databaseToBookmarks; + return this; + } + + /** + * Provide a bookmarks update listener. + *

+ * The listener will be called outside bookmark manager's synchronisation lock. + * + * @param updateListener update listener + * @return this builder + */ + public BookmarkManagerConfigBuilder withUpdateListener(BiConsumer> updateListener) { + this.updateListener = updateListener; + return this; + } + + /** + * Provide a bookmark supplier. + *

+ * The supplied bookmarks will be served alongside the bookmarks served by the bookmark manager. The supplied bookmarks will not be stored nor managed by the bookmark manager. + *

+ * The supplier will be called outside bookmark manager's synchronisation lock. + * + * @param bookmarkSupplier the bookmarks supplier + * @return this builder + */ + public BookmarkManagerConfigBuilder withBookmarksSupplier(BookmarkSupplier bookmarkSupplier) { + this.bookmarkSupplier = bookmarkSupplier; + return this; + } + + public BookmarkManagerConfig build() { + return new BookmarkManagerConfig(this); + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/BookmarkManagers.java b/driver/src/main/java/org/neo4j/driver/BookmarkManagers.java new file mode 100644 index 0000000000..8f81ca1a85 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/BookmarkManagers.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import org.neo4j.driver.internal.Neo4jBookmarkManager; + +/** + * Setups new instances of {@link BookmarkManager}. + */ +public interface BookmarkManagers { + /** + * Setups a new instance of bookmark manager that can be used in {@link org.neo4j.driver.Config.ConfigBuilder#withBookmarkManager(BookmarkManager)}. + * + * @param config the bookmark manager configuration + * @return the bookmark manager + */ + static BookmarkManager defaultManager(BookmarkManagerConfig config) { + return new Neo4jBookmarkManager(config.initialBookmarks(), config.updateListener(), config.bookmarkSupplier()); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/BookmarkSupplier.java b/driver/src/main/java/org/neo4j/driver/BookmarkSupplier.java new file mode 100644 index 0000000000..0d47b55244 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/BookmarkSupplier.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import java.util.Set; + +/** + * Supplies additional bookmarks to {@link BookmarkManager} implementation provided by {@link BookmarkManagers#defaultManager(BookmarkManagerConfig)}. + *

+ * Implementations must avoid calling driver. + */ +public interface BookmarkSupplier { + /** + * Supplies a set of bookmarks for a given database. + * + * @param database the database name + * @return the set of bookmarks, must not be {@code null} + */ + Set getBookmarks(String database); + + /** + * Supplies a set of bookmarks for all databases. + * + * @return the set of bookmarks, must not be {@code null} + */ + Set getAllBookmarks(); +} diff --git a/driver/src/main/java/org/neo4j/driver/Config.java b/driver/src/main/java/org/neo4j/driver/Config.java index 3ab7f9e479..3f78f2bcf5 100644 --- a/driver/src/main/java/org/neo4j/driver/Config.java +++ b/driver/src/main/java/org/neo4j/driver/Config.java @@ -98,6 +98,7 @@ public class Config implements Serializable { private final int eventLoopThreads; private final String userAgent; private final MetricsAdapter metricsAdapter; + private final BookmarkManager bookmarkManager; private Config(ConfigBuilder builder) { this.logging = builder.logging; @@ -119,6 +120,8 @@ private Config(ConfigBuilder builder) { this.eventLoopThreads = builder.eventLoopThreads; this.metricsAdapter = builder.metricsAdapter; + + this.bookmarkManager = builder.bookmarkManager; } /** @@ -252,6 +255,15 @@ public String userAgent() { return userAgent; } + /** + * A {@link BookmarkManager} implementation for the driver to use. + * + * @return bookmark implementation or {@code null}. + */ + public BookmarkManager bookmarkManager() { + return bookmarkManager; + } + /** * Used to build new config instances */ @@ -272,6 +284,7 @@ public static class ConfigBuilder { private MetricsAdapter metricsAdapter = MetricsAdapter.DEV_NULL; private long fetchSize = FetchSizeUtil.DEFAULT_FETCH_SIZE; private int eventLoopThreads = 0; + private BookmarkManager bookmarkManager; private ConfigBuilder() {} @@ -645,6 +658,19 @@ public ConfigBuilder withUserAgent(String userAgent) { return this; } + /** + * Sets a {@link BookmarkManager} implementation for the driver to use. + *

+ * By default, bookmark manager is effectively disabled. + * + * @param bookmarkManager bookmark manager implementation. Providing {@code null} effectively disables bookmark manager. + * @return this builder. + */ + public ConfigBuilder withBookmarkManager(BookmarkManager bookmarkManager) { + this.bookmarkManager = bookmarkManager; + return this; + } + /** * Extracts the driver version from the driver jar MANIFEST.MF file. */ diff --git a/driver/src/main/java/org/neo4j/driver/SessionConfig.java b/driver/src/main/java/org/neo4j/driver/SessionConfig.java index 6d6dfc86f0..0312c1b063 100644 --- a/driver/src/main/java/org/neo4j/driver/SessionConfig.java +++ b/driver/src/main/java/org/neo4j/driver/SessionConfig.java @@ -45,6 +45,7 @@ public class SessionConfig implements Serializable { private final String database; private final Long fetchSize; private final String impersonatedUser; + private final boolean ignoreBookmarkManager; private SessionConfig(Builder builder) { this.bookmarks = builder.bookmarks; @@ -52,6 +53,7 @@ private SessionConfig(Builder builder) { this.database = builder.database; this.fetchSize = builder.fetchSize; this.impersonatedUser = builder.impersonatedUser; + this.ignoreBookmarkManager = builder.ignoreBookmarkManager; } /** @@ -130,6 +132,15 @@ public Optional impersonatedUser() { return Optional.ofNullable(impersonatedUser); } + /** + * Determines if {@link BookmarkManager} configured at driver level should be ignored. + * + * @return {@code true} if bookmark manager should be ignored and not otherwise. + */ + public boolean ignoreBookmarkManager() { + return ignoreBookmarkManager; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -143,12 +154,13 @@ public boolean equals(Object o) { && defaultAccessMode == that.defaultAccessMode && Objects.equals(database, that.database) && Objects.equals(fetchSize, that.fetchSize) - && Objects.equals(impersonatedUser, that.impersonatedUser); + && Objects.equals(impersonatedUser, that.impersonatedUser) + && Objects.equals(ignoreBookmarkManager, that.ignoreBookmarkManager); } @Override public int hashCode() { - return Objects.hash(bookmarks, defaultAccessMode, database, impersonatedUser); + return Objects.hash(bookmarks, defaultAccessMode, database, impersonatedUser, ignoreBookmarkManager); } @Override @@ -167,6 +179,7 @@ public static class Builder { private AccessMode defaultAccessMode = AccessMode.WRITE; private String database = null; private String impersonatedUser = null; + private boolean ignoreBookmarkManager = false; private Builder() {} @@ -292,6 +305,17 @@ public Builder withImpersonatedUser(String impersonatedUser) { return this; } + /** + * Ignore {@link BookmarkManager} configured at driver level using {@link org.neo4j.driver.Config.ConfigBuilder#withBookmarkManager(BookmarkManager)}. + * + * @param ignore ignore if {@code true}, use otherwise. + * @return this builder. + */ + public Builder withIgnoredBookmarkManager(boolean ignore) { + this.ignoreBookmarkManager = ignore; + return this; + } + public SessionConfig build() { return new SessionConfig(this); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java b/driver/src/main/java/org/neo4j/driver/internal/DatabaseBookmark.java similarity index 63% rename from driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java rename to driver/src/main/java/org/neo4j/driver/internal/DatabaseBookmark.java index efae79c8f1..a3d80e9fbb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BookmarksHolder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DatabaseBookmark.java @@ -18,22 +18,6 @@ */ package org.neo4j.driver.internal; -import java.util.Collections; -import java.util.Set; import org.neo4j.driver.Bookmark; -public interface BookmarksHolder { - Set getBookmarks(); - - void setBookmark(Bookmark bookmark); - - BookmarksHolder NO_OP = new BookmarksHolder() { - @Override - public Set getBookmarks() { - return Collections.emptySet(); - } - - @Override - public void setBookmark(Bookmark bookmark) {} - }; -} +public record DatabaseBookmark(String databaseName, Bookmark bookmark) {} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 2e9dc95c1a..5876a8a28d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -260,7 +260,8 @@ protected InternalDriver createRoutingDriver( */ protected InternalDriver createDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, MetricsProvider metricsProvider, Config config) { - return new InternalDriver(securityPlan, sessionFactory, metricsProvider, config.logging()); + return new InternalDriver( + securityPlan, sessionFactory, metricsProvider, config.logging(), config.bookmarkManager()); } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index 155bc55af7..27dcdb3b1e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Driver; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -50,15 +51,19 @@ public class InternalDriver implements Driver { private AtomicBoolean closed = new AtomicBoolean(false); private final MetricsProvider metricsProvider; + private final BookmarkManager bookmarkManager; + InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, MetricsProvider metricsProvider, - Logging logging) { + Logging logging, + BookmarkManager bookmarkManager) { this.securityPlan = securityPlan; this.sessionFactory = sessionFactory; this.metricsProvider = metricsProvider; this.log = logging.getLog(getClass()); + this.bookmarkManager = bookmarkManager != null ? bookmarkManager : new NoOpBookmarkManager(); } @Override @@ -164,7 +169,8 @@ private static RuntimeException driverCloseException() { public NetworkSession newSession(SessionConfig config) { assertOpen(); - NetworkSession session = sessionFactory.newInstance(config); + var bookmarkManager = config.ignoreBookmarkManager() ? new NoOpBookmarkManager() : this.bookmarkManager; + NetworkSession session = sessionFactory.newInstance(config, bookmarkManager); if (closed.get()) { // session does not immediately acquire connection, it is fine to just throw throw driverCloseException(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/Neo4jBookmarkManager.java b/driver/src/main/java/org/neo4j/driver/internal/Neo4jBookmarkManager.java new file mode 100644 index 0000000000..3ab4882fcc --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/Neo4jBookmarkManager.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; + +import java.io.Serial; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManager; +import org.neo4j.driver.BookmarkSupplier; + +/** + * A basic {@link BookmarkManager} implementation. + */ +public final class Neo4jBookmarkManager implements BookmarkManager { + @Serial + private static final long serialVersionUID = 6615186840717102303L; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + private final Map> databaseToBookmarks = new HashMap<>(); + private final BiConsumer> updateListener; + private final BookmarkSupplier bookmarkSupplier; + + public Neo4jBookmarkManager( + Map> initialBookmarks, + BiConsumer> updateListener, + BookmarkSupplier bookmarkSupplier) { + Objects.requireNonNull(initialBookmarks, "initialBookmarks must not be null"); + this.databaseToBookmarks.putAll(initialBookmarks); + this.updateListener = updateListener; + this.bookmarkSupplier = bookmarkSupplier; + } + + @Override + public void updateBookmarks(String database, Set previousBookmarks, Set newBookmarks) { + var immutableBookmarks = executeWithLock( + rwLock.writeLock(), + () -> databaseToBookmarks.compute(database, (ignored, bookmarks) -> { + var updatedBookmarks = new HashSet(); + if (bookmarks != null) { + bookmarks.stream() + .filter(bookmark -> !previousBookmarks.contains(bookmark)) + .forEach(updatedBookmarks::add); + } + updatedBookmarks.addAll(newBookmarks); + return Collections.unmodifiableSet(updatedBookmarks); + })); + if (updateListener != null) { + updateListener.accept(database, immutableBookmarks); + } + } + + @Override + public Set getBookmarks(String database) { + var immutableBookmarks = executeWithLock( + rwLock.readLock(), () -> databaseToBookmarks.getOrDefault(database, Collections.emptySet())); + if (bookmarkSupplier != null) { + var bookmarks = new HashSet<>(immutableBookmarks); + bookmarks.addAll(bookmarkSupplier.getBookmarks(database)); + immutableBookmarks = Collections.unmodifiableSet(bookmarks); + } + return immutableBookmarks; + } + + @Override + public Set getAllBookmarks() { + var immutableBookmarks = executeWithLock(rwLock.readLock(), () -> databaseToBookmarks.values().stream() + .flatMap(Collection::stream)) + .collect(Collectors.toUnmodifiableSet()); + if (bookmarkSupplier != null) { + var bookmarks = new HashSet<>(immutableBookmarks); + bookmarks.addAll(bookmarkSupplier.getAllBookmarks()); + immutableBookmarks = Collections.unmodifiableSet(bookmarks); + } + return immutableBookmarks; + } + + @Override + public void forget(Set databases) { + executeWithLock(rwLock.writeLock(), () -> databases.forEach(databaseToBookmarks::remove)); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java b/driver/src/main/java/org/neo4j/driver/internal/NoOpBookmarkManager.java similarity index 54% rename from driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java rename to driver/src/main/java/org/neo4j/driver/internal/NoOpBookmarkManager.java index a61999eeb0..6e1c8d4acf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DefaultBookmarksHolder.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NoOpBookmarkManager.java @@ -18,35 +18,38 @@ */ package org.neo4j.driver.internal; +import java.io.Serial; import java.util.Collections; import java.util.Set; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManager; /** - * @since 2.0 + * A no-op {@link BookmarkManager} implementation. */ -public class DefaultBookmarksHolder implements BookmarksHolder { - private volatile Set bookmarks; +public class NoOpBookmarkManager implements BookmarkManager { + @Serial + private static final long serialVersionUID = 7175136719562680362L; - // for testing only - public DefaultBookmarksHolder() { - this(Collections.emptySet()); + private static final Set EMPTY = Collections.emptySet(); + + @Override + public void updateBookmarks(String database, Set previousBookmarks, Set newBookmarks) { + // ignored } - public DefaultBookmarksHolder(Set bookmarks) { - this.bookmarks = bookmarks; + @Override + public Set getBookmarks(String database) { + return EMPTY; } @Override - public Set getBookmarks() { - return bookmarks; + public Set getAllBookmarks() { + return EMPTY; } @Override - @SuppressWarnings("deprecation") - public void setBookmark(Bookmark bookmark) { - if (bookmark != null && !bookmark.isEmpty()) { - bookmarks = Collections.singleton(bookmark); - } + public void forget(Set databases) { + // ignored } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java index f2407f0f1a..8eef20c633 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java @@ -19,11 +19,12 @@ package org.neo4j.driver.internal; import java.util.concurrent.CompletionStage; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.SessionConfig; import org.neo4j.driver.internal.async.NetworkSession; public interface SessionFactory { - NetworkSession newInstance(SessionConfig sessionConfig); + NetworkSession newInstance(SessionConfig sessionConfig, BookmarkManager bookmarkManager); CompletionStage verifyConnectivity(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java index e24c89ecfb..29156dc6c3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java @@ -20,11 +20,13 @@ import java.util.Collections; import java.util.HashSet; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Config; import org.neo4j.driver.Logging; import org.neo4j.driver.SessionConfig; @@ -49,17 +51,18 @@ public class SessionFactoryImpl implements SessionFactory { } @Override - public NetworkSession newInstance(SessionConfig sessionConfig) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(toDistinctSet(sessionConfig.bookmarks())); + public NetworkSession newInstance(SessionConfig sessionConfig, BookmarkManager bookmarkManager) { + Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null"); return createSession( connectionProvider, retryLogic, parseDatabaseName(sessionConfig), sessionConfig.defaultAccessMode(), - bookmarksHolder, + toDistinctSet(sessionConfig.bookmarks()), parseFetchSize(sessionConfig), sessionConfig.impersonatedUser().orElse(null), - logging); + logging, + bookmarkManager); } private Set toDistinctSet(Iterable bookmarks) { @@ -125,28 +128,33 @@ private NetworkSession createSession( RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode, - BookmarksHolder bookmarksHolder, + Set bookmarks, long fetchSize, String impersonatedUser, - Logging logging) { + Logging logging, + BookmarkManager bookmarkManager) { + Objects.requireNonNull(bookmarks, "bookmarks may not be null"); + Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null"); return leakedSessionsLoggingEnabled ? new LeakLoggingNetworkSession( connectionProvider, retryLogic, databaseName, mode, - bookmarksHolder, + bookmarks, impersonatedUser, fetchSize, - logging) + logging, + bookmarkManager) : new NetworkSession( connectionProvider, retryLogic, databaseName, mode, - bookmarksHolder, + bookmarks, impersonatedUser, fetchSize, - logging); + logging, + bookmarkManager); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java index c7a2fc2311..86985059ee 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSession.java @@ -20,9 +20,11 @@ import static java.lang.System.lineSeparator; +import java.util.Set; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Logging; -import org.neo4j.driver.internal.BookmarksHolder; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -36,19 +38,21 @@ public LeakLoggingNetworkSession( RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode, - BookmarksHolder bookmarksHolder, + Set bookmarks, String impersonatedUser, long fetchSize, - Logging logging) { + Logging logging, + BookmarkManager bookmarkManager) { super( connectionProvider, retryLogic, databaseName, mode, - bookmarksHolder, + bookmarks, impersonatedUser, fetchSize, - logging); + logging, + bookmarkManager); this.stackTrace = captureStackTrace(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 1e119920bf..1775799eef 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -19,8 +19,13 @@ package org.neo4j.driver.internal.async; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.neo4j.driver.internal.async.ConnectionContext.PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER; import static org.neo4j.driver.internal.util.Futures.completedWithNull; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -28,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.Query; @@ -35,8 +41,9 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.TransactionNestingException; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.FailableCursor; import org.neo4j.driver.internal.ImpersonationUtil; import org.neo4j.driver.internal.cursor.AsyncResultCursor; @@ -49,40 +56,51 @@ import org.neo4j.driver.internal.util.Futures; public class NetworkSession { + /** + * Fallback database name used by the driver when session has no database name configured and database discovery is unavailable. + */ + String FALLBACK_DATABASE_NAME = ""; + private final ConnectionProvider connectionProvider; private final NetworkSessionConnectionContext connectionContext; private final AccessMode mode; private final RetryLogic retryLogic; protected final Logger log; - private final BookmarksHolder bookmarksHolder; private final long fetchSize; private volatile CompletionStage transactionStage = completedWithNull(); private volatile CompletionStage connectionStage = completedWithNull(); private volatile CompletionStage resultCursorStage = completedWithNull(); private final AtomicBoolean open = new AtomicBoolean(true); + private final BookmarkManager bookmarkManager; + private volatile Set lastUsedBookmarks = Collections.emptySet(); + private volatile Set lastReceivedBookmarks; public NetworkSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, DatabaseName databaseName, AccessMode mode, - BookmarksHolder bookmarksHolder, + Set bookmarks, String impersonatedUser, long fetchSize, - Logging logging) { + Logging logging, + BookmarkManager bookmarkManager) { + Objects.requireNonNull(bookmarks, "bookmarks may not be null"); + Objects.requireNonNull(bookmarkManager, "bookmarkManager may not be null"); this.connectionProvider = connectionProvider; this.mode = mode; this.retryLogic = retryLogic; this.log = new PrefixedLogger("[" + hashCode() + "]", logging.getLog(getClass())); - this.bookmarksHolder = bookmarksHolder; CompletableFuture databaseNameFuture = databaseName .databaseName() .map(ignored -> CompletableFuture.completedFuture(databaseName)) .orElse(new CompletableFuture<>()); - this.connectionContext = new NetworkSessionConnectionContext( - databaseNameFuture, bookmarksHolder.getBookmarks(), impersonatedUser); + this.bookmarkManager = bookmarkManager; + this.lastReceivedBookmarks = bookmarks; + this.connectionContext = + new NetworkSessionConnectionContext(databaseNameFuture, determineBookmarks(true), impersonatedUser); this.fetchSize = fetchSize; } @@ -117,8 +135,8 @@ public CompletionStage beginTransactionAsync(AccessMode mo .thenApply(connection -> ImpersonationUtil.ensureImpersonationSupport(connection, connection.impersonatedUser())) .thenCompose(connection -> { - UnmanagedTransaction tx = new UnmanagedTransaction(connection, bookmarksHolder, fetchSize); - return tx.beginAsync(bookmarksHolder.getBookmarks(), config); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, this::handleNewBookmark, fetchSize); + return tx.beginAsync(determineBookmarks(false), config); }); // update the reference to the only known transaction @@ -143,7 +161,7 @@ public RetryLogic retryLogic() { } public Set lastBookmarks() { - return bookmarksHolder.getBookmarks(); + return lastReceivedBookmarks; } public CompletionStage releaseConnectionAsync() { @@ -210,7 +228,13 @@ private CompletionStage buildResultCursorFactory(Query quer try { ResultCursorFactory factory = connection .protocol() - .runInAutoCommitTransaction(connection, query, bookmarksHolder, config, fetchSize); + .runInAutoCommitTransaction( + connection, + query, + determineBookmarks(false), + this::handleNewBookmark, + config, + fetchSize); return completedFuture(factory); } catch (Throwable e) { return Futures.failedFuture(e); @@ -304,6 +328,44 @@ private void ensureSessionIsOpen() { } } + private void handleNewBookmark(DatabaseBookmark databaseBookmark) { + assertDatabaseNameFutureIsDone(); + var bookmark = databaseBookmark.bookmark(); + if (bookmark != null) { + var bookmarks = Set.of(bookmark); + String databaseName = databaseBookmark.databaseName(); + if (databaseName == null || databaseName.isEmpty()) { + databaseName = getDatabaseNameNow().orElse(FALLBACK_DATABASE_NAME); + } + lastReceivedBookmarks = bookmarks; + bookmarkManager.updateBookmarks(databaseName, lastUsedBookmarks, bookmarks); + } + } + + private Set determineBookmarks(boolean useSystemOnly) { + var bookmarks = new HashSet(); + if (useSystemOnly) { + bookmarks.addAll(bookmarkManager.getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME)); + } else { + bookmarks.addAll(bookmarkManager.getAllBookmarks()); + lastUsedBookmarks = Collections.unmodifiableSet(bookmarks); + } + bookmarks.addAll(lastReceivedBookmarks); + return bookmarks; + } + + private void assertDatabaseNameFutureIsDone() { + if (!connectionContext.databaseNameFuture().isDone()) { + throw new IllegalStateException("Illegal internal state encountered, database name future is not done."); + } + } + + private Optional getDatabaseNameNow() { + return Futures.joinNowOrElseThrow( + connectionContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER) + .databaseName(); + } + /** * The {@link NetworkSessionConnectionContext#mode} can be mutable for a session connection context */ diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index b2c38a2a7c..3edabf18b0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; @@ -42,7 +43,7 @@ import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.cursor.AsyncResultCursor; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.messaging.BoltProtocol; @@ -83,7 +84,7 @@ private enum State { private final Connection connection; private final BoltProtocol protocol; - private final BookmarksHolder bookmarksHolder; + private final Consumer bookmarkConsumer; private final ResultCursorsHolder resultCursors; private final long fetchSize; private final Lock lock = new ReentrantLock(); @@ -93,15 +94,18 @@ private enum State { private Throwable causeOfTermination; private CompletionStage interruptStage; - public UnmanagedTransaction(Connection connection, BookmarksHolder bookmarksHolder, long fetchSize) { - this(connection, bookmarksHolder, fetchSize, new ResultCursorsHolder()); + public UnmanagedTransaction(Connection connection, Consumer bookmarkConsumer, long fetchSize) { + this(connection, bookmarkConsumer, fetchSize, new ResultCursorsHolder()); } protected UnmanagedTransaction( - Connection connection, BookmarksHolder bookmarksHolder, long fetchSize, ResultCursorsHolder resultCursors) { + Connection connection, + Consumer bookmarkConsumer, + long fetchSize, + ResultCursorsHolder resultCursors) { this.connection = connection; this.protocol = connection.protocol(); - this.bookmarksHolder = bookmarksHolder; + this.bookmarkConsumer = bookmarkConsumer; this.resultCursors = resultCursors; this.fetchSize = fetchSize; } @@ -215,7 +219,7 @@ private CompletionStage doCommitAsync(Throwable cursorFailure) { : null); return exception != null ? failedFuture(exception) - : protocol.commitTransaction(connection).thenAccept(bookmarksHolder::setBookmark); + : protocol.commitTransaction(connection).thenAccept(bookmarkConsumer); } private CompletionStage doRollbackAsync() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java index e8347c070f..7776ed4c48 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java @@ -27,9 +27,7 @@ import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.ReadOnlyBookmarksHolder; import org.neo4j.driver.internal.async.connection.DirectConnection; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; import org.neo4j.driver.internal.spi.Connection; @@ -48,8 +46,8 @@ public MultiDatabasesRoutingProcedureRunner(RoutingContext context) { } @Override - BookmarksHolder bookmarksHolder(Set bookmarks) { - return new ReadOnlyBookmarksHolder(bookmarks); + Set adaptBookmarks(Set bookmarks) { + return bookmarks; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java index aff0aca818..3a8e424694 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java @@ -22,6 +22,7 @@ import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.CompletionException; @@ -34,7 +35,6 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.FatalDiscoveryException; -import org.neo4j.driver.internal.BookmarksHolder; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.connection.DirectConnection; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; @@ -60,8 +60,7 @@ public CompletionStage run( Connection connection, DatabaseName databaseName, Set bookmarks, String impersonatedUser) { DirectConnection delegate = connection(connection); Query procedure = procedureQuery(connection.protocol().version(), databaseName); - BookmarksHolder bookmarksHolder = bookmarksHolder(bookmarks); - return runProcedure(delegate, procedure, bookmarksHolder) + return runProcedure(delegate, procedure, adaptBookmarks(bookmarks)) .thenCompose(records -> releaseConnection(delegate, records)) .handle((records, error) -> processProcedureResponse(procedure, records, error)); } @@ -80,16 +79,20 @@ Query procedureQuery(BoltProtocolVersion protocolVersion, DatabaseName databaseN return new Query(GET_ROUTING_TABLE, parameters(ROUTING_CONTEXT, context.toMap())); } - BookmarksHolder bookmarksHolder(Set ignored) { - return BookmarksHolder.NO_OP; + Set adaptBookmarks(Set bookmarks) { + return Collections.emptySet(); } - CompletionStage> runProcedure( - Connection connection, Query procedure, BookmarksHolder bookmarksHolder) { + CompletionStage> runProcedure(Connection connection, Query procedure, Set bookmarks) { return connection .protocol() .runInAutoCommitTransaction( - connection, procedure, bookmarksHolder, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE) + connection, + procedure, + bookmarks, + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE) .asyncResult() .thenCompose(ResultCursor::listAsync); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java index ff79dee4f3..690538c7d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandler.java @@ -23,26 +23,21 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.neo4j.driver.Bookmark; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.InternalBookmark; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.spi.ResponseHandler; +import org.neo4j.driver.internal.util.MetadataExtractor; public class CommitTxResponseHandler implements ResponseHandler { - private final CompletableFuture commitFuture; + private final CompletableFuture commitFuture; - public CommitTxResponseHandler(CompletableFuture commitFuture) { + public CommitTxResponseHandler(CompletableFuture commitFuture) { this.commitFuture = requireNonNull(commitFuture); } @Override public void onSuccess(Map metadata) { - Value bookmarkValue = metadata.get("bookmark"); - if (bookmarkValue == null) { - commitFuture.complete(null); - } else { - commitFuture.complete(InternalBookmark.parse(bookmarkValue.asString())); - } + commitFuture.complete(MetadataExtractor.extractDatabaseBookmark(metadata)); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java index d39a6cce7a..3cce37f653 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullHandlers.java @@ -18,8 +18,9 @@ */ package org.neo4j.driver.internal.handlers; +import java.util.function.Consumer; import org.neo4j.driver.Query; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.handlers.pulln.AutoPullResponseHandler; import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler; @@ -33,10 +34,10 @@ public static PullAllResponseHandler newBoltV3PullAllHandler( Query query, RunResponseHandler runHandler, Connection connection, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx) { PullResponseCompletionListener completionListener = - createPullResponseCompletionListener(connection, bookmarksHolder, tx); + createPullResponseCompletionListener(connection, bookmarkConsumer, tx); return new LegacyPullAllResponseHandler( query, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener); @@ -46,11 +47,11 @@ public static PullAllResponseHandler newBoltV4AutoPullHandler( Query query, RunResponseHandler runHandler, Connection connection, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx, long fetchSize) { PullResponseCompletionListener completionListener = - createPullResponseCompletionListener(connection, bookmarksHolder, tx); + createPullResponseCompletionListener(connection, bookmarkConsumer, tx); return new AutoPullResponseHandler( query, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener, fetchSize); @@ -60,19 +61,19 @@ public static PullResponseHandler newBoltV4BasicPullHandler( Query query, RunResponseHandler runHandler, Connection connection, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx) { PullResponseCompletionListener completionListener = - createPullResponseCompletionListener(connection, bookmarksHolder, tx); + createPullResponseCompletionListener(connection, bookmarkConsumer, tx); return new BasicPullResponseHandler( query, runHandler, connection, BoltProtocolV3.METADATA_EXTRACTOR, completionListener); } private static PullResponseCompletionListener createPullResponseCompletionListener( - Connection connection, BookmarksHolder bookmarksHolder, UnmanagedTransaction tx) { + Connection connection, Consumer bookmarkConsumer, UnmanagedTransaction tx) { return tx != null ? new TransactionPullResponseCompletionListener(tx) - : new SessionPullResponseCompletionListener(connection, bookmarksHolder); + : new SessionPullResponseCompletionListener(connection, bookmarkConsumer); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java index 9a39e4ab94..6e9e6bbc83 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListener.java @@ -21,26 +21,27 @@ import static java.util.Objects.requireNonNull; import java.util.Map; +import java.util.function.Consumer; import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.MetadataExtractor; public class SessionPullResponseCompletionListener implements PullResponseCompletionListener { - private final BookmarksHolder bookmarksHolder; + private final Consumer bookmarkConsumer; private final Connection connection; - public SessionPullResponseCompletionListener(Connection connection, BookmarksHolder bookmarksHolder) { + public SessionPullResponseCompletionListener(Connection connection, Consumer bookmarkConsumer) { + this.bookmarkConsumer = requireNonNull(bookmarkConsumer); this.connection = requireNonNull(connection); - this.bookmarksHolder = requireNonNull(bookmarksHolder); } @Override public void afterSuccess(Map metadata) { releaseConnection(); - bookmarksHolder.setBookmark(MetadataExtractor.extractBookmarks(metadata)); + bookmarkConsumer.accept(MetadataExtractor.extractDatabaseBookmark(metadata)); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java index d10c87adf5..3185746c25 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelPromise; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.neo4j.driver.AuthToken; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; @@ -31,7 +32,7 @@ import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.exceptions.ClientException; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.cursor.ResultCursorFactory; @@ -88,7 +89,7 @@ void initializeChannel( * @param connection the connection to use. * @return a completion stage completed with a bookmark when transaction is committed or completed exceptionally when there was a failure. */ - CompletionStage commitTransaction(Connection connection); + CompletionStage commitTransaction(Connection connection); /** * Rollback the unmanaged transaction. @@ -103,7 +104,7 @@ void initializeChannel( * * @param connection the network connection to use. * @param query the cypher to execute. - * @param bookmarksHolder the bookmarksHolder that keeps track of the current bookmarks and can be updated with a new bookmark. + * @param bookmarkConsumer the database bookmark consumer. * @param config the transaction config for the implicitly started auto-commit transaction. * @param fetchSize the record fetch size for PULL message. * @return stage with cursor. @@ -111,7 +112,8 @@ void initializeChannel( ResultCursorFactory runInAutoCommitTransaction( Connection connection, Query query, - BookmarksHolder bookmarksHolder, + Set bookmarks, + Consumer bookmarkConsumer, TransactionConfig config, long fetchSize); diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java index cee952d8db..c95081166c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java @@ -31,11 +31,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.neo4j.driver.AuthToken; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; @@ -127,8 +128,8 @@ public CompletionStage beginTransaction( } @Override - public CompletionStage commitTransaction(Connection connection) { - CompletableFuture commitFuture = new CompletableFuture<>(); + public CompletionStage commitTransaction(Connection connection) { + CompletableFuture commitFuture = new CompletableFuture<>(); connection.writeAndFlush(COMMIT, new CommitTxResponseHandler(commitFuture)); return commitFuture; } @@ -144,38 +145,34 @@ public CompletionStage rollbackTransaction(Connection connection) { public ResultCursorFactory runInAutoCommitTransaction( Connection connection, Query query, - BookmarksHolder bookmarksHolder, + Set bookmarks, + Consumer bookmarkConsumer, TransactionConfig config, long fetchSize) { verifyDatabaseNameBeforeTransaction(connection.databaseName()); RunWithMetadataMessage runMessage = autoCommitTxRunMessage( - query, - config, - connection.databaseName(), - connection.mode(), - bookmarksHolder.getBookmarks(), - connection.impersonatedUser()); - return buildResultCursorFactory(connection, query, bookmarksHolder, null, runMessage, fetchSize); + query, config, connection.databaseName(), connection.mode(), bookmarks, connection.impersonatedUser()); + return buildResultCursorFactory(connection, query, bookmarkConsumer, null, runMessage, fetchSize); } @Override public ResultCursorFactory runInUnmanagedTransaction( Connection connection, Query query, UnmanagedTransaction tx, long fetchSize) { RunWithMetadataMessage runMessage = unmanagedTxRunMessage(query); - return buildResultCursorFactory(connection, query, BookmarksHolder.NO_OP, tx, runMessage, fetchSize); + return buildResultCursorFactory(connection, query, (ignored) -> {}, tx, runMessage, fetchSize); } protected ResultCursorFactory buildResultCursorFactory( Connection connection, Query query, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx, RunWithMetadataMessage runMessage, long ignored) { CompletableFuture runFuture = new CompletableFuture<>(); RunResponseHandler runHandler = new RunResponseHandler(runFuture, METADATA_EXTRACTOR, connection, tx); PullAllResponseHandler pullHandler = - newBoltV3PullAllHandler(query, runHandler, connection, bookmarksHolder, tx); + newBoltV3PullAllHandler(query, runHandler, connection, bookmarkConsumer, tx); return new AsyncResultCursorOnlyFactory(connection, runMessage, runHandler, runFuture, pullHandler); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java index dca0be8a6e..00ba111904 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4.java @@ -22,8 +22,9 @@ import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV4BasicPullHandler; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.neo4j.driver.Query; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.ResultCursorFactory; @@ -51,7 +52,7 @@ public MessageFormat createMessageFormat() { protected ResultCursorFactory buildResultCursorFactory( Connection connection, Query query, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx, RunWithMetadataMessage runMessage, long fetchSize) { @@ -59,8 +60,9 @@ protected ResultCursorFactory buildResultCursorFactory( RunResponseHandler runHandler = new RunResponseHandler(runFuture, METADATA_EXTRACTOR, connection, tx); PullAllResponseHandler pullAllHandler = - newBoltV4AutoPullHandler(query, runHandler, connection, bookmarksHolder, tx, fetchSize); - PullResponseHandler pullHandler = newBoltV4BasicPullHandler(query, runHandler, connection, bookmarksHolder, tx); + newBoltV4AutoPullHandler(query, runHandler, connection, bookmarkConsumer, tx, fetchSize); + PullResponseHandler pullHandler = + newBoltV4BasicPullHandler(query, runHandler, connection, bookmarkConsumer, tx); return new ResultCursorFactoryImpl(connection, runMessage, runHandler, runFuture, pullHandler, pullAllHandler); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41.java index f2ba21a59f..ff927408f8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41.java @@ -22,8 +22,9 @@ import static org.neo4j.driver.internal.handlers.PullHandlers.newBoltV4BasicPullHandler; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.neo4j.driver.Query; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.ResultCursorFactory; @@ -52,7 +53,7 @@ public MessageFormat createMessageFormat() { protected ResultCursorFactory buildResultCursorFactory( Connection connection, Query query, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, UnmanagedTransaction tx, RunWithMetadataMessage runMessage, long fetchSize) { @@ -60,8 +61,9 @@ protected ResultCursorFactory buildResultCursorFactory( RunResponseHandler runHandler = new RunResponseHandler(runFuture, METADATA_EXTRACTOR, connection, tx); PullAllResponseHandler pullAllHandler = - newBoltV4AutoPullHandler(query, runHandler, connection, bookmarksHolder, tx, fetchSize); - PullResponseHandler pullHandler = newBoltV4BasicPullHandler(query, runHandler, connection, bookmarksHolder, tx); + newBoltV4AutoPullHandler(query, runHandler, connection, bookmarkConsumer, tx, fetchSize); + PullResponseHandler pullHandler = + newBoltV4BasicPullHandler(query, runHandler, connection, bookmarkConsumer, tx); return new ResultCursorFactoryImpl(connection, runMessage, runHandler, runFuture, pullHandler, pullAllHandler); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java b/driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java index 9567d462a3..dfbf84436e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/MetadataExtractor.java @@ -32,6 +32,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.ProtocolException; import org.neo4j.driver.exceptions.UntrustedServerException; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.summary.InternalDatabaseInfo; @@ -113,20 +114,10 @@ public ResultSummary extractSummary( extractResultConsumedAfter(metadata, resultConsumedAfterMetadataKey)); } - public static DatabaseInfo extractDatabaseInfo(Map metadata) { - Value dbValue = metadata.get("db"); - if (dbValue == null || dbValue.isNull()) { - return DEFAULT_DATABASE_INFO; - } else { - return new InternalDatabaseInfo(dbValue.asString()); - } - } - - public static Bookmark extractBookmarks(Map metadata) { - Value bookmarkValue = metadata.get("bookmark"); - return bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType(TYPE_SYSTEM.STRING()) - ? InternalBookmark.parse(bookmarkValue.asString()) - : InternalBookmark.empty(); + public static DatabaseBookmark extractDatabaseBookmark(Map metadata) { + var databaseName = extractDatabaseInfo(metadata).name(); + var bookmark = extractBookmark(metadata); + return new DatabaseBookmark(databaseName, bookmark); } public static Value extractServer(Map metadata) { @@ -142,6 +133,24 @@ public static Value extractServer(Map metadata) { return versionValue; } + static DatabaseInfo extractDatabaseInfo(Map metadata) { + Value dbValue = metadata.get("db"); + if (dbValue == null || dbValue.isNull()) { + return DEFAULT_DATABASE_INFO; + } else { + return new InternalDatabaseInfo(dbValue.asString()); + } + } + + static Bookmark extractBookmark(Map metadata) { + Value bookmarkValue = metadata.get("bookmark"); + Bookmark bookmark = null; + if (bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType(TYPE_SYSTEM.STRING())) { + bookmark = InternalBookmark.parse(bookmarkValue.asString()); + } + return bookmark; + } + private static QueryType extractQueryType(Map metadata) { Value typeValue = metadata.get("type"); if (typeValue != null) { diff --git a/driver/src/test/java/org/neo4j/driver/BookmarkManagerConfigTest.java b/driver/src/test/java/org/neo4j/driver/BookmarkManagerConfigTest.java new file mode 100644 index 0000000000..2a42cd8da8 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/BookmarkManagerConfigTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import org.junit.jupiter.api.Test; + +class BookmarkManagerConfigTest { + BookmarkManagerConfig config; + + @Test + void shouldReturnDefaultValues() { + // GIVEN & WHEN + config = BookmarkManagerConfig.builder().build(); + + // THEN + assertNotNull(config.initialBookmarks()); + assertTrue(config.initialBookmarks().isEmpty()); + assertNull(config.updateListener()); + assertNull(config.bookmarkSupplier()); + } + + @Test + void shouldReturnInitialBookmarks() { + // GIVEN + var bookmarks = Map.of("neo4j", Set.of(Bookmark.from("TS:000001"))); + + // WHEN + config = BookmarkManagerConfig.builder().withInitialBookmarks(bookmarks).build(); + + // WHEN & THEN + assertEquals(bookmarks, config.initialBookmarks()); + assertNull(config.updateListener()); + assertNull(config.bookmarkSupplier()); + } + + @Test + void shouldReturnUpdateListener() { + // GIVEN + BiConsumer> updateListener = (d, b) -> {}; + + // WHEN + config = BookmarkManagerConfig.builder() + .withUpdateListener(updateListener) + .build(); + + // WHEN & THEN + assertNotNull(config.initialBookmarks()); + assertTrue(config.initialBookmarks().isEmpty()); + assertEquals(updateListener, config.updateListener()); + assertNull(config.bookmarkSupplier()); + } + + @Test + void shouldReturnBookmarkSupplier() { + // GIVEN + var bookmarkSupplier = mock(BookmarkSupplier.class); + + // WHEN + config = BookmarkManagerConfig.builder() + .withBookmarksSupplier(bookmarkSupplier) + .build(); + + // WHEN & THEN + assertNotNull(config.initialBookmarks()); + assertTrue(config.initialBookmarks().isEmpty()); + assertNull(config.updateListener()); + assertEquals(bookmarkSupplier, config.bookmarkSupplier()); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java b/driver/src/test/java/org/neo4j/driver/BookmarkManagersTest.java similarity index 56% rename from driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java rename to driver/src/test/java/org/neo4j/driver/BookmarkManagersTest.java index 4447583e30..06c0a09794 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ReadOnlyBookmarksHolder.java +++ b/driver/src/test/java/org/neo4j/driver/BookmarkManagersTest.java @@ -16,28 +16,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.neo4j.driver.internal; +package org.neo4j.driver; -import java.util.Set; -import org.neo4j.driver.Bookmark; +import static org.junit.jupiter.api.Assertions.assertTrue; -/** - * @since 2.0 - */ -public class ReadOnlyBookmarksHolder implements BookmarksHolder { - private final Set bookmarks; +import org.junit.jupiter.api.Test; +import org.neo4j.driver.internal.Neo4jBookmarkManager; - public ReadOnlyBookmarksHolder(Set bookmarks) { - this.bookmarks = bookmarks; - } +class BookmarkManagersTest { + @Test + void shouldCreateDefaultBookmarkManager() { + // GIVEN + var config = BookmarkManagerConfig.builder().build(); - @Override - public Set getBookmarks() { - return bookmarks; - } + // WHEN + var bookmarkManager = BookmarkManagers.defaultManager(config); - @Override - public void setBookmark(Bookmark bookmark) { - // NO_OP + // THEN + assertTrue(bookmarkManager instanceof Neo4jBookmarkManager); } } diff --git a/driver/src/test/java/org/neo4j/driver/ParametersTest.java b/driver/src/test/java/org/neo4j/driver/ParametersTest.java index 6a3bbeeb71..9f8344670c 100644 --- a/driver/src/test/java/org/neo4j/driver/ParametersTest.java +++ b/driver/src/test/java/org/neo4j/driver/ParametersTest.java @@ -34,12 +34,12 @@ import static org.neo4j.driver.internal.util.ValueFactory.emptyRelationshipValue; import static org.neo4j.driver.internal.util.ValueFactory.filledPathValue; +import java.util.Collections; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.neo4j.driver.exceptions.ClientException; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.InternalSession; import org.neo4j.driver.internal.async.NetworkSession; @@ -107,10 +107,11 @@ private Session mockedSession() { retryLogic, defaultDatabase(), AccessMode.WRITE, - new DefaultBookmarksHolder(), + Collections.emptySet(), null, UNLIMITED_FETCH_SIZE, - DEV_NULL_LOGGING); + DEV_NULL_LOGGING, + mock(BookmarkManager.class)); return new InternalSession(session); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/DefaultBookmarkHolderTest.java b/driver/src/test/java/org/neo4j/driver/internal/DefaultBookmarkHolderTest.java deleted file mode 100644 index 3d883a4e43..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/DefaultBookmarkHolderTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Collections; -import java.util.Set; -import org.junit.jupiter.api.Test; -import org.neo4j.driver.Bookmark; - -class DefaultBookmarkHolderTest { - @Test - void shouldAllowToGetAndSetBookmarks() { - BookmarksHolder bookmarkHolder = new DefaultBookmarksHolder(); - assertTrue(bookmarkHolder.getBookmarks().isEmpty()); - - bookmarkHolder.setBookmark(null); - assertTrue(bookmarkHolder.getBookmarks().isEmpty()); - - Bookmark bookmark1 = InternalBookmark.parse("neo4j:bookmark:v1:tx1"); - bookmarkHolder.setBookmark(bookmark1); - assertEquals(Collections.singleton(bookmark1), bookmarkHolder.getBookmarks()); - - bookmarkHolder.setBookmark(null); - assertEquals(Collections.singleton(bookmark1), bookmarkHolder.getBookmarks()); - - bookmarkHolder.setBookmark(InternalBookmark.empty()); - assertEquals(Collections.singleton(bookmark1), bookmarkHolder.getBookmarks()); - - Bookmark bookmark2 = InternalBookmark.parse("neo4j:bookmark:v1:tx2"); - bookmarkHolder.setBookmark(bookmark2); - assertEquals(Collections.singleton(bookmark2), bookmarkHolder.getBookmarks()); - - Bookmark bookmark3 = InternalBookmark.parse("neo4j:bookmark:v1:tx42"); - bookmarkHolder.setBookmark(bookmark3); - assertEquals(Collections.singleton(bookmark3), bookmarkHolder.getBookmarks()); - } - - @Test - void bookmarkCanBeSet() { - BookmarksHolder bookmarkHolder = new DefaultBookmarksHolder(); - Bookmark bookmark = InternalBookmark.parse("neo4j:bookmark:v1:tx100"); - - bookmarkHolder.setBookmark(bookmark); - - assertEquals(Collections.singleton(bookmark), bookmarkHolder.getBookmarks()); - } - - @Test - void shouldNotOverwriteBookmarkWithNull() { - Bookmark initBookmark = InternalBookmark.parse("Cat"); - BookmarksHolder bookmarkHolder = new DefaultBookmarksHolder(Collections.singleton(initBookmark)); - assertEquals(Collections.singleton(initBookmark), bookmarkHolder.getBookmarks()); - bookmarkHolder.setBookmark(null); - assertEquals(Collections.singleton(initBookmark), bookmarkHolder.getBookmarks()); - } - - @Test - void shouldNotOverwriteBookmarkWithEmptyBookmark() { - Set initBookmark = Collections.singleton(InternalBookmark.parse("Cat")); - BookmarksHolder bookmarkHolder = new DefaultBookmarksHolder(initBookmark); - assertEquals(initBookmark, bookmarkHolder.getBookmarks()); - bookmarkHolder.setBookmark(InternalBookmark.empty()); - assertEquals(initBookmark, bookmarkHolder.getBookmarks()); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 145a2b36d6..c2c7553018 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -110,7 +110,9 @@ void usesStandardSessionFactoryWhenNothingConfigured(String uri) { createDriver(uri, factory, config); SessionFactory capturedFactory = factory.capturedSessionFactory; - assertThat(capturedFactory.newInstance(SessionConfig.defaultConfig()), instanceOf(NetworkSession.class)); + assertThat( + capturedFactory.newInstance(SessionConfig.defaultConfig(), new NoOpBookmarkManager()), + instanceOf(NetworkSession.class)); } @ParameterizedTest @@ -123,7 +125,7 @@ void usesLeakLoggingSessionFactoryWhenConfigured(String uri) { SessionFactory capturedFactory = factory.capturedSessionFactory; assertThat( - capturedFactory.newInstance(SessionConfig.defaultConfig()), + capturedFactory.newInstance(SessionConfig.defaultConfig(), new NoOpBookmarkManager()), instanceOf(LeakLoggingNetworkSession.class)); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java index 44fefb1077..1cbf0bb2bc 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java @@ -113,7 +113,7 @@ void shouldReturnMetricsIfMetricsEnabled() { private static InternalDriver newDriver(SessionFactory sessionFactory) { return new InternalDriver( - SecurityPlanImpl.insecure(), sessionFactory, DevNullMetricsProvider.INSTANCE, DEV_NULL_LOGGING); + SecurityPlanImpl.insecure(), sessionFactory, DevNullMetricsProvider.INSTANCE, DEV_NULL_LOGGING, null); } private static SessionFactory sessionFactoryMock() { @@ -130,6 +130,6 @@ private static InternalDriver newDriver(boolean isMetricsEnabled) { } MetricsProvider metricsProvider = DriverFactory.getOrCreateMetricsProvider(config, Clock.SYSTEM); - return new InternalDriver(SecurityPlanImpl.insecure(), sessionFactory, metricsProvider, DEV_NULL_LOGGING); + return new InternalDriver(SecurityPlanImpl.insecure(), sessionFactory, metricsProvider, DEV_NULL_LOGGING, null); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java index b546b58442..3601c13f5f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalTransactionTest.java @@ -57,6 +57,7 @@ import org.neo4j.driver.summary.ResultSummary; class InternalTransactionTest { + private static final String DATABASE = "neo4j"; private Connection connection; private Transaction tx; @@ -64,8 +65,11 @@ class InternalTransactionTest { void setUp() { connection = connectionMock(BoltProtocolV4.INSTANCE); ConnectionProvider connectionProvider = mock(ConnectionProvider.class); - when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection)); + when(connectionProvider.acquireConnection(any(ConnectionContext.class))).thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); InternalSession session = new InternalSession(newSession(connectionProvider)); tx = session.beginTransaction(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/Neo4jBookmarkManagerTest.java b/driver/src/test/java/org/neo4j/driver/internal/Neo4jBookmarkManagerTest.java new file mode 100644 index 0000000000..343425b1e1 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/Neo4jBookmarkManagerTest.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkSupplier; + +class Neo4jBookmarkManagerTest { + Neo4jBookmarkManager manager; + + @Test + void shouldRejectNullInitialBookmarks() { + assertThrows(NullPointerException.class, () -> new Neo4jBookmarkManager(null, null, null)); + } + + @Test + void shouldAddInitialBookmarks() { + // GIVEN + var systemBookmarks = Set.of(Bookmark.from("SY:000001")); + var neo4jDatabase = "neo4j"; + var neo4jBookmarks = Set.of(Bookmark.from("NE:000001")); + var initialBookmarks = + Map.of(DatabaseNameUtil.SYSTEM_DATABASE_NAME, systemBookmarks, neo4jDatabase, neo4jBookmarks); + manager = new Neo4jBookmarkManager(initialBookmarks, null, null); + + // WHEN & THEN + assertEquals( + initialBookmarks.values().stream().flatMap(Set::stream).collect(Collectors.toSet()), + manager.getAllBookmarks()); + assertEquals(systemBookmarks, manager.getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME)); + assertEquals(neo4jBookmarks, manager.getBookmarks(neo4jDatabase)); + } + + @Test + void shouldNotifyUpdateListener() { + // GIVEN + @SuppressWarnings("unchecked") + BiConsumer> updateListener = mock(BiConsumer.class); + manager = new Neo4jBookmarkManager(Collections.emptyMap(), updateListener, null); + var bookmark = Bookmark.from("SY:000001"); + + // WHEN + manager.updateBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME, Collections.emptySet(), Set.of(bookmark)); + + // THEN + then(updateListener).should().accept(DatabaseNameUtil.SYSTEM_DATABASE_NAME, Set.of(bookmark)); + } + + @Test + void shouldUpdateBookmarks() { + // GIVEN + var initialBookmark0 = Bookmark.from("SY:000001"); + var initialBookmark1 = Bookmark.from("SY:000002"); + var initialBookmark2 = Bookmark.from("SY:000003"); + var initialBookmark3 = Bookmark.from("SY:000004"); + var initialBookmark4 = Bookmark.from("SY:000005"); + var initialBookmarks = Map.of( + DatabaseNameUtil.SYSTEM_DATABASE_NAME, + Set.of(initialBookmark0, initialBookmark1, initialBookmark2, initialBookmark3, initialBookmark4)); + manager = new Neo4jBookmarkManager(initialBookmarks, null, null); + var newBookmark = Bookmark.from("SY:000007"); + + // WHEN + manager.updateBookmarks( + DatabaseNameUtil.SYSTEM_DATABASE_NAME, Set.of(initialBookmark2, initialBookmark3), Set.of(newBookmark)); + var bookmarks = manager.getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME); + + // THEN + assertEquals(Set.of(initialBookmark0, initialBookmark1, initialBookmark4, newBookmark), bookmarks); + } + + @Test + void shouldGetBookmarksFromBookmarkSupplier() { + // GIVEN + var initialBookmark = Bookmark.from("SY:000001"); + var initialBookmarks = Map.of(DatabaseNameUtil.SYSTEM_DATABASE_NAME, Set.of(initialBookmark)); + var bookmarkSupplier = mock(BookmarkSupplier.class); + var supplierBookmark = Bookmark.from("SY:000002"); + given(bookmarkSupplier.getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME)) + .willReturn(Set.of(supplierBookmark)); + manager = new Neo4jBookmarkManager(initialBookmarks, null, bookmarkSupplier); + + // WHEN + var bookmarks = manager.getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME); + + // THEN + then(bookmarkSupplier).should().getBookmarks(DatabaseNameUtil.SYSTEM_DATABASE_NAME); + assertEquals(Set.of(initialBookmark, supplierBookmark), bookmarks); + } + + @Test + void shouldGetAllBookmarksFromBookmarkSupplier() { + // GIVEN + var initialBookmark = Bookmark.from("SY:000001"); + var initialBookmarks = Map.of(DatabaseNameUtil.SYSTEM_DATABASE_NAME, Set.of(initialBookmark)); + var bookmarkSupplier = mock(BookmarkSupplier.class); + var supplierBookmark = Bookmark.from("SY:000002"); + given(bookmarkSupplier.getAllBookmarks()).willReturn(Set.of(supplierBookmark)); + manager = new Neo4jBookmarkManager(initialBookmarks, null, bookmarkSupplier); + + // WHEN + var bookmarks = manager.getAllBookmarks(); + + // THEN + then(bookmarkSupplier).should().getAllBookmarks(); + assertEquals(Set.of(initialBookmark, supplierBookmark), bookmarks); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java index 8fe6c36812..c776e5ef1e 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Config; import org.neo4j.driver.internal.async.LeakLoggingNetworkSession; import org.neo4j.driver.internal.async.NetworkSession; @@ -39,11 +40,11 @@ void createsNetworkSessions() { SessionFactory factory = newSessionFactory(config); NetworkSession readSession = factory.newInstance( - builder().withDefaultAccessMode(AccessMode.READ).build()); + builder().withDefaultAccessMode(AccessMode.READ).build(), mock(BookmarkManager.class)); assertThat(readSession, instanceOf(NetworkSession.class)); NetworkSession writeSession = factory.newInstance( - builder().withDefaultAccessMode(AccessMode.WRITE).build()); + builder().withDefaultAccessMode(AccessMode.WRITE).build(), mock(BookmarkManager.class)); assertThat(writeSession, instanceOf(NetworkSession.class)); } @@ -56,11 +57,11 @@ void createsLeakLoggingNetworkSessions() { SessionFactory factory = newSessionFactory(config); NetworkSession readSession = factory.newInstance( - builder().withDefaultAccessMode(AccessMode.READ).build()); + builder().withDefaultAccessMode(AccessMode.READ).build(), mock(BookmarkManager.class)); assertThat(readSession, instanceOf(LeakLoggingNetworkSession.class)); NetworkSession writeSession = factory.newInstance( - builder().withDefaultAccessMode(AccessMode.WRITE).build()); + builder().withDefaultAccessMode(AccessMode.WRITE).build(), mock(BookmarkManager.class)); assertThat(writeSession, instanceOf(LeakLoggingNetworkSession.class)); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java index 87647ac313..ab8c803fcd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncSessionTest.java @@ -76,6 +76,7 @@ import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; @@ -86,6 +87,7 @@ import org.neo4j.driver.internal.value.IntegerValue; class InternalAsyncSessionTest { + private static final String DATABASE = "neo4j"; private Connection connection; private ConnectionProvider connectionProvider; private AsyncSession asyncSession; @@ -95,8 +97,11 @@ class InternalAsyncSessionTest { void setUp() { connection = connectionMock(BoltProtocolV4.INSTANCE); connectionProvider = mock(ConnectionProvider.class); - when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection)); + when(connectionProvider.acquireConnection(any(ConnectionContext.class))).thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); session = newSession(connectionProvider); asyncSession = new InternalAsyncSession(session); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java index 7baa011108..a950ee1e03 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java @@ -53,6 +53,7 @@ import org.neo4j.driver.Value; import org.neo4j.driver.async.AsyncTransaction; import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; @@ -61,6 +62,7 @@ import org.neo4j.driver.summary.ResultSummary; class InternalAsyncTransactionTest { + private static final String DATABASE = "neo4j"; private Connection connection; private NetworkSession networkSession; private InternalAsyncTransaction tx; @@ -69,8 +71,11 @@ class InternalAsyncTransactionTest { void setUp() { connection = connectionMock(BoltProtocolV4.INSTANCE); ConnectionProvider connectionProvider = mock(ConnectionProvider.class); - when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection)); + when(connectionProvider.acquireConnection(any(ConnectionContext.class))).thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); networkSession = newSession(connectionProvider); InternalAsyncSession session = new InternalAsyncSession(networkSession); tx = (InternalAsyncTransaction) await(session.beginTransactionAsync()); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java index f8eb4ca851..a6b1379fe9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/LeakLoggingNetworkSessionTest.java @@ -33,13 +33,14 @@ import static org.neo4j.driver.testutil.TestUtil.DEFAULT_TEST_PROTOCOL; import java.lang.reflect.Method; +import java.util.Collections; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.mockito.ArgumentCaptor; +import org.neo4j.driver.BookmarkManager; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.TransactionConfig; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -96,10 +97,11 @@ private static LeakLoggingNetworkSession newSession(Logging logging, boolean ope new FixedRetryLogic(0), defaultDatabase(), READ, - new DefaultBookmarksHolder(), + Collections.emptySet(), null, FetchSizeUtil.UNLIMITED_FETCH_SIZE, - logging); + logging, + mock(BookmarkManager.class)); } private static ConnectionProvider connectionProviderMock(boolean openConnection) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java index cd2ba1f7cd..5aba0c63c7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NetworkSessionTest.java @@ -63,6 +63,8 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.internal.DatabaseBookmark; +import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.messaging.BoltProtocol; import org.neo4j.driver.internal.messaging.request.PullMessage; @@ -72,6 +74,7 @@ import org.neo4j.driver.internal.spi.ConnectionProvider; class NetworkSessionTest { + private static final String DATABASE = "neo4j"; private Connection connection; private ConnectionProvider connectionProvider; private NetworkSession session; @@ -80,8 +83,11 @@ class NetworkSessionTest { void setUp() { connection = connectionMock(BoltProtocolV4.INSTANCE); connectionProvider = mock(ConnectionProvider.class); - when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection)); + when(connectionProvider.acquireConnection(any(ConnectionContext.class))).thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); session = newSession(connectionProvider); } @@ -213,7 +219,9 @@ void updatesBookmarkWhenTxIsClosed() { Bookmark bookmarkAfterCommit = InternalBookmark.parse("TheBookmark"); BoltProtocol protocol = spy(BoltProtocolV4.INSTANCE); - doReturn(completedFuture(bookmarkAfterCommit)).when(protocol).commitTransaction(any(Connection.class)); + doReturn(completedFuture(new DatabaseBookmark(DATABASE, bookmarkAfterCommit))) + .when(protocol) + .commitTransaction(any(Connection.class)); when(connection.protocol()).thenReturn(protocol); @@ -259,7 +267,9 @@ void bookmarkIsPropagatedBetweenTransactions() { NetworkSession session = newSession(connectionProvider); BoltProtocol protocol = spy(BoltProtocolV4.INSTANCE); - doReturn(completedFuture(bookmark1), completedFuture(bookmark2)) + doReturn( + completedFuture(new DatabaseBookmark(DATABASE, bookmark1)), + completedFuture(new DatabaseBookmark(DATABASE, bookmark2))) .when(protocol) .commitTransaction(any(Connection.class)); @@ -323,7 +333,11 @@ void shouldRunAfterRunFailure() { RuntimeException error = new RuntimeException("Hi"); when(connectionProvider.acquireConnection(any(ConnectionContext.class))) .thenReturn(failedFuture(error)) - .thenReturn(completedFuture(connection)); + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); Exception e = assertThrows(Exception.class, () -> run(session, "RETURN 1")); @@ -346,8 +360,16 @@ void shouldRunAfterBeginTxFailureOnBookmark() { Connection connection2 = connectionMock(BoltProtocolV4.INSTANCE); when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection1)) - .thenReturn(completedFuture(connection2)); + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection1); + }) + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection2); + }); Set bookmarks = Collections.singleton(InternalBookmark.parse("neo4j:bookmark:v1:tx42")); NetworkSession session = newSession(connectionProvider, bookmarks); @@ -372,8 +394,16 @@ void shouldBeginTxAfterBeginTxFailureOnBookmark() { Connection connection2 = connectionMock(BoltProtocolV4.INSTANCE); when(connectionProvider.acquireConnection(any(ConnectionContext.class))) - .thenReturn(completedFuture(connection1)) - .thenReturn(completedFuture(connection2)); + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection1); + }) + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection2); + }); Set bookmarks = Collections.singleton(InternalBookmark.parse("neo4j:bookmark:v1:tx42")); NetworkSession session = newSession(connectionProvider, bookmarks); @@ -393,7 +423,11 @@ void shouldBeginTxAfterRunFailureToAcquireConnection() { RuntimeException error = new RuntimeException("Hi"); when(connectionProvider.acquireConnection(any(ConnectionContext.class))) .thenReturn(failedFuture(error)) - .thenReturn(completedFuture(connection)); + .thenAnswer(invocation -> { + var context = (ConnectionContext) invocation.getArgument(0); + context.databaseNameFuture().complete(DatabaseNameUtil.database(DATABASE)); + return completedFuture(connection); + }); Exception e = assertThrows(Exception.class, () -> run(session, "RETURN 1")); assertEquals(error, e); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index b586d7efe3..4351556540 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -69,7 +69,6 @@ import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.FailableCursor; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.messaging.BoltProtocol; @@ -173,8 +172,7 @@ void shouldBeClosedWhenMarkedTerminatedAndClosed() { void shouldReleaseConnectionWhenBeginFails() { RuntimeException error = new RuntimeException("Wrong bookmark!"); Connection connection = connectionWithBegin(handler -> handler.onFailure(error)); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); Set bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark")); TransactionConfig txConfig = TransactionConfig.empty(); @@ -188,8 +186,7 @@ void shouldReleaseConnectionWhenBeginFails() { @Test void shouldNotReleaseConnectionWhenBeginSucceeds() { Connection connection = connectionWithBegin(handler -> handler.onSuccess(emptyMap())); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); Set bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark")); TransactionConfig txConfig = TransactionConfig.empty(); @@ -202,8 +199,7 @@ void shouldNotReleaseConnectionWhenBeginSucceeds() { @Test void shouldReleaseConnectionWhenTerminatedAndCommitted() { Connection connection = connectionMock(); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); tx.markTerminated(null); @@ -218,8 +214,8 @@ void shouldNotCreateCircularExceptionWhenTerminationCauseEqualsToCursorFailure() Connection connection = connectionMock(); ClientException terminationCause = new ClientException("Custom exception"); ResultCursorsHolder resultCursorsHolder = mockResultCursorWith(terminationCause); - UnmanagedTransaction tx = new UnmanagedTransaction( - connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE, resultCursorsHolder); + UnmanagedTransaction tx = + new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE, resultCursorsHolder); tx.markTerminated(terminationCause); @@ -233,8 +229,8 @@ void shouldNotCreateCircularExceptionWhenTerminationCauseDifferentFromCursorFail Connection connection = connectionMock(); ClientException terminationCause = new ClientException("Custom exception"); ResultCursorsHolder resultCursorsHolder = mockResultCursorWith(new ClientException("Cursor error")); - UnmanagedTransaction tx = new UnmanagedTransaction( - connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE, resultCursorsHolder); + UnmanagedTransaction tx = + new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE, resultCursorsHolder); tx.markTerminated(terminationCause); @@ -250,8 +246,7 @@ void shouldNotCreateCircularExceptionWhenTerminationCauseDifferentFromCursorFail void shouldNotCreateCircularExceptionWhenTerminatedWithoutFailure() { Connection connection = connectionMock(); ClientException terminationCause = new ClientException("Custom exception"); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); tx.markTerminated(terminationCause); @@ -264,8 +259,7 @@ void shouldNotCreateCircularExceptionWhenTerminatedWithoutFailure() { @Test void shouldReleaseConnectionWhenTerminatedAndRolledBack() { Connection connection = connectionMock(); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); tx.markTerminated(null); await(tx.rollbackAsync()); @@ -276,8 +270,7 @@ void shouldReleaseConnectionWhenTerminatedAndRolledBack() { @Test void shouldReleaseConnectionWhenClose() throws Throwable { Connection connection = connectionMock(); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); await(tx.closeAsync()); @@ -288,8 +281,7 @@ void shouldReleaseConnectionWhenClose() throws Throwable { void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() { AuthorizationExpiredException exception = new AuthorizationExpiredException("code", "message"); Connection connection = connectionWithBegin(handler -> handler.onFailure(exception)); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); Set bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark")); TransactionConfig txConfig = TransactionConfig.empty(); @@ -305,8 +297,7 @@ void shouldReleaseConnectionOnConnectionAuthorizationExpiredExceptionFailure() { void shouldReleaseConnectionOnConnectionReadTimeoutExceptionFailure() { Connection connection = connectionWithBegin(handler -> handler.onFailure(ConnectionReadTimeoutException.INSTANCE)); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); Set bookmarks = Collections.singleton(InternalBookmark.parse("SomeBookmark")); TransactionConfig txConfig = TransactionConfig.empty(); @@ -336,8 +327,7 @@ void shouldReturnExistingStageOnSimilarCompletingAction( given(connection.protocol()).willReturn(protocol); given(protocolCommit ? protocol.commitTransaction(connection) : protocol.rollbackTransaction(connection)) .willReturn(new CompletableFuture<>()); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); CompletionStage initialStage = mapTransactionAction(initialAction, tx).get(); @@ -379,8 +369,7 @@ void shouldReturnFailingStageOnConflictingCompletingAction( given(connection.protocol()).willReturn(protocol); given(protocolCommit ? protocol.commitTransaction(connection) : protocol.rollbackTransaction(connection)) .willReturn(protocolActionCompleted ? completedFuture(null) : new CompletableFuture<>()); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); CompletionStage originalActionStage = mapTransactionAction(initialAction, tx).get(); @@ -423,8 +412,7 @@ void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommitt given(connection.protocol()).willReturn(protocol); given(protocolCommit ? protocol.commitTransaction(connection) : protocol.rollbackTransaction(connection)) .willReturn(completedFuture(null)); - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); CompletionStage originalActionStage = mapTransactionAction(originalAction, tx).get(); @@ -472,8 +460,7 @@ private static UnmanagedTransaction beginTx(Connection connection) { } private static UnmanagedTransaction beginTx(Connection connection, Set initialBookmarks) { - UnmanagedTransaction tx = - new UnmanagedTransaction(connection, new DefaultBookmarksHolder(), UNLIMITED_FETCH_SIZE); + UnmanagedTransaction tx = new UnmanagedTransaction(connection, (ignored) -> {}, UNLIMITED_FETCH_SIZE); return await(tx.beginAsync(initialBookmarks, TransactionConfig.empty())); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java index 34ad146c0d..1b45d26ea5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java @@ -39,15 +39,15 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionStage; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; -import org.neo4j.driver.internal.ReadOnlyBookmarksHolder; import org.neo4j.driver.internal.spi.Connection; class MultiDatabasesRoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest { @@ -60,7 +60,7 @@ void shouldCallGetRoutingTableWithEmptyMapOnSystemDatabaseForDatabase(String db) assertTrue(response.isSuccess()); assertEquals(1, response.records().size()); - assertThat(runner.bookmarksHolder, instanceOf(ReadOnlyBookmarksHolder.class)); + assertThat(runner.bookmarks, instanceOf(Set.class)); assertThat(runner.connection.databaseName(), equalTo(systemDatabase())); assertThat(runner.connection.mode(), equalTo(AccessMode.READ)); @@ -80,7 +80,7 @@ void shouldCallGetRoutingTableWithParamOnSystemDatabaseForDatabase(String db) { assertTrue(response.isSuccess()); assertEquals(1, response.records().size()); - assertThat(runner.bookmarksHolder, instanceOf(ReadOnlyBookmarksHolder.class)); + assertThat(runner.bookmarks, instanceOf(Set.class)); assertThat(runner.connection.databaseName(), equalTo(systemDatabase())); assertThat(runner.connection.mode(), equalTo(AccessMode.READ)); @@ -109,7 +109,7 @@ private static class TestRoutingProcedureRunner extends MultiDatabasesRoutingPro final CompletionStage> runProcedureResult; private Connection connection; private Query procedure; - private BookmarksHolder bookmarksHolder; + private Set bookmarks; TestRoutingProcedureRunner(RoutingContext context) { this(context, completedFuture(singletonList(mock(Record.class)))); @@ -121,11 +121,10 @@ private static class TestRoutingProcedureRunner extends MultiDatabasesRoutingPro } @Override - CompletionStage> runProcedure( - Connection connection, Query procedure, BookmarksHolder bookmarksHolder) { + CompletionStage> runProcedure(Connection connection, Query procedure, Set bookmarks) { this.connection = connection; this.procedure = procedure; - this.bookmarksHolder = bookmarksHolder; + this.bookmarks = bookmarks; return runProcedureResult; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java index 762891c50b..667eee9f1a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java @@ -38,17 +38,18 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.FatalDiscoveryException; -import org.neo4j.driver.internal.BookmarksHolder; import org.neo4j.driver.internal.spi.Connection; class SingleDatabaseRoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest { @@ -61,7 +62,7 @@ void shouldCallGetRoutingTableWithEmptyMap() { assertTrue(response.isSuccess()); assertEquals(1, response.records().size()); - assertThat(runner.bookmarksHolder, equalTo(BookmarksHolder.NO_OP)); + assertThat(runner.bookmarks, equalTo(Collections.emptySet())); assertThat(runner.connection.databaseName(), equalTo(defaultDatabase())); assertThat(runner.connection.mode(), equalTo(AccessMode.WRITE)); @@ -81,7 +82,7 @@ void shouldCallGetRoutingTableWithParam() { assertTrue(response.isSuccess()); assertEquals(1, response.records().size()); - assertThat(runner.bookmarksHolder, equalTo(BookmarksHolder.NO_OP)); + assertThat(runner.bookmarks, equalTo(Collections.emptySet())); assertThat(runner.connection.databaseName(), equalTo(defaultDatabase())); assertThat(runner.connection.mode(), equalTo(AccessMode.WRITE)); @@ -121,7 +122,7 @@ private static class TestRoutingProcedureRunner extends SingleDatabaseRoutingPro final CompletionStage> runProcedureResult; private Connection connection; private Query procedure; - private BookmarksHolder bookmarksHolder; + private Set bookmarks; TestRoutingProcedureRunner(RoutingContext context) { this(context, completedFuture(singletonList(mock(Record.class)))); @@ -133,11 +134,10 @@ private static class TestRoutingProcedureRunner extends SingleDatabaseRoutingPro } @Override - CompletionStage> runProcedure( - Connection connection, Query procedure, BookmarksHolder bookmarksHolder) { + CompletionStage> runProcedure(Connection connection, Query procedure, Set bookmarks) { this.connection = connection; this.procedure = procedure; - this.bookmarksHolder = bookmarksHolder; + this.bookmarks = bookmarks; return runProcedureResult; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandlerTest.java index 34e4fc9198..5862c0e4c6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/CommitTxResponseHandlerTest.java @@ -21,26 +21,25 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.neo4j.driver.Values.value; import static org.neo4j.driver.testutil.TestUtil.await; import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; -import org.neo4j.driver.Bookmark; import org.neo4j.driver.Value; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.InternalBookmark; class CommitTxResponseHandlerTest { - private final CompletableFuture future = new CompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); private final CommitTxResponseHandler handler = new CommitTxResponseHandler(future); @Test void shouldHandleSuccessWithoutBookmark() { handler.onSuccess(emptyMap()); - assertNull(await(future)); + assertEquals(new DatabaseBookmark(null, null), await(future)); } @Test @@ -49,7 +48,7 @@ void shouldHandleSuccessWithBookmark() { handler.onSuccess(singletonMap("bookmark", value(bookmarkString))); - assertEquals(InternalBookmark.parse(bookmarkString), await(future)); + assertEquals(InternalBookmark.parse(bookmarkString), await(future).bookmark()); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java index cde0103def..c8eca8d81d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/SessionPullResponseCompletionListenerTest.java @@ -27,12 +27,13 @@ import static org.neo4j.driver.Values.value; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; import org.neo4j.driver.Query; import org.neo4j.driver.exceptions.AuthorizationExpiredException; import org.neo4j.driver.exceptions.ConnectionReadTimeoutException; import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; @@ -45,7 +46,7 @@ class SessionPullResponseCompletionListenerTest { void shouldReleaseConnectionOnSuccess() { Connection connection = newConnectionMock(); PullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(connection, BookmarksHolder.NO_OP); + new SessionPullResponseCompletionListener(connection, (ignored) -> {}); ResponseHandler handler = newHandler(connection, listener); handler.onSuccess(emptyMap()); @@ -57,7 +58,7 @@ void shouldReleaseConnectionOnSuccess() { void shouldReleaseConnectionOnFailure() { Connection connection = newConnectionMock(); PullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(connection, BookmarksHolder.NO_OP); + new SessionPullResponseCompletionListener(connection, (ignored) -> {}); ResponseHandler handler = newHandler(connection, listener); handler.onFailure(new RuntimeException()); @@ -69,21 +70,22 @@ void shouldReleaseConnectionOnFailure() { void shouldUpdateBookmarksOnSuccess() { Connection connection = newConnectionMock(); String bookmarkValue = "neo4j:bookmark:v1:tx42"; - BookmarksHolder bookmarksHolder = mock(BookmarksHolder.class); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); PullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(connection, bookmarksHolder); + new SessionPullResponseCompletionListener(connection, bookmarkConsumer); ResponseHandler handler = newHandler(connection, listener); handler.onSuccess(singletonMap("bookmark", value(bookmarkValue))); - verify(bookmarksHolder).setBookmark(InternalBookmark.parse(bookmarkValue)); + verify(bookmarkConsumer).accept(new DatabaseBookmark(null, InternalBookmark.parse(bookmarkValue))); } @Test void shouldReleaseConnectionImmediatelyOnAuthorizationExpiredExceptionFailure() { Connection connection = newConnectionMock(); PullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(connection, BookmarksHolder.NO_OP); + new SessionPullResponseCompletionListener(connection, (ignored) -> {}); ResponseHandler handler = newHandler(connection, listener); AuthorizationExpiredException exception = new AuthorizationExpiredException("code", "message"); @@ -97,7 +99,7 @@ void shouldReleaseConnectionImmediatelyOnAuthorizationExpiredExceptionFailure() void shouldReleaseConnectionImmediatelyOnConnectionReadTimeoutExceptionFailure() { Connection connection = newConnectionMock(); PullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(connection, BookmarksHolder.NO_OP); + new SessionPullResponseCompletionListener(connection, (ignored) -> {}); ResponseHandler handler = newHandler(connection, listener); handler.onFailure(ConnectionReadTimeoutException.INSTANCE); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java index 252cb4369d..37e881d736 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/SessionPullResponseCompletionListenerTest.java @@ -27,9 +27,10 @@ import java.util.Collections; import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.neo4j.driver.Query; import org.neo4j.driver.Record; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.handlers.RunResponseHandler; import org.neo4j.driver.internal.handlers.SessionPullResponseCompletionListener; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; @@ -44,16 +45,17 @@ protected void shouldHandleSuccessWithSummary(BasicPullResponseHandler.State sta BiConsumer recordConsumer = mock(BiConsumer.class); @SuppressWarnings("unchecked") BiConsumer summaryConsumer = mock(BiConsumer.class); - BookmarksHolder bookmarksHolder = mock(BookmarksHolder.class); + @SuppressWarnings("unchecked") + Consumer bookmarksConsumer = mock(Consumer.class); PullResponseHandler handler = - newSessionResponseHandler(conn, recordConsumer, summaryConsumer, bookmarksHolder, state); + newSessionResponseHandler(conn, recordConsumer, summaryConsumer, bookmarksConsumer, state); // When handler.onSuccess(Collections.emptyMap()); // Then verify(conn).release(); - verify(bookmarksHolder).setBookmark(any()); + verify(bookmarksConsumer).accept(any()); verify(recordConsumer).accept(null, null); verify(summaryConsumer).accept(any(ResultSummary.class), eq(null)); } @@ -85,19 +87,18 @@ protected BasicPullResponseHandler newResponseHandlerWithStatus( BiConsumer recordConsumer, BiConsumer summaryConsumer, BasicPullResponseHandler.State state) { - BookmarksHolder bookmarksHolder = BookmarksHolder.NO_OP; - return newSessionResponseHandler(conn, recordConsumer, summaryConsumer, bookmarksHolder, state); + return newSessionResponseHandler(conn, recordConsumer, summaryConsumer, (ignored) -> {}, state); } private static BasicPullResponseHandler newSessionResponseHandler( Connection conn, BiConsumer recordConsumer, BiConsumer summaryConsumer, - BookmarksHolder bookmarksHolder, + Consumer bookmarkConsumer, BasicPullResponseHandler.State state) { RunResponseHandler runHandler = mock(RunResponseHandler.class); SessionPullResponseCompletionListener listener = - new SessionPullResponseCompletionListener(conn, bookmarksHolder); + new SessionPullResponseCompletionListener(conn, bookmarkConsumer); BasicPullResponseHandler handler = new BasicPullResponseHandler( mock(Query.class), runHandler, conn, BoltProtocolV4.METADATA_EXTRACTOR, listener); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java index c695185e58..22a1c883c0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -53,6 +55,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,8 +70,7 @@ import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.ClientException; -import org.neo4j.driver.internal.BookmarksHolder; -import org.neo4j.driver.internal.DefaultBookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -247,10 +249,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -347,7 +349,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { () -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); assertThat(e.getMessage(), startsWith("Database name parameter for selecting database is not supported")); @@ -361,7 +364,8 @@ protected void testDatabaseNameSupport(boolean autoCommitTx) { () -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } else { @@ -413,9 +417,8 @@ protected void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionCon CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -439,10 +442,11 @@ connection, QUERY, mock(UnmanagedTransaction.class), UNLIMITED_FETCH_SIZE) protected void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); assertFalse(cursorFuture.isDone()); @@ -452,7 +456,7 @@ protected void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( String newBookmarkValue = "neo4j:bookmark:v1:tx98765"; handlers.runHandler.onSuccess(emptyMap()); handlers.pullAllHandler.onSuccess(singletonMap("bookmark", value(newBookmarkValue))); - assertEquals(Collections.singleton(InternalBookmark.parse(newBookmarkValue)), bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should().accept(new DatabaseBookmark(null, InternalBookmark.parse(newBookmarkValue))); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); @@ -461,10 +465,11 @@ protected void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( protected void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); assertFalse(cursorFuture.isDone()); @@ -472,7 +477,7 @@ protected void testFailedRunInAutoCommitTxWithWaitingForResponse( ResponseHandler runResponseHandler = verifyRunInvoked(connection, true, bookmarks, config, mode).runHandler; Throwable error = new RuntimeException(); runResponseHandler.onFailure(error); - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4Test.java index b40ed9484e..58bc14d982 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v4/BoltProtocolV4Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -242,10 +244,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -339,7 +341,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -360,10 +363,11 @@ protected void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -375,7 +379,7 @@ protected void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); @@ -386,10 +390,11 @@ protected void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -400,7 +405,7 @@ protected void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -445,9 +450,8 @@ protected void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionCon CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -472,7 +476,12 @@ protected void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41Test.java index 2375335a11..af8ff9dd9a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v41/BoltProtocolV41Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -246,10 +248,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -343,7 +345,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -356,10 +359,11 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -371,7 +375,7 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); @@ -382,10 +386,11 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -396,7 +401,7 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -440,9 +445,8 @@ private void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionConfi CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -465,7 +469,12 @@ private void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v42/BoltProtocolV42Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v42/BoltProtocolV42Test.java index 34cfd2cee0..a10b7c237b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v42/BoltProtocolV42Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v42/BoltProtocolV42Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -246,10 +248,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -343,7 +345,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -356,10 +359,11 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -371,7 +375,7 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); assertSame(error, actual); @@ -381,10 +385,11 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -395,7 +400,7 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -439,9 +444,8 @@ private void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionConfi CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -466,7 +470,12 @@ private void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java index 6cd1449924..8c3792da58 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -245,10 +247,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -342,7 +344,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -355,10 +358,11 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -370,7 +374,7 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); @@ -381,10 +385,11 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -395,7 +400,7 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -439,9 +444,10 @@ private void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionConfi CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -466,7 +472,12 @@ private void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v44/BoltProtocolV44Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v44/BoltProtocolV44Test.java index ef19163ac9..42c9f89acc 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v44/BoltProtocolV44Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v44/BoltProtocolV44Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -245,10 +247,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -342,7 +344,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -355,10 +358,11 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -370,7 +374,7 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); @@ -381,10 +385,11 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -395,7 +400,7 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -439,9 +444,8 @@ private void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionConfi CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -466,7 +470,12 @@ private void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v5/BoltProtocolV5Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v5/BoltProtocolV5Test.java index bdd8143b81..1c9a4967d1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/v5/BoltProtocolV5Test.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v5/BoltProtocolV5Test.java @@ -34,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.AccessMode.WRITE; @@ -54,6 +56,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,9 +70,8 @@ import org.neo4j.driver.Query; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Value; -import org.neo4j.driver.internal.BookmarksHolder; +import org.neo4j.driver.internal.DatabaseBookmark; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.DefaultBookmarksHolder; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.async.connection.ChannelAttributes; @@ -245,10 +247,10 @@ void shouldCommitTransaction() { .when(connection) .writeAndFlush(eq(CommitMessage.COMMIT), any()); - CompletionStage stage = protocol.commitTransaction(connection); + CompletionStage stage = protocol.commitTransaction(connection); verify(connection).writeAndFlush(eq(CommitMessage.COMMIT), any(CommitTxResponseHandler.class)); - assertEquals(InternalBookmark.parse(bookmarkString), await(stage)); + assertEquals(InternalBookmark.parse(bookmarkString), await(stage).bookmark()); } @Test @@ -342,7 +344,8 @@ void shouldNotSupportDatabaseNameForAutoCommitTransactions() { assertDoesNotThrow(() -> protocol.runInAutoCommitTransaction( connectionMock("foo", protocol), new Query("RETURN 1"), - BookmarksHolder.NO_OP, + Collections.emptySet(), + (ignored) -> {}, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE)); } @@ -355,10 +358,11 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -370,7 +374,7 @@ private void testFailedRunInAutoCommitTxWithWaitingForResponse( runHandler.onFailure(error); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); Throwable actual = assertThrows(error.getClass(), () -> await(cursorFuture.get().mapSuccessfulRunCompletionAsync())); @@ -381,10 +385,11 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Set bookmarks, TransactionConfig config, AccessMode mode) throws Exception { // Given Connection connection = connectionMock(mode, protocol); - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(bookmarks); + @SuppressWarnings("unchecked") + Consumer bookmarkConsumer = mock(Consumer.class); CompletableFuture cursorFuture = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, bookmarks, bookmarkConsumer, config, UNLIMITED_FETCH_SIZE) .asyncResult() .toCompletableFuture(); @@ -395,7 +400,7 @@ private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( runHandler.onSuccess(emptyMap()); // Then - assertEquals(bookmarks, bookmarksHolder.getBookmarks()); + then(bookmarkConsumer).should(times(0)).accept(any()); assertTrue(cursorFuture.isDone()); assertNotNull(cursorFuture.get()); } @@ -439,9 +444,8 @@ private void testRunAndWaitForRunResponse(boolean autoCommitTx, TransactionConfi CompletionStage cursorStage; if (autoCommitTx) { - BookmarksHolder bookmarksHolder = new DefaultBookmarksHolder(initialBookmarks); cursorStage = protocol.runInAutoCommitTransaction( - connection, QUERY, bookmarksHolder, config, UNLIMITED_FETCH_SIZE) + connection, QUERY, initialBookmarks, (ignored) -> {}, config, UNLIMITED_FETCH_SIZE) .asyncResult(); } else { cursorStage = protocol.runInUnmanagedTransaction( @@ -466,7 +470,12 @@ private void testDatabaseNameSupport(boolean autoCommitTx) { Connection connection = connectionMock("foo", protocol); if (autoCommitTx) { ResultCursorFactory factory = protocol.runInAutoCommitTransaction( - connection, QUERY, BookmarksHolder.NO_OP, TransactionConfig.empty(), UNLIMITED_FETCH_SIZE); + connection, + QUERY, + Collections.emptySet(), + (ignored) -> {}, + TransactionConfig.empty(), + UNLIMITED_FETCH_SIZE); CompletionStage resultStage = factory.asyncResult(); ResponseHandler runHandler = verifySessionRunInvoked( connection, Collections.emptySet(), TransactionConfig.empty(), AccessMode.WRITE, database("foo")); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/MetadataExtractorTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/MetadataExtractorTest.java index 290c6aadbe..8aace55e49 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/MetadataExtractorTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/MetadataExtractorTest.java @@ -342,30 +342,30 @@ void shouldBuildResultSummaryWithoutResultConsumedAfter() { void shouldExtractBookmark() { String bookmarkValue = "neo4j:bookmark:v1:tx123456"; - Bookmark bookmark = MetadataExtractor.extractBookmarks(singletonMap("bookmark", value(bookmarkValue))); + Bookmark bookmark = MetadataExtractor.extractBookmark(singletonMap("bookmark", value(bookmarkValue))); assertEquals(InternalBookmark.parse(bookmarkValue), bookmark); } @Test void shouldExtractNoBookmarkWhenMetadataContainsNull() { - Bookmark bookmark = MetadataExtractor.extractBookmarks(singletonMap("bookmark", null)); + Bookmark bookmark = MetadataExtractor.extractBookmark(singletonMap("bookmark", null)); - assertEquals(InternalBookmark.empty(), bookmark); + assertNull(bookmark); } @Test void shouldExtractNoBookmarkWhenMetadataContainsNullValue() { - Bookmark bookmark = MetadataExtractor.extractBookmarks(singletonMap("bookmark", Values.NULL)); + Bookmark bookmark = MetadataExtractor.extractBookmark(singletonMap("bookmark", Values.NULL)); - assertEquals(InternalBookmark.empty(), bookmark); + assertNull(bookmark); } @Test void shouldExtractNoBookmarkWhenMetadataContainsValueOfIncorrectType() { - Bookmark bookmark = MetadataExtractor.extractBookmarks(singletonMap("bookmark", value(42))); + Bookmark bookmark = MetadataExtractor.extractBookmark(singletonMap("bookmark", value(42))); - assertEquals(InternalBookmark.empty(), bookmark); + assertNull(bookmark); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java index ce52b4b1cd..a102263adc 100644 --- a/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/testutil/TestUtil.java @@ -75,7 +75,7 @@ import org.neo4j.driver.SessionConfig; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.DefaultBookmarksHolder; +import org.neo4j.driver.internal.NoOpBookmarkManager; import org.neo4j.driver.internal.async.NetworkSession; import org.neo4j.driver.internal.async.connection.EventLoopGroupFactory; import org.neo4j.driver.internal.handlers.BeginTxResponseHandler; @@ -263,10 +263,11 @@ public static NetworkSession newSession( retryLogic, defaultDatabase(), mode, - new DefaultBookmarksHolder(bookmarks), + bookmarks, null, UNLIMITED_FETCH_SIZE, - DEV_NULL_LOGGING); + DEV_NULL_LOGGING, + new NoOpBookmarkManager()); } public static void verifyRunRx(Connection connection, String query) { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 6334b6e0c0..854dcdc154 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -57,7 +57,8 @@ public class GetFeatures implements TestkitRequest { "Detail:DefaultSecurityConfigValueEquality", "Optimization:ImplicitDefaultArguments", "Feature:Bolt:Patch:UTC", - "Feature:API:Type.Temporal")); + "Feature:API:Type.Temporal", + "Feature:API:BookmarkManager")); private static final Set SYNC_FEATURES = new HashSet<>(Arrays.asList( "Feature:Bolt:3.0", diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 7b4bb52afa..da4aa835bb 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -24,9 +24,12 @@ import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -44,6 +47,8 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import org.neo4j.driver.AuthToken; import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.BookmarkManagers; import org.neo4j.driver.Config; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DefaultDomainNameResolver; @@ -112,6 +117,20 @@ public TestkitResponse process(TestkitState testkitState) { Optional.ofNullable(data.maxConnectionPoolSize).ifPresent(configBuilder::withMaxConnectionPoolSize); Optional.ofNullable(data.connectionAcquisitionTimeoutMs) .ifPresent(timeout -> configBuilder.withConnectionAcquisitionTimeout(timeout, TimeUnit.MILLISECONDS)); + Optional.ofNullable(data.bookmarkManager).ifPresent(managerConfig -> { + var initialBookmarks = Optional.ofNullable(managerConfig.getInitialBookmarks()) + .orElseGet(Collections::emptyMap) + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().stream().map(Bookmark::from).collect(Collectors.toSet()))); + var config = org.neo4j.driver.BookmarkManagerConfig.builder() + .withInitialBookmarks(initialBookmarks) + .build(); + var manager = BookmarkManagers.defaultManager(config); + configBuilder.withBookmarkManager(manager); + }); configBuilder.withDriverMetrics(); org.neo4j.driver.Driver driver; Config config = configBuilder.build(); @@ -286,6 +305,15 @@ public static class NewDriverBody { private Long connectionAcquisitionTimeoutMs; private boolean encrypted; private List trustedCertificates; + private BookmarkManagerConfig bookmarkManager; + } + + @Setter + @Getter + public static class BookmarkManagerConfig { + private Map> initialBookmarks; + private boolean bookmarkSupplier; + private boolean notifyBookmarks; } @RequiredArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index 1c25738381..82b9edb2b3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -83,6 +83,7 @@ protected TestkitResponse createSessionStateAndResponse( Optional.ofNullable(data.database).ifPresent(builder::withDatabase); Optional.ofNullable(data.impersonatedUser).ifPresent(builder::withImpersonatedUser); + Optional.ofNullable(data.ignoreBookmarkManager).ifPresent(builder::withIgnoredBookmarkManager); if (data.getFetchSize() != 0) { builder.withFetchSize(data.getFetchSize()); @@ -124,5 +125,6 @@ public static class NewSessionBody { private String database; private String impersonatedUser; private int fetchSize; + private Boolean ignoreBookmarkManager; } }