diff --git a/build.gradle b/build.gradle index 9feeed809ae89..949ada13fd9dc 100644 --- a/build.gradle +++ b/build.gradle @@ -177,6 +177,7 @@ subprojects { "org.elasticsearch:rest-api-spec:${version}": ':rest-api-spec', "org.elasticsearch:elasticsearch:${version}": ':core', "org.elasticsearch:elasticsearch-cli:${version}": ':core:cli', + "org.elasticsearch:elasticsearch-nio:${version}": ':libs:elasticsearch-nio', "org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest', "org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer', "org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level', diff --git a/core/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java b/core/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java index 4c7698e82e04d..e70db0b1f7a73 100644 --- a/core/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java +++ b/core/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java @@ -19,13 +19,12 @@ package org.elasticsearch.action.support; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.BaseFuture; -import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -35,14 +34,7 @@ public abstract class AdapterActionFuture extends BaseFuture implements @Override public T actionGet() { - try { - return get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); - } catch (ExecutionException e) { - throw rethrowExecutionException(e); - } + return FutureUtils.get(this); } @Override @@ -62,33 +54,7 @@ public T actionGet(TimeValue timeout) { @Override public T actionGet(long timeout, TimeUnit unit) { - try { - return get(timeout, unit); - } catch (TimeoutException e) { - throw new ElasticsearchTimeoutException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Future got interrupted", e); - } catch (ExecutionException e) { - throw rethrowExecutionException(e); - } - } - - static RuntimeException rethrowExecutionException(ExecutionException e) { - if (e.getCause() instanceof ElasticsearchException) { - ElasticsearchException esEx = (ElasticsearchException) e.getCause(); - Throwable root = esEx.unwrapCause(); - if (root instanceof ElasticsearchException) { - return (ElasticsearchException) root; - } else if (root instanceof RuntimeException) { - return (RuntimeException) root; - } - return new UncategorizedExecutionException("Failed execution", root); - } else if (e.getCause() instanceof RuntimeException) { - return (RuntimeException) e.getCause(); - } else { - return new UncategorizedExecutionException("Failed execution", e); - } + return FutureUtils.get(this, timeout, unit); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java index de841634ede67..002ed3751549a 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java @@ -19,9 +19,15 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.support.AdapterActionFuture; import org.elasticsearch.common.SuppressForbidden; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class FutureUtils { @@ -33,4 +39,60 @@ public static boolean cancel(Future toCancel) { return false; } + /** + * Calls {@link Future#get()} without the checked exceptions. + * + * @param future to dereference + * @param the type returned + * @return the value of the future + */ + public static T get(Future future) { + try { + return future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw rethrowExecutionException(e); + } + } + + /** + * Calls {@link Future#get(long, TimeUnit)} without the checked exceptions. + * + * @param future to dereference + * @param timeout to wait + * @param unit for timeout + * @param the type returned + * @return the value of the future + */ + public static T get(Future future, long timeout, TimeUnit unit) { + try { + return future.get(timeout, unit); + } catch (TimeoutException e) { + throw new ElasticsearchTimeoutException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw FutureUtils.rethrowExecutionException(e); + } + } + + public static RuntimeException rethrowExecutionException(ExecutionException e) { + if (e.getCause() instanceof ElasticsearchException) { + ElasticsearchException esEx = (ElasticsearchException) e.getCause(); + Throwable root = esEx.unwrapCause(); + if (root instanceof ElasticsearchException) { + return (ElasticsearchException) root; + } else if (root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } else if (e.getCause() instanceof RuntimeException) { + return (RuntimeException) e.getCause(); + } else { + return new UncategorizedExecutionException("Failed execution", e); + } + } } diff --git a/core/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy b/core/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy index 539587c409d42..f9a9f06ae55ea 100644 --- a/core/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy +++ b/core/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy @@ -63,6 +63,11 @@ grant codeBase "${codebase.mocksocket}" { permission java.net.SocketPermission "*", "accept,connect"; }; +grant codeBase "${codebase.elasticsearch-nio}" { + // elasticsearch-nio makes and accepts socket connections + permission java.net.SocketPermission "*", "accept,connect"; +}; + grant codeBase "${codebase.elasticsearch-rest-client}" { // rest makes socket connections for rest tests permission java.net.SocketPermission "*", "connect"; diff --git a/libs/elasticsearch-nio/build.gradle b/libs/elasticsearch-nio/build.gradle new file mode 100644 index 0000000000000..1ce16e7ae2a7c --- /dev/null +++ b/libs/elasticsearch-nio/build.gradle @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.elasticsearch.gradle.precommit.PrecommitTasks + +apply plugin: 'elasticsearch.build' +apply plugin: 'nebula.maven-base-publish' +apply plugin: 'nebula.maven-scm' + +archivesBaseName = 'elasticsearch-nio' + +publishing { + publications { + nebula { + artifactId = archivesBaseName + } + } +} + +dependencies { + compile "org.apache.logging.log4j:log4j-api:${versions.log4j}" + + testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" + testCompile "junit:junit:${versions.junit}" + testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile("org.elasticsearch.test:framework:${version}") { + exclude group: 'org.elasticsearch', module: 'elasticsearch-nio' + } +} + +forbiddenApisMain { + // elasticsearch-nio does not depend on core, so only jdk signatures should be checked + // es-all is not checked as we connect and accept sockets + signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')] +} + +//JarHell is part of es core, which we don't want to pull in +jarHell.enabled=false + +thirdPartyAudit.excludes = [ + 'org/osgi/framework/AdaptPermission', + 'org/osgi/framework/AdminPermission', + 'org/osgi/framework/Bundle', + 'org/osgi/framework/BundleActivator', + 'org/osgi/framework/BundleContext', + 'org/osgi/framework/BundleEvent', + 'org/osgi/framework/SynchronousBundleListener', + 'org/osgi/framework/wiring/BundleWire', + 'org/osgi/framework/wiring/BundleWiring' +] diff --git a/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 b/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 new file mode 100644 index 0000000000000..e1a89fadfed95 --- /dev/null +++ b/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 @@ -0,0 +1 @@ +7a2999229464e7a324aa503c0a52ec0f05efe7bd \ No newline at end of file diff --git a/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt b/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt new file mode 100644 index 0000000000000..6279e5206de13 --- /dev/null +++ b/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 1999-2005 The Apache Software Foundation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt b/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt new file mode 100644 index 0000000000000..0375732360047 --- /dev/null +++ b/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt @@ -0,0 +1,5 @@ +Apache log4j +Copyright 2007 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java similarity index 91% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java index 7b08d831df83e..8285fef6d3985 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AbstractNioChannel.java @@ -17,23 +17,20 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.ESSelector; +package org.elasticsearch.nio; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.StandardSocketOptions; import java.nio.channels.ClosedChannelException; import java.nio.channels.NetworkChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; /** - * This is a basic channel abstraction used by the {@link org.elasticsearch.transport.nio.NioTransport}. + * This is a basic channel abstraction used by the {@link ESSelector}. *

* A channel is open once it is constructed. The channel remains open and {@link #isOpen()} will return * true until the channel is explicitly closed. @@ -138,8 +135,8 @@ public S getRawChannel() { } @Override - public void addCloseListener(ActionListener listener) { - closeContext.whenComplete(ActionListener.toBiConsumer(listener)); + public void addCloseListener(BiConsumer listener) { + closeContext.whenComplete(listener); } // Package visibility for testing diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java similarity index 96% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java index 23775b4bc1665..2cbf7657e5d50 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptingSelector.java @@ -17,9 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio; - -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.channels.ClosedChannelException; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptorEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java similarity index 79% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptorEventHandler.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java index ba0fa9356b9eb..a5727d9ef597a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptorEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java @@ -17,14 +17,10 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.transport.nio.channel.ChannelFactory; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; import java.io.IOException; import java.util.function.Supplier; @@ -47,7 +43,7 @@ public AcceptorEventHandler(Logger logger, Supplier selectorSupp * * @param nioServerSocketChannel that was registered */ - void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) { + protected void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) { SelectionKeyUtils.setAcceptInterested(nioServerSocketChannel); } @@ -57,7 +53,7 @@ void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) { * @param channel that was registered * @param exception that occurred */ - void registrationException(NioServerSocketChannel channel, Exception exception) { + protected void registrationException(NioServerSocketChannel channel, Exception exception) { logger.error(new ParameterizedMessage("failed to register server channel: {}", channel), exception); } @@ -67,8 +63,8 @@ void registrationException(NioServerSocketChannel channel, Exception exception) * * @param nioServerChannel that can accept a connection */ - void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException { - ChannelFactory channelFactory = nioServerChannel.getChannelFactory(); + protected void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException { + ChannelFactory channelFactory = nioServerChannel.getChannelFactory(); SocketSelector selector = selectorSupplier.get(); NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector); nioServerChannel.getAcceptContext().accept(nioSocketChannel); @@ -80,7 +76,7 @@ void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException { * @param nioServerChannel that accepting a connection * @param exception that occurred */ - void acceptException(NioServerSocketChannel nioServerChannel, Exception exception) { + protected void acceptException(NioServerSocketChannel nioServerChannel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}", nioServerChannel), exception); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java similarity index 85% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java index 97433cf4d0aad..d90927af8b91a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java @@ -17,17 +17,16 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.mocksocket.PrivilegedSocketAccess; -import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.SocketSelector; +package org.elasticsearch.nio; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.security.AccessController; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; public abstract class ChannelFactory { @@ -38,7 +37,7 @@ public abstract class ChannelFactory) serverSocketChannel::accept); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } + + private static void connect(SocketChannel socketChannel, InetSocketAddress remoteAddress) throws IOException { + try { + AccessController.doPrivileged((PrivilegedExceptionAction) () -> socketChannel.connect(remoteAddress)); + } catch (PrivilegedActionException e) { + throw (IOException) e.getCause(); + } + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java similarity index 91% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java index 91e308a33b5ff..ed566ffa7daf8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java @@ -17,10 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio; - -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.transport.nio.channel.NioChannel; +package org.elasticsearch.nio; import java.io.Closeable; import java.io.IOException; @@ -30,17 +27,19 @@ import java.nio.channels.Selector; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** - * This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This - * selector wraps a raw nio {@link Selector}. When you call {@link #runLoop()}, the selector will run until - * {@link #close()} is called. This instance handles closing of channels. Users should call - * {@link #queueChannelClose(NioChannel)} to schedule a channel for close by this selector. + * This is a basic selector abstraction. This selector wraps a raw nio {@link Selector}. When you call + * {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing + * of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by + * this selector. *

* Children of this class should implement the specific {@link #processKey(SelectionKey)}, * {@link #preSelect()}, and {@link #cleanup()} functionality. @@ -54,7 +53,7 @@ public abstract class ESSelector implements Closeable { private final ReentrantLock runLock = new ReentrantLock(); private final CountDownLatch exitedLoop = new CountDownLatch(1); private final AtomicBoolean isClosed = new AtomicBoolean(false); - private final PlainActionFuture isRunningFuture = PlainActionFuture.newFuture(); + private final CompletableFuture isRunningFuture = new CompletableFuture<>(); private volatile Thread thread; ESSelector(EventHandler eventHandler) throws IOException { @@ -71,7 +70,7 @@ public abstract class ESSelector implements Closeable { */ public void runLoop() { if (runLock.tryLock()) { - isRunningFuture.onResponse(true); + isRunningFuture.complete(null); try { setThread(); while (isOpen()) { @@ -205,7 +204,7 @@ public boolean isRunning() { return runLock.isLocked(); } - public PlainActionFuture isRunningFuture() { + public Future isRunningFuture() { return isRunningFuture; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/EventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java similarity index 86% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/EventHandler.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 8521f71616266..42bc0555d509c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/EventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -17,11 +17,10 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.transport.nio.channel.NioChannel; import java.io.IOException; import java.nio.channels.Selector; @@ -30,7 +29,7 @@ public abstract class EventHandler { protected final Logger logger; - public EventHandler(Logger logger) { + EventHandler(Logger logger) { this.logger = logger; } @@ -39,7 +38,7 @@ public EventHandler(Logger logger) { * * @param exception the exception */ - void selectException(IOException exception) { + protected void selectException(IOException exception) { logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception); } @@ -48,7 +47,7 @@ void selectException(IOException exception) { * * @param exception the exception */ - void closeSelectorException(IOException exception) { + protected void closeSelectorException(IOException exception) { logger.warn(new ParameterizedMessage("io exception while closing selector [thread={}]", Thread.currentThread().getName()), exception); } @@ -58,7 +57,7 @@ void closeSelectorException(IOException exception) { * * @param exception that was uncaught */ - void uncaughtException(Exception exception) { + protected void uncaughtException(Exception exception) { Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, exception); } @@ -68,7 +67,7 @@ void uncaughtException(Exception exception) { * * @param channel that should be closed */ - void handleClose(NioChannel channel) { + protected void handleClose(NioChannel channel) { try { channel.closeFromSelector(); } catch (IOException e) { @@ -83,7 +82,7 @@ void handleClose(NioChannel channel) { * @param channel that was being closed * @param exception that occurred */ - void closeException(NioChannel channel, Exception exception) { + protected void closeException(NioChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", channel), exception); } @@ -95,7 +94,7 @@ void closeException(NioChannel channel, Exception exception) { * @param channel that caused the exception * @param exception that was thrown */ - void genericChannelException(NioChannel channel, Exception exception) { + protected void genericChannelException(NioChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel), exception); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java similarity index 94% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java index 11d340a30a0c8..07b6b68908bd1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java @@ -17,11 +17,9 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.nio.utils.ExceptionsHelper; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -37,9 +35,9 @@ * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can * be called and the buffer will expand using the supplier provided. */ -public final class InboundChannelBuffer implements Releasable { +public final class InboundChannelBuffer implements AutoCloseable { - private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; + private static final int PAGE_SIZE = 1 << 14; private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; @@ -226,20 +224,19 @@ private int indexInPage(long index) { return (int) (index & PAGE_MASK); } - public static class Page implements Releasable { + public static class Page implements AutoCloseable { private final ByteBuffer byteBuffer; - private final Releasable releasable; + private final Runnable closeable; - public Page(ByteBuffer byteBuffer, Releasable releasable) { + public Page(ByteBuffer byteBuffer, Runnable closeable) { this.byteBuffer = byteBuffer; - this.releasable = releasable; + this.closeable = closeable; } @Override public void close() { - releasable.close(); + closeable.run(); } - } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java similarity index 88% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioChannel.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java index 93bc4faa4c5d5..433ec204e8684 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioChannel.java @@ -17,16 +17,14 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.ESSelector; +package org.elasticsearch.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.NetworkChannel; import java.nio.channels.SelectionKey; +import java.util.function.BiConsumer; public interface NioChannel { @@ -53,5 +51,5 @@ public interface NioChannel { * * @param listener to be called at close */ - void addCloseListener(ActionListener listener); + void addCloseListener(BiConsumer listener); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioGroup.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java similarity index 80% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/NioGroup.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java index b0e1862c706ca..109d22c45fd95 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioGroup.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java @@ -17,17 +17,16 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.IOUtils; -import org.elasticsearch.transport.nio.channel.ChannelFactory; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; +import org.elasticsearch.nio.utils.ExceptionsHelper; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; @@ -108,7 +107,16 @@ public S openChannel(InetSocketAddress address, Cha @Override public void close() throws IOException { if (isOpen.compareAndSet(true, false)) { - IOUtils.close(Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList())); + List toClose = Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList()); + List closingExceptions = new ArrayList<>(); + for (ESSelector selector : toClose) { + try { + selector.close(); + } catch (IOException e) { + closingExceptions.add(e); + } + } + ExceptionsHelper.rethrowAndSuppress(closingExceptions); } } @@ -116,7 +124,18 @@ private static void startSelectors(Iterable selectors, for (ESSelector acceptor : selectors) { if (acceptor.isRunning() == false) { threadFactory.newThread(acceptor::runLoop).start(); - acceptor.isRunningFuture().actionGet(); + try { + acceptor.isRunningFuture().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Interrupted while waiting for selector to start.", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException("Exception during selector start.", e); + } + } } } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java similarity index 87% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java index ffbd8f7a9874e..8eb904dc74179 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioServerSocketChannel.java @@ -17,9 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.transport.nio.AcceptingSelector; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.channels.ServerSocketChannel; @@ -27,16 +25,16 @@ public class NioServerSocketChannel extends AbstractNioChannel { - private final ChannelFactory channelFactory; + private final ChannelFactory channelFactory; private Consumer acceptContext; - public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory channelFactory, AcceptingSelector selector) + public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory channelFactory, AcceptingSelector selector) throws IOException { super(socketChannel, selector); this.channelFactory = channelFactory; } - public ChannelFactory getChannelFactory() { + public ChannelFactory getChannelFactory() { return channelFactory; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java similarity index 94% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java index 06f8aec627936..a6d6c6412ed72 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioSocketChannel.java @@ -17,11 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.InboundChannelBuffer; -import org.elasticsearch.transport.nio.SocketSelector; +package org.elasticsearch.nio; import java.io.IOException; import java.net.InetSocketAddress; @@ -157,8 +153,8 @@ public boolean finishConnect() throws IOException { return isConnected; } - public void addConnectListener(ActionListener listener) { - connectContext.whenComplete(ActionListener.toBiConsumer(listener)); + public void addConnectListener(BiConsumer listener) { + connectContext.whenComplete(listener); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadContext.java similarity index 95% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadContext.java index 243a6d8b239c2..cc9d2c8c43d69 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ReadContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ReadContext.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.nio; import java.io.IOException; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/RoundRobinSupplier.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java similarity index 93% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/RoundRobinSupplier.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java index 395b955f7ab36..311403a48857b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/RoundRobinSupplier.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -28,7 +28,7 @@ public class RoundRobinSupplier implements Supplier { private final int count; private AtomicInteger counter = new AtomicInteger(0); - public RoundRobinSupplier(S[] selectors) { + RoundRobinSupplier(S[] selectors) { this.count = selectors.length; this.selectors = selectors; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/SelectionKeyUtils.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SelectionKeyUtils.java similarity index 97% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/SelectionKeyUtils.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SelectionKeyUtils.java index b0cf5552064fd..b6272ce713501 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/SelectionKeyUtils.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SelectionKeyUtils.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.nio; import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java similarity index 83% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java index 799e81dd4a413..d3be18f377638 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java @@ -17,17 +17,13 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.channel.NioChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; -import org.elasticsearch.transport.nio.channel.WriteContext; import java.io.IOException; +import java.util.function.BiConsumer; /** * Event handler designed to handle events from non-server sockets @@ -47,7 +43,7 @@ public SocketEventHandler(Logger logger) { * * @param channel that was registered */ - void handleRegistration(NioSocketChannel channel) { + protected void handleRegistration(NioSocketChannel channel) { SelectionKeyUtils.setConnectAndReadInterested(channel); } @@ -57,7 +53,7 @@ void handleRegistration(NioSocketChannel channel) { * @param channel that was registered * @param exception that occurred */ - void registrationException(NioSocketChannel channel, Exception exception) { + protected void registrationException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", channel), exception); exceptionCaught(channel, exception); } @@ -68,7 +64,7 @@ void registrationException(NioSocketChannel channel, Exception exception) { * * @param channel that was registered */ - void handleConnect(NioSocketChannel channel) { + protected void handleConnect(NioSocketChannel channel) { SelectionKeyUtils.removeConnectInterested(channel); } @@ -78,7 +74,7 @@ void handleConnect(NioSocketChannel channel) { * @param channel that was connecting * @param exception that occurred */ - void connectException(NioSocketChannel channel, Exception exception) { + protected void connectException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception); exceptionCaught(channel, exception); } @@ -89,7 +85,7 @@ void connectException(NioSocketChannel channel, Exception exception) { * * @param channel that can be read */ - void handleRead(NioSocketChannel channel) throws IOException { + protected void handleRead(NioSocketChannel channel) throws IOException { int bytesRead = channel.getReadContext().read(); if (bytesRead == -1) { handleClose(channel); @@ -102,7 +98,7 @@ void handleRead(NioSocketChannel channel) throws IOException { * @param channel that was being read * @param exception that occurred */ - void readException(NioSocketChannel channel, Exception exception) { + protected void readException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", channel), exception); exceptionCaught(channel, exception); } @@ -113,7 +109,7 @@ void readException(NioSocketChannel channel, Exception exception) { * * @param channel that can be read */ - void handleWrite(NioSocketChannel channel) throws IOException { + protected void handleWrite(NioSocketChannel channel) throws IOException { WriteContext channelContext = channel.getWriteContext(); channelContext.flushChannel(); if (channelContext.hasQueuedWriteOps()) { @@ -129,7 +125,7 @@ void handleWrite(NioSocketChannel channel) throws IOException { * @param channel that was being written to * @param exception that occurred */ - void writeException(NioSocketChannel channel, Exception exception) { + protected void writeException(NioSocketChannel channel, Exception exception) { logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", channel), exception); exceptionCaught(channel, exception); } @@ -142,7 +138,7 @@ void writeException(NioSocketChannel channel, Exception exception) { * @param channel that caused the exception * @param exception that was thrown */ - void genericChannelException(NioChannel channel, Exception exception) { + protected void genericChannelException(NioChannel channel, Exception exception) { super.genericChannelException(channel, exception); exceptionCaught((NioSocketChannel) channel, exception); } @@ -153,7 +149,7 @@ void genericChannelException(NioChannel channel, Exception exception) { * @param listener that was called * @param exception that occurred */ - void listenerException(ActionListener listener, Exception exception) { + protected void listenerException(BiConsumer listener, Exception exception) { logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java similarity index 93% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java index 12bc51914d693..ac8ad87b726a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java @@ -17,12 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; -import org.elasticsearch.transport.nio.channel.WriteContext; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -30,6 +25,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiConsumer; /** * Selector implementation that handles {@link NioSocketChannel}. It's main piece of functionality is @@ -140,10 +136,10 @@ public void queueWriteInChannelBuffer(WriteOperation writeOperation) { * @param listener to be executed * @param value to provide to listener */ - public void executeListener(ActionListener listener, V value) { + public void executeListener(BiConsumer listener, V value) { assert isOnCurrentThread() : "Must be on selector thread"; try { - listener.onResponse(value); + listener.accept(value, null); } catch (Exception e) { eventHandler.listenerException(listener, e); } @@ -156,10 +152,10 @@ public void executeListener(ActionListener listener, V value) { * @param listener to be executed * @param exception to provide to listener */ - public void executeFailedListener(ActionListener listener, Exception exception) { + public void executeFailedListener(BiConsumer listener, Exception exception) { assert isOnCurrentThread() : "Must be on selector thread"; try { - listener.onFailure(exception); + listener.accept(null, exception); } catch (Exception e) { eventHandler.listenerException(listener, e); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteContext.java similarity index 78% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteContext.java index 718b7daf8c628..39e69e8f9a94e 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/WriteContext.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteContext.java @@ -17,17 +17,14 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.nio.WriteOperation; +package org.elasticsearch.nio; import java.io.IOException; +import java.util.function.BiConsumer; public interface WriteContext { - void sendMessage(BytesReference reference, ActionListener listener); + void sendMessage(Object message, BiConsumer listener); void queueWriteOperations(WriteOperation writeOperation); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java similarity index 65% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java rename to libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java index 1b2f2cfede4f0..b6fcc838a964f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/WriteOperation.java @@ -17,33 +17,26 @@ * under the License. */ -package org.elasticsearch.transport.nio; - -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; +import java.util.function.BiConsumer; public class WriteOperation { private final NioSocketChannel channel; - private final ActionListener listener; + private final BiConsumer listener; private final ByteBuffer[] buffers; private final int[] offsets; private final int length; private int internalIndex; - public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener listener) { + public WriteOperation(NioSocketChannel channel, ByteBuffer[] buffers, BiConsumer listener) { this.channel = channel; this.listener = listener; - this.buffers = toByteBuffers(bytesReference); + this.buffers = buffers; this.offsets = new int[buffers.length]; int offset = 0; for (int i = 0; i < buffers.length; i++) { @@ -58,7 +51,7 @@ public ByteBuffer[] getByteBuffers() { return buffers; } - public ActionListener getListener() { + public BiConsumer getListener() { return listener; } @@ -96,21 +89,4 @@ private int getOffsetIndex(int offset) { final int i = Arrays.binarySearch(offsets, offset); return i < 0 ? (-(i + 1)) - 1 : i; } - - private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) { - BytesRefIterator byteRefIterator = bytesReference.iterator(); - BytesRef r; - try { - // Most network messages are composed of three buffers. - ArrayList buffers = new ArrayList<>(3); - while ((r = byteRefIterator.next()) != null) { - buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length)); - } - return buffers.toArray(new ByteBuffer[buffers.size()]); - - } catch (IOException e) { - // this is really an error since we don't do IO in our bytesreferences - throw new AssertionError("won't happen", e); - } - } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/utils/ExceptionsHelper.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/utils/ExceptionsHelper.java new file mode 100644 index 0000000000000..a2dceba7af4d4 --- /dev/null +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/utils/ExceptionsHelper.java @@ -0,0 +1,50 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.nio.utils; + +import java.util.List; + +// TODO: This should eventually be removed once ExceptionsHelper is moved to a core library jar +public class ExceptionsHelper { + + /** + * Rethrows the first exception in the list and adds all remaining to the suppressed list. + * If the given list is empty no exception is thrown + * + */ + public static void rethrowAndSuppress(List exceptions) throws T { + T main = null; + for (T ex : exceptions) { + main = useOrSuppress(main, ex); + } + if (main != null) { + throw main; + } + } + + private static T useOrSuppress(T first, T second) { + if (first == null) { + return second; + } else { + first.addSuppressed(second); + } + return first; + } +} diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptingSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java similarity index 96% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptingSelectorTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java index 4e35f6177f5dc..048aa3af8ff01 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptingSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptingSelectorTests.java @@ -17,11 +17,9 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.utils.TestSelectionKey; import org.junit.Before; import java.io.IOException; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java similarity index 86% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java index 48a9e65f00dff..9d8f47fe3ef4d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.ChannelFactory; -import org.elasticsearch.transport.nio.channel.DoNotRegisterServerChannel; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.ReadContext; -import org.elasticsearch.transport.nio.channel.WriteContext; import org.junit.Before; import java.io.IOException; @@ -45,9 +39,9 @@ public class AcceptorEventHandlerTests extends ESTestCase { private AcceptorEventHandler handler; private SocketSelector socketSelector; - private ChannelFactory channelFactory; + private ChannelFactory channelFactory; private NioServerSocketChannel channel; - private Consumer acceptedChannelCallback; + private Consumer acceptedChannelCallback; @Before @SuppressWarnings("unchecked") diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java similarity index 95% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java index 91e1c2023e74c..c1183af4e5b2e 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ChannelFactoryTests.java @@ -17,12 +17,10 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.nio; import org.apache.lucene.util.IOUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.SocketSelector; import org.junit.After; import org.junit.Before; @@ -41,7 +39,7 @@ public class ChannelFactoryTests extends ESTestCase { - private ChannelFactory channelFactory; + private ChannelFactory channelFactory; private ChannelFactory.RawChannelFactory rawChannelFactory; private SocketChannel rawChannel; private ServerSocketChannel rawServerChannel; @@ -51,7 +49,7 @@ public class ChannelFactoryTests extends ESTestCase { @Before @SuppressWarnings("unchecked") public void setupFactory() throws IOException { - rawChannelFactory = mock(TcpChannelFactory.RawChannelFactory.class); + rawChannelFactory = mock(ChannelFactory.RawChannelFactory.class); channelFactory = new TestChannelFactory(rawChannelFactory); socketSelector = mock(SocketSelector.class); acceptingSelector = mock(AcceptingSelector.class); @@ -131,7 +129,7 @@ public void testOpenedServerChannelRejected() throws IOException { assertFalse(rawServerChannel.isOpen()); } - private static class TestChannelFactory extends ChannelFactory { + private static class TestChannelFactory extends ChannelFactory { TestChannelFactory(RawChannelFactory rawChannelFactory) { super(rawChannelFactory); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterChannel.java similarity index 88% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterChannel.java index f1f6ffb9f1138..dd73d43292a3f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterChannel.java @@ -17,10 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.transport.nio.SocketSelector; -import org.elasticsearch.transport.nio.utils.TestSelectionKey; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.channels.ClosedChannelException; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterServerChannel.java similarity index 84% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterServerChannel.java index 073f2acf384da..1d5e605c444de 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/DoNotRegisterServerChannel.java @@ -17,10 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.utils.TestSelectionKey; +package org.elasticsearch.nio; import java.io.IOException; import java.nio.channels.ClosedChannelException; @@ -28,7 +25,7 @@ public class DoNotRegisterServerChannel extends NioServerSocketChannel { - public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) + public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) throws IOException { super(channel, channelFactory, selector); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java similarity index 97% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java index 6459447c1a865..1dab05487a114 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/ESSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java @@ -17,10 +17,9 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.NioChannel; import org.junit.Before; import java.io.IOException; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java similarity index 99% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java index 9620f4c9c7660..199a509cbfabb 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/InboundChannelBufferTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.ESTestCase; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/NioGroupTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java similarity index 96% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/NioGroupTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java index f9b3cbb4e5026..068527a525916 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/NioGroupTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java @@ -17,12 +17,11 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.ChannelFactory; import java.io.IOException; import java.net.InetSocketAddress; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioServerSocketChannelTests.java similarity index 81% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioServerSocketChannelTests.java index ba5d47fe8f8dd..713f01ec283a4 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioServerSocketChannelTests.java @@ -17,13 +17,12 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.AcceptorEventHandler; import org.junit.After; import org.junit.Before; @@ -48,7 +47,7 @@ public void setSelector() throws IOException { thread = new Thread(selector::runLoop); closedRawChannel = new AtomicBoolean(false); thread.start(); - selector.isRunningFuture().actionGet(); + FutureUtils.get(selector.isRunningFuture()); } @After @@ -61,9 +60,9 @@ public void testClose() throws Exception { AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); - NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); + NioChannel channel = new DoNotCloseServerChannel(mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); - channel.addCloseListener(new ActionListener() { + channel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { @Override public void onResponse(Void o) { isClosed.set(true); @@ -75,14 +74,14 @@ public void onFailure(Exception e) { isClosed.set(true); latch.countDown(); } - }); + })); assertTrue(channel.isOpen()); assertFalse(closedRawChannel.get()); assertFalse(isClosed.get()); PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - channel.addCloseListener(closeFuture); + channel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); channel.close(); closeFuture.actionGet(); @@ -95,8 +94,8 @@ public void onFailure(Exception e) { private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { - private DoNotCloseServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, - AcceptingSelector selector) throws IOException { + private DoNotCloseServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) + throws IOException { super(channel, channelFactory, selector); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java similarity index 91% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java index fecaf8fe9701e..6a32b11f18b0f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioSocketChannelTests.java @@ -17,13 +17,12 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.SocketEventHandler; -import org.elasticsearch.transport.nio.SocketSelector; import org.junit.After; import org.junit.Before; @@ -52,7 +51,7 @@ public void startSelector() throws IOException { thread = new Thread(selector::runLoop); closedRawChannel = new AtomicBoolean(false); thread.start(); - selector.isRunningFuture().actionGet(); + FutureUtils.get(selector.isRunningFuture()); } @After @@ -68,7 +67,7 @@ public void testClose() throws Exception { NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class), mock(BiConsumer.class)); - socketChannel.addCloseListener(new ActionListener() { + socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener() { @Override public void onResponse(Void o) { isClosed.set(true); @@ -79,14 +78,14 @@ public void onFailure(Exception e) { isClosed.set(true); latch.countDown(); } - }); + })); assertTrue(socketChannel.isOpen()); assertFalse(closedRawChannel.get()); assertFalse(isClosed.get()); PlainActionFuture closeFuture = PlainActionFuture.newFuture(); - socketChannel.addCloseListener(closeFuture); + socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture)); socketChannel.close(); closeFuture.actionGet(); @@ -105,7 +104,7 @@ public void testConnectSucceeds() throws Exception { selector.scheduleForRegistration(socketChannel); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); - socketChannel.addConnectListener(connectFuture); + socketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture)); connectFuture.get(100, TimeUnit.SECONDS); assertTrue(socketChannel.isConnectComplete()); @@ -122,7 +121,7 @@ public void testConnectFails() throws Exception { selector.scheduleForRegistration(socketChannel); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); - socketChannel.addConnectListener(connectFuture); + socketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture)); ExecutionException e = expectThrows(ExecutionException.class, () -> connectFuture.get(100, TimeUnit.SECONDS)); assertTrue(e.getCause() instanceof IOException); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java similarity index 86% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java index b547273d30925..ff3d28727973d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java @@ -17,17 +17,10 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.DoNotRegisterChannel; -import org.elasticsearch.transport.nio.channel.NioChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.ReadContext; -import org.elasticsearch.transport.nio.channel.SelectionKeyUtils; -import org.elasticsearch.transport.nio.channel.TcpWriteContext; +import org.elasticsearch.transport.nio.TcpWriteContext; import org.junit.Before; import java.io.IOException; @@ -121,10 +114,10 @@ public void testHandleWriteWithCompleteFlushRemovesOP_WRITEInterest() throws IOE setWriteAndRead(channel); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); - BytesArray bytesArray = new BytesArray(new byte[1]); - channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); + ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; + channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, buffers, mock(BiConsumer.class))); - when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(1); + when(rawChannel.write(buffers[0])).thenReturn(1); handler.handleWrite(channel); assertEquals(SelectionKey.OP_READ, selectionKey.interestOps()); @@ -136,10 +129,10 @@ public void testHandleWriteWithInCompleteFlushLeavesOP_WRITEInterest() throws IO setWriteAndRead(channel); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); - BytesArray bytesArray = new BytesArray(new byte[1]); - channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); + ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; + channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, buffers, mock(BiConsumer.class))); - when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(0); + when(rawChannel.write(buffers[0])).thenReturn(0); handler.handleWrite(channel); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java similarity index 89% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java index 1b67d9d099b27..e50da352623b5 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java @@ -17,18 +17,13 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.WriteContext; -import org.elasticsearch.transport.nio.utils.TestSelectionKey; import org.junit.Before; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; @@ -36,9 +31,11 @@ import java.nio.channels.Selector; import java.util.Collections; import java.util.HashSet; +import java.util.function.BiConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -53,8 +50,8 @@ public class SocketSelectorTests extends ESTestCase { private NioSocketChannel channel; private TestSelectionKey selectionKey; private WriteContext writeContext; - private ActionListener listener; - private BytesReference bufferReference = new BytesArray(new byte[1]); + private BiConsumer listener; + private ByteBuffer[] buffers = {ByteBuffer.allocate(1)}; private Selector rawSelector; @Before @@ -64,7 +61,7 @@ public void setUp() throws Exception { eventHandler = mock(SocketEventHandler.class); channel = mock(NioSocketChannel.class); writeContext = mock(WriteContext.class); - listener = mock(ActionListener.class); + listener = mock(BiConsumer.class); selectionKey = new TestSelectionKey(0); selectionKey.attach(channel); rawSelector = mock(Selector.class); @@ -132,26 +129,26 @@ public void testConnectIncompleteWillNotNotify() throws Exception { public void testQueueWriteWhenNotRunning() throws Exception { socketSelector.close(); - socketSelector.queueWrite(new WriteOperation(channel, bufferReference, listener)); + socketSelector.queueWrite(new WriteOperation(channel, buffers, listener)); - verify(listener).onFailure(any(ClosedSelectorException.class)); + verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class)); } public void testQueueWriteChannelIsNoLongerWritable() throws Exception { - WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); + WriteOperation writeOperation = new WriteOperation(channel, buffers, listener); socketSelector.queueWrite(writeOperation); when(channel.isWritable()).thenReturn(false); socketSelector.preSelect(); verify(writeContext, times(0)).queueWriteOperations(writeOperation); - verify(listener).onFailure(any(ClosedChannelException.class)); + verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class)); } public void testQueueWriteSelectionKeyThrowsException() throws Exception { SelectionKey selectionKey = mock(SelectionKey.class); - WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); + WriteOperation writeOperation = new WriteOperation(channel, buffers, listener); CancelledKeyException cancelledKeyException = new CancelledKeyException(); socketSelector.queueWrite(writeOperation); @@ -161,11 +158,11 @@ public void testQueueWriteSelectionKeyThrowsException() throws Exception { socketSelector.preSelect(); verify(writeContext, times(0)).queueWriteOperations(writeOperation); - verify(listener).onFailure(cancelledKeyException); + verify(listener).accept(null, cancelledKeyException); } public void testQueueWriteSuccessful() throws Exception { - WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); + WriteOperation writeOperation = new WriteOperation(channel, buffers, listener); socketSelector.queueWrite(writeOperation); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); @@ -178,7 +175,7 @@ public void testQueueWriteSuccessful() throws Exception { } public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { - WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); + WriteOperation writeOperation = new WriteOperation(channel, buffers, listener); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); @@ -192,7 +189,7 @@ public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws Exception { SelectionKey selectionKey = mock(SelectionKey.class); - WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); + WriteOperation writeOperation = new WriteOperation(channel, buffers, listener); CancelledKeyException cancelledKeyException = new CancelledKeyException(); when(channel.isWritable()).thenReturn(true); @@ -201,7 +198,7 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws socketSelector.queueWriteInChannelBuffer(writeOperation); verify(writeContext, times(0)).queueWriteOperations(writeOperation); - verify(listener).onFailure(cancelledKeyException); + verify(listener).accept(null, cancelledKeyException); } public void testConnectEvent() throws Exception { @@ -295,7 +292,7 @@ public void testCleanup() throws Exception { socketSelector.preSelect(); - socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), new BytesArray(new byte[1]), listener)); + socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), buffers, listener)); socketSelector.scheduleForRegistration(unRegisteredChannel); TestSelectionKey testSelectionKey = new TestSelectionKey(0); @@ -304,14 +301,14 @@ public void testCleanup() throws Exception { socketSelector.cleanupAndCloseChannels(); - verify(listener).onFailure(any(ClosedSelectorException.class)); + verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class)); verify(eventHandler).handleClose(channel); verify(eventHandler).handleClose(unRegisteredChannel); } public void testExecuteListenerWillHandleException() throws Exception { RuntimeException exception = new RuntimeException(); - doThrow(exception).when(listener).onResponse(null); + doThrow(exception).when(listener).accept(null, null); socketSelector.executeListener(listener, null); @@ -321,7 +318,7 @@ public void testExecuteListenerWillHandleException() throws Exception { public void testExecuteFailedListenerWillHandleException() throws Exception { IOException ioException = new IOException(); RuntimeException exception = new RuntimeException(); - doThrow(exception).when(listener).onFailure(ioException); + doThrow(exception).when(listener).accept(null, ioException); socketSelector.executeFailedListener(listener, ioException); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/utils/TestSelectionKey.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/TestSelectionKey.java similarity index 97% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/utils/TestSelectionKey.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/TestSelectionKey.java index 0f0011f15533b..1a195e7fe6956 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/utils/TestSelectionKey.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/TestSelectionKey.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.utils; +package org.elasticsearch.nio; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/WriteOperationTests.java similarity index 76% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java rename to libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/WriteOperationTests.java index 0085a9b204a31..da74269b8253a 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/WriteOperationTests.java @@ -17,19 +17,16 @@ * under the License. */ -package org.elasticsearch.transport.nio; +package org.elasticsearch.nio; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.junit.Before; import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.BiConsumer; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -38,18 +35,19 @@ public class WriteOperationTests extends ESTestCase { private NioSocketChannel channel; - private ActionListener listener; + private BiConsumer listener; @Before @SuppressWarnings("unchecked") public void setFields() { channel = mock(NioSocketChannel.class); - listener = mock(ActionListener.class); + listener = mock(BiConsumer.class); } public void testFlush() throws IOException { - WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); + ByteBuffer[] buffers = {ByteBuffer.allocate(10)}; + WriteOperation writeOp = new WriteOperation(channel, buffers, listener); when(channel.write(any(ByteBuffer[].class))).thenReturn(10); @@ -60,7 +58,8 @@ public void testFlush() throws IOException { } public void testPartialFlush() throws IOException { - WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); + ByteBuffer[] buffers = {ByteBuffer.allocate(10)}; + WriteOperation writeOp = new WriteOperation(channel, buffers, listener); when(channel.write(any(ByteBuffer[].class))).thenReturn(5); @@ -70,11 +69,8 @@ public void testPartialFlush() throws IOException { } public void testMultipleFlushesWithCompositeBuffer() throws IOException { - BytesArray bytesReference1 = new BytesArray(new byte[10]); - BytesArray bytesReference2 = new BytesArray(new byte[15]); - BytesArray bytesReference3 = new BytesArray(new byte[3]); - CompositeBytesReference bytesReference = new CompositeBytesReference(bytesReference1, bytesReference2, bytesReference3); - WriteOperation writeOp = new WriteOperation(channel, bytesReference, listener); + ByteBuffer[] buffers = {ByteBuffer.allocate(10), ByteBuffer.allocate(15), ByteBuffer.allocate(3)}; + WriteOperation writeOp = new WriteOperation(channel, buffers, listener); ArgumentCaptor buffersCaptor = ArgumentCaptor.forClass(ByteBuffer[].class); diff --git a/libs/elasticsearch-nio/src/test/resources/testks.jks b/libs/elasticsearch-nio/src/test/resources/testks.jks new file mode 100644 index 0000000000000..cd706db5e3834 Binary files /dev/null and b/libs/elasticsearch-nio/src/test/resources/testks.jks differ diff --git a/settings.gradle b/settings.gradle index 5150ef256a17a..79a767a98d0ea 100644 --- a/settings.gradle +++ b/settings.gradle @@ -28,6 +28,7 @@ List projects = [ 'test:fixtures:krb5kdc-fixture', 'test:fixtures:old-elasticsearch', 'test:logger-usage', + 'libs:elasticsearch-nio', 'modules:aggs-matrix-stats', 'modules:analysis-common', 'modules:ingest-common', @@ -171,3 +172,5 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir, projects, branches) } } +include 'libs' + diff --git a/test/framework/build.gradle b/test/framework/build.gradle index bbdbab5b2337a..215f6f2d394eb 100644 --- a/test/framework/build.gradle +++ b/test/framework/build.gradle @@ -21,6 +21,7 @@ import org.elasticsearch.gradle.precommit.PrecommitTasks; dependencies { compile "org.elasticsearch.client:elasticsearch-rest-client:${version}" + compile "org.elasticsearch:elasticsearch-nio:${version}" compile "org.elasticsearch:elasticsearch:${version}" compile "org.elasticsearch:elasticsearch-cli:${version}" compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index bb28d93f85a84..c716da897a883 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -33,16 +33,15 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.nio.AcceptorEventHandler; +import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.nio.NioGroup; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.SocketEventHandler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpChannelFactory; -import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpReadContext; -import org.elasticsearch.transport.nio.channel.TcpWriteContext; import java.io.IOException; import java.net.InetSocketAddress; @@ -90,7 +89,7 @@ protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { TcpNioSocketChannel channel = nioGroup.openChannel(node.getAddress().address(), clientChannelFactory); - channel.addConnectListener(connectListener); + channel.addConnectListener(ActionListener.toBiConsumer(connectListener)); return channel; } @@ -154,7 +153,7 @@ private Consumer getContextSetter(String profileName) { return (c) -> { Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); - return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes); + return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)), new TcpWriteContext(c), this::exceptionCaught); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpChannelFactory.java similarity index 84% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpChannelFactory.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpChannelFactory.java index 03d6db18e5a41..8f092153c77ed 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpChannelFactory.java @@ -17,11 +17,14 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; +import org.elasticsearch.nio.ChannelFactory; +import org.elasticsearch.nio.NioServerSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.SocketSelector; +import org.elasticsearch.nio.AcceptingSelector; +import org.elasticsearch.nio.SocketSelector; import java.io.IOException; import java.nio.channels.ServerSocketChannel; @@ -39,8 +42,8 @@ public class TcpChannelFactory extends ChannelFactory contextSetter; private final Consumer serverContextSetter; - public TcpChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer contextSetter, - Consumer serverContextSetter) { + TcpChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer contextSetter, + Consumer serverContextSetter) { super(new RawChannelFactory(profileSettings.tcpNoDelay, profileSettings.tcpKeepAlive, profileSettings.reuseAddress, diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpFrameDecoder.java similarity index 98% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpFrameDecoder.java index b2ba70fb2365d..1d389e7d121a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpFrameDecoder.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioServerSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java similarity index 87% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioServerSocketChannel.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java index 496295bd3203b..b6b059a434afb 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioServerSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioServerSocketChannel.java @@ -17,12 +17,13 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.nio.NioServerSocketChannel; import org.elasticsearch.transport.TcpChannel; -import org.elasticsearch.transport.nio.AcceptingSelector; +import org.elasticsearch.nio.AcceptingSelector; import java.io.IOException; import java.nio.channels.ServerSocketChannel; @@ -48,6 +49,11 @@ public void setSoLinger(int value) throws IOException { throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); } + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + @Override public String toString() { return "TcpNioServerSocketChannel{" + diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java similarity index 82% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioSocketChannel.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java index f1ee1bd4e67ad..cecfcdb736c89 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpNioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpNioSocketChannel.java @@ -17,12 +17,13 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.transport.TcpChannel; -import org.elasticsearch.transport.nio.SocketSelector; +import org.elasticsearch.nio.SocketSelector; import java.io.IOException; import java.net.StandardSocketOptions; @@ -35,7 +36,7 @@ public TcpNioSocketChannel(SocketChannel socketChannel, SocketSelector selector) } public void sendMessage(BytesReference reference, ActionListener listener) { - getWriteContext().sendMessage(reference, listener); + getWriteContext().sendMessage(reference, ActionListener.toBiConsumer(listener)); } @Override @@ -45,6 +46,11 @@ public void setSoLinger(int value) throws IOException { } } + @Override + public void addCloseListener(ActionListener listener) { + addCloseListener(ActionListener.toBiConsumer(listener)); + } + @Override public String toString() { return "TcpNioSocketChannel{" + diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadContext.java similarity index 95% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadContext.java index 24a671412d220..ec0298cbc8853 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadContext.java @@ -17,13 +17,14 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.transport.nio.InboundChannelBuffer; -import org.elasticsearch.transport.nio.TcpReadHandler; +import org.elasticsearch.nio.InboundChannelBuffer; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.ReadContext; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java index 5c2ecea54c3f0..2e2cc08ad6b0f 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java @@ -20,8 +20,7 @@ package org.elasticsearch.transport.nio; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel; +import org.elasticsearch.nio.NioSocketChannel; import java.io.IOException; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpWriteContext.java similarity index 72% rename from test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java rename to test/framework/src/main/java/org/elasticsearch/transport/nio/TcpWriteContext.java index 63b876c098764..4d59c445bc895 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpWriteContext.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpWriteContext.java @@ -17,16 +17,22 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; -import org.elasticsearch.action.ActionListener; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.transport.nio.SocketSelector; -import org.elasticsearch.transport.nio.WriteOperation; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.WriteContext; +import org.elasticsearch.nio.WriteOperation; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.function.BiConsumer; public class TcpWriteContext implements WriteContext { @@ -38,13 +44,14 @@ public TcpWriteContext(NioSocketChannel channel) { } @Override - public void sendMessage(BytesReference reference, ActionListener listener) { + public void sendMessage(Object message, BiConsumer listener) { + BytesReference reference = (BytesReference) message; if (channel.isWritable() == false) { - listener.onFailure(new ClosedChannelException()); + listener.accept(null, new ClosedChannelException()); return; } - WriteOperation writeOperation = new WriteOperation(channel, reference, listener); + WriteOperation writeOperation = new WriteOperation(channel, toByteBuffers(reference), listener); SocketSelector selector = channel.getSelector(); if (selector.isOnCurrentThread() == false) { selector.queueWrite(writeOperation); @@ -110,4 +117,21 @@ private void multiFlush() throws IOException { lastOpCompleted = op.isFullyFlushed(); } } + + private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) { + BytesRefIterator byteRefIterator = bytesReference.iterator(); + BytesRef r; + try { + // Most network messages are composed of three buffers. + ArrayList buffers = new ArrayList<>(3); + while ((r = byteRefIterator.next()) != null) { + buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length)); + } + return buffers.toArray(new ByteBuffer[buffers.size()]); + + } catch (IOException e) { + // this is really an error since we don't do IO in our bytesreferences + throw new AssertionError("won't happen", e); + } + } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 1f17c3df54118..59eab570876d6 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.nio.SocketEventHandler; import org.elasticsearch.node.Node; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpFrameDecoderTests.java similarity index 99% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java rename to test/framework/src/test/java/org/elasticsearch/transport/nio/TcpFrameDecoderTests.java index 450016b1dc3b8..d9ae2de14f176 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpFrameDecoderTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpReadContextTests.java similarity index 97% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java rename to test/framework/src/test/java/org/elasticsearch/transport/nio/TcpReadContextTests.java index f24c087e60a30..97a8f456d1096 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpReadContextTests.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.InboundChannelBuffer; -import org.elasticsearch.transport.nio.TcpReadHandler; import org.junit.Before; import java.io.IOException; diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpWriteContextTests.java similarity index 90% rename from test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java rename to test/framework/src/test/java/org/elasticsearch/transport/nio/TcpWriteContextTests.java index 33b84590aaa51..158078e707dc2 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TcpWriteContextTests.java @@ -17,21 +17,23 @@ * under the License. */ -package org.elasticsearch.transport.nio.channel; +package org.elasticsearch.transport.nio; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.nio.NioSocketChannel; +import org.elasticsearch.nio.SocketSelector; +import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.nio.SocketSelector; -import org.elasticsearch.transport.nio.WriteOperation; import org.junit.Before; import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.util.function.BiConsumer; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -40,7 +42,7 @@ public class TcpWriteContextTests extends ESTestCase { private SocketSelector selector; - private ActionListener listener; + private BiConsumer listener; private TcpWriteContext writeContext; private NioSocketChannel channel; @@ -49,7 +51,7 @@ public class TcpWriteContextTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); selector = mock(SocketSelector.class); - listener = mock(ActionListener.class); + listener = mock(BiConsumer.class); channel = mock(NioSocketChannel.class); writeContext = new TcpWriteContext(channel); @@ -62,7 +64,7 @@ public void testWriteFailsIfChannelNotWritable() throws Exception { writeContext.sendMessage(new BytesArray(generateBytes(10)), listener); - verify(listener).onFailure(any(ClosedChannelException.class)); + verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class)); } public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exception { @@ -103,7 +105,8 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception { public void testWriteIsQueuedInChannel() throws Exception { assertFalse(writeContext.hasQueuedWriteOps()); - writeContext.queueWriteOperations(new WriteOperation(channel, new BytesArray(generateBytes(10)), listener)); + ByteBuffer[] buffer = {ByteBuffer.allocate(10)}; + writeContext.queueWriteOperations(new WriteOperation(channel, buffer, listener)); assertTrue(writeContext.hasQueuedWriteOps()); } @@ -111,7 +114,8 @@ public void testWriteIsQueuedInChannel() throws Exception { public void testWriteOpsCanBeCleared() throws Exception { assertFalse(writeContext.hasQueuedWriteOps()); - writeContext.queueWriteOperations(new WriteOperation(channel, new BytesArray(generateBytes(10)), listener)); + ByteBuffer[] buffer = {ByteBuffer.allocate(10)}; + writeContext.queueWriteOperations(new WriteOperation(channel, buffer, listener)); assertTrue(writeContext.hasQueuedWriteOps()); @@ -151,7 +155,7 @@ public void testPartialFlush() throws IOException { when(writeOperation.isFullyFlushed()).thenReturn(false); writeContext.flushChannel(); - verify(listener, times(0)).onResponse(null); + verify(listener, times(0)).accept(null, null); assertTrue(writeContext.hasQueuedWriteOps()); } @@ -159,7 +163,7 @@ public void testPartialFlush() throws IOException { public void testMultipleWritesPartialFlushes() throws IOException { assertFalse(writeContext.hasQueuedWriteOps()); - ActionListener listener2 = mock(ActionListener.class); + BiConsumer listener2 = mock(BiConsumer.class); WriteOperation writeOperation1 = mock(WriteOperation.class); WriteOperation writeOperation2 = mock(WriteOperation.class); when(writeOperation1.getListener()).thenReturn(listener); @@ -174,7 +178,7 @@ public void testMultipleWritesPartialFlushes() throws IOException { writeContext.flushChannel(); verify(selector).executeListener(listener, null); - verify(listener2, times(0)).onResponse(channel); + verify(listener2, times(0)).accept(null, null); assertTrue(writeContext.hasQueuedWriteOps()); when(writeOperation2.isFullyFlushed()).thenReturn(true); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java index a3cb92ad37663..a3fa5b9a4b563 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java @@ -20,7 +20,8 @@ package org.elasticsearch.transport.nio; import org.apache.logging.log4j.Logger; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; +import org.elasticsearch.nio.SocketEventHandler; +import org.elasticsearch.nio.NioSocketChannel; import java.io.IOException; import java.util.Collections;