-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft of async API #402
Draft of async API #402
Conversation
LG, let's merge! BTW, Should we merge this into a separate branch? or should we merge into main branch? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really good! Only a couple of comments (even can be ignored :)).
@@ -29,7 +29,7 @@ | |||
*/ | |||
public class ConnectionSettings | |||
{ | |||
private static final String DEFAULT_USER_AGENT = format( "neo4j-java/%s", driverVersion() ); | |||
public static final String DEFAULT_USER_AGENT = format( "neo4j-java/%s", driverVersion() ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very small thing but is this being used elsewhere? Shall we set it as private again?
{ | ||
Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters(); | ||
|
||
connection.run( "BEGIN", parameters, NoOpResponseHandler.INSTANCE ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do these "BEGIN", "COMMIT" and "ROLLBACK" strings deserve their constants?
Collectors are basically response handlers that are queued at the same time as outbound message in a separate queue. Later, when response arrives next collector is popped from the queue to handle it. Collector interface was quite wide and allowed processing parts of the SUCCESS message metadata. This commit turns `Collector` into `ResponseHandler` and makes it only expose three methods to handle SUCCESS, FAILURE and RECORD messages.
Added runtime dependency on netty-all. Introduced a dedicated package for the experimental classes. New `NettyConnection` is able to asynchronously send and receive all types of Bolt messages. All classes are completely disconnected from the rest of the codebase.
Added separate netty based async connection. Made netty channel connector return initialized (after INIT) connection futures.
Made it possible to pass `SecurityPlan` down to the Netty channel bootstrap and add an SSL handler, if needed. This allows running encrypted connection from driver to server. Also fixed compilation problems related to `Collector` to `ResponseHandler` refactoring.
It uses the existing simple async netty-based connection. Introduced a simple async result cursor that only allows iteration over result records. Also added a listenable future interface and implementation which is an extension of default java future but allows listener registration. Such future is needed because default java future only gives ability to block for the result.
Introduced an async connection pool that keeps pools of netty channels by server address. It uses existing netty `FixedChannelPool` internally. Also made `DirectConnectionProvider` use this pool and acquire async connections from it. New pool tries to expose API similar to exsiting `ConnectionPool` interface except acquisition of connection returns an async connection which is backed by a `Future<Channel>`. Active connection tracking does not work because of `ChannelInitializer` conflict in the channel pool.
Connection pool needs to track and expose active network connection count for least-connected load balancing. Tracking and counts were previously broken because of how `ChannelInitializer`s are handled by the netty channel pool. Pool installs it's own channel initializer which got overridden by our own initializer (which adds SSL and some channel attributes). This prevented active channel tracking from noticing a channel creation event and corresponding counter was never decremented. This commit makes our channel initializer aware of active connections tracker which it notifies after new channel is initialized.
Extracted all listeners to separate top level classes and removed unused methods from `AsyncConnection`.
To make sure database cleans up all resources and moves state machine to the READY state.
Fixed error handling and added couple more tests for `Session#runAsync()`.
For each netty-based connection success, failure and record messages are fetched from the network by an event loop thread. They are then handed over to an application/user thread. This commit adds a separate records handler and makes it synchronized to avoid any visibility issues. Also added a simple TCP level back pressure with netty's auto-read. When too many records are buffered event loop will not read more from the network.
Bolt protocol requires clients to acknowledge failures by sending an ACK_FAILURE message. Server will respond with IGNORED to every subsequent request before failure is acknowledge. This commit makes async connection eagerly send an ACK_FAILURE after receiving a FAILURE message. It also improves fatal error handling. All queued response handlers will be failed on fatal error. It's needed because fatal errors are not recoverable and result in channel being closed.
This commit makes session able to start transactions asynchronously. It is then possible to run async queries and async commit/rollback those transactions. Async transactions currently do not track their state and simply send issued commands to the server.
Fixed issues with connection release when async transactions are committer/rolled back. Fused async transaction functionality into `ExplicitTransaction`. Added `Session#closeAsync()`. Moved ACK_FAILURE handling to the place where preceding FAILURE message is received, ack was previously executed too late after user code.
It is renamed to `Task` and has a dedicated listener class that allows users to consume both result and failure.
This commit makes `Session#runAsync()` implementation use futures all the way up. Previously it tried to use `NettyConnection` with future inside. This was problematic because operations in the corresponding event loop were not properly ordered. It also simplifies connection release in result handler. Later now directly releases connections without additional abstraction `ResultResourcesHandler`.
Made it take RUN and PULL_ALL handlers as constructor arguments.
Reimplemented async transactions based on future chaining. This solves issues with message queueing. All messages are queued and executed by event loop which guarantees ordering.
They were boltkit related and fixed by this commit neo4j-drivers/boltkit@730c6b2.
From `netty` to `async` to better represent contained classes.
Commit moves all classes related to netty outbound pipeline to a separate package. It also make code properly handle messages that do not fit in one chunk. Whole message is serialized to a netty buffer primitive by primitive by `ChunkAwareByteBufOutput` which also inserts chunk sizes where necessary.
Removed unnecessary abstraction OutboundMessageWriter. OutboundMessageHandler now accepts message format which is selected after handshake based on protocol version suggested by the server.
Commit moves all classes related to netty inbound pipeline to a separate package. It also adds unit tests for inbound handlers and renames classes to better reflect their purpose.
This commit includes: * switching off byte array support based on the server version after INIT * maxConnectionPoolSize & connectionAcquisitionTimeout are now propagated from Config to the pool * ActiveChannelTracker now keeps sets of all channels so that it can close them when needed; this functionality is currently unused but should be used for routing driver when updated routing table does not contain particular server * BootstrapFactory allows creation of single-threaded bootstraps for tests * server version is now attached to the channel attributes; it is currently unused but is valuable for routing driver to decide which routing procedure to invoke * more unit tests for various components
Renamed `EventLoopAwareFuture` to `InternalFuture` and `EventLoopAwarePromise` to `InternalPromise`. Added unit tests for connection initialization handlers/listeners.
Such attempt will now return a failed future. Also fixed default values for `maxConnectionPoolSize` and `connectionAcquisitionTimeout`.
This commit makes channel bootstrap use and enforce connection timeout, which is propagated from user through the config.
Named future combinators same as those in Java 8 `CompletableFuture` and removed duplicated ones. Also these methods were added directly to `InternalFuture` to improve code readability. Added convenience method to convert `InternalFuture` to `Task` which will become part of the public API.
`ActiveChannelTracker#prune()` renamed to `#purge()`.
This commit adds support for `idleTimeBeforeConnectionTest` with netty channels. Ping/verification added to `NettyChannelHealthChecker` that is used by the channel pool.
Also this commit makes async query execution wait for RUN response before returning cursor back to the user. This means all RUN failures will surface earlier in a future returned by `#runAsync()` and not during consumption of the cursor.
This commit makes result summary exposed as future.
Also renamed related listener from `TaskListener` to `ResponseListener`. Such renaming is needed because `Task` is more of a C# concept and represents slightly different thing in Java. Removed duplicated method from `InternalPromise`.
* close async connection pool when driver creation fails * do not memorize transaction in session until it's successfully started * close async connection in `Session#close()` * relax assertion in connection timeout test because connection to non-routable IP addresses can result in immediate failure and not hanging in some environments
By making `InternalFuture` implement this public interface. Previously a special adapter object was needed. Now we can directly return internal classes.
We decided to keep async API in a separate branch for now. It will get merged into 1.5 a bit later. |
PR adds initial version of async API for direct (only
bolt://
scheme, notbolt+routing://
) driver.Main API contact points:
Response
interface that is aFuture
one can subscribe to withResponseListener
StatementResultCursor
which is an async counterpart ofStatementResult
; it is basically an async iterator with#hasNext()
beingResponse<Boolean> fetchAsync()
and#next()
beingRecord current()
Session#runAsync()
to run queries asynchronously in auto-commit modeSession#beginTransactionAsync()
to start a transaction asynchronouslyTransaction#runAsync()
to run queries asynchronously in the transactionTransaction#commitAsync()
andTransaction#rollbackAsync()
to finish transaction with commit or rollbackSession#closeAsync()
to close session asynchronously and release all resourcesImplementation notes:
compile
but will eventually be shaded into the driver artifactFixedChannelPool
which adds two new config settings:maxConnectionPoolSize
andconnectionAcquisitionTimeoutMillis
Not part of this PR:
Related to #102