From 33f6e20e206061070f4c9c897530cc9c5e9204fa Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 May 2018 15:41:23 -0600 Subject: [PATCH 1/3] Work on remove logging from elasticsearch-nio --- libs/elasticsearch-nio/build.gradle | 28 +++++++------- .../nio/AcceptorEventHandler.java | 14 +++---- .../org/elasticsearch/nio/ESSelector.java | 4 +- .../org/elasticsearch/nio/EventHandler.java | 38 ++++++------------- .../java/org/elasticsearch/nio/NioGroup.java | 12 +++--- .../elasticsearch/nio/SocketEventHandler.java | 20 +++------- .../nio/AcceptorEventHandlerTests.java | 2 +- .../elasticsearch/nio/ESSelectorTests.java | 3 +- .../org/elasticsearch/nio/NioGroupTests.java | 4 +- .../nio/SocketEventHandlerTests.java | 2 +- .../http/nio/NioHttpServerTransport.java | 2 +- .../transport/nio/NioTransport.java | 2 +- .../transport/nio/MockNioTransport.java | 2 +- .../nio/TestingSocketEventHandler.java | 5 --- 14 files changed, 51 insertions(+), 87 deletions(-) diff --git a/libs/elasticsearch-nio/build.gradle b/libs/elasticsearch-nio/build.gradle index a32a860a62848..9cbd557a8ff8d 100644 --- a/libs/elasticsearch-nio/build.gradle +++ b/libs/elasticsearch-nio/build.gradle @@ -33,8 +33,6 @@ publishing { } dependencies { - compile "org.apache.logging.log4j:log4j-api:${versions.log4j}" - testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" @@ -65,17 +63,17 @@ forbiddenApisMain { signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')] } -//JarHell is part of es core, which we don't want to pull in -jarHell.enabled=false +////JarHell is part of es core, which we don't want to pull in +//jarHell.enabled=false -thirdPartyAudit.excludes = [ - 'org/osgi/framework/AdaptPermission', - 'org/osgi/framework/AdminPermission', - 'org/osgi/framework/Bundle', - 'org/osgi/framework/BundleActivator', - 'org/osgi/framework/BundleContext', - 'org/osgi/framework/BundleEvent', - 'org/osgi/framework/SynchronousBundleListener', - 'org/osgi/framework/wiring/BundleWire', - 'org/osgi/framework/wiring/BundleWiring' -] +//thirdPartyAudit.excludes = [ +// 'org/osgi/framework/AdaptPermission', +// 'org/osgi/framework/AdminPermission', +// 'org/osgi/framework/Bundle', +// 'org/osgi/framework/BundleActivator', +// 'org/osgi/framework/BundleContext', +// 'org/osgi/framework/BundleEvent', +// 'org/osgi/framework/SynchronousBundleListener', +// 'org/osgi/framework/wiring/BundleWire', +// 'org/osgi/framework/wiring/BundleWiring' +//] diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java index 474efad3c77b9..791e0032c88bf 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java @@ -19,9 +19,6 @@ package org.elasticsearch.nio; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; - import java.io.IOException; import java.nio.channels.SelectionKey; import java.util.function.Supplier; @@ -33,8 +30,7 @@ public class AcceptorEventHandler extends EventHandler { private final Supplier selectorSupplier; - public AcceptorEventHandler(Logger logger, Supplier selectorSupplier) { - super(logger); + public AcceptorEventHandler(Supplier selectorSupplier) { this.selectorSupplier = selectorSupplier; } @@ -58,7 +54,8 @@ protected void handleRegistration(ServerChannelContext context) throws IOExcepti * @param exception that occurred */ protected void registrationException(ServerChannelContext context, Exception exception) { - logger.error(new ParameterizedMessage("failed to register server channel: {}", context.getChannel()), exception); + // TODO: Call exception handler that does not close channel +// logger.error(new ParameterizedMessage("failed to register server channel: {}", context.getChannel()), exception); } /** @@ -78,7 +75,8 @@ protected void acceptChannel(ServerChannelContext context) throws IOException { * @param exception that occurred */ protected void acceptException(ServerChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}", - context.getChannel()), exception); + // TODO: Call exception handler that does not close channel +// logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}", +// context.getChannel()), exception); } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java index e623e37f005da..c6cf97d10d38e 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/ESSelector.java @@ -83,7 +83,7 @@ public void runLoop() { try { selector.close(); } catch (IOException e) { - eventHandler.closeSelectorException(e); + eventHandler.selectorException(e); } finally { runLock.unlock(); exitedLoop.countDown(); @@ -123,7 +123,7 @@ void singleLoop() { throw e; } } catch (IOException e) { - eventHandler.selectException(e); + eventHandler.selectorException(e); } catch (Exception e) { eventHandler.uncaughtException(e); } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java index d35b73c56b88d..95fd4aa25b2a1 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -19,37 +19,19 @@ package org.elasticsearch.nio; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; - import java.io.IOException; import java.nio.channels.Selector; public abstract class EventHandler { - protected final Logger logger; - - EventHandler(Logger logger) { - this.logger = logger; - } - - /** - * This method handles an IOException that was thrown during a call to {@link Selector#select(long)}. - * - * @param exception the exception - */ - protected void selectException(IOException exception) { - logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception); - } - /** - * This method handles an IOException that was thrown during a call to {@link Selector#close()}. + * This method handles an IOException that was thrown during a call to {@link Selector#select(long)} or + * {@link Selector#close()}. * * @param exception the exception */ - protected void closeSelectorException(IOException exception) { - logger.warn(new ParameterizedMessage("io exception while closing selector [thread={}]", Thread.currentThread().getName()), - exception); + protected void selectorException(IOException exception) { +// logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception); } /** @@ -79,11 +61,13 @@ protected void handleClose(ChannelContext context) { /** * This method is called when an attempt to close a channel throws an exception. * - * @param context that was being closed + * @param channel that was being closed * @param exception that occurred */ - protected void closeException(ChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", context.getChannel()), exception); + protected void closeException(ChannelContext channel, Exception exception) { + // TODO: Check that this exception handler should be called + channel.handleException(exception); +// logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", context.getChannel()), exception); } /** @@ -95,6 +79,8 @@ protected void closeException(ChannelContext context, Exception exception) { * @param exception that was thrown */ protected void genericChannelException(ChannelContext channel, Exception exception) { - logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel.getChannel()), exception); + // TODO: Check that this exception handler should be called + channel.handleException(exception); +// logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel.getChannel()), exception); } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java index b763765616275..3f2fd44259c64 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/NioGroup.java @@ -19,7 +19,6 @@ package org.elasticsearch.nio; -import org.apache.logging.log4j.Logger; import org.elasticsearch.nio.utils.ExceptionsHelper; import java.io.IOException; @@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -56,16 +54,16 @@ public class NioGroup implements AutoCloseable { private final AtomicBoolean isOpen = new AtomicBoolean(true); - public NioGroup(Logger logger, ThreadFactory acceptorThreadFactory, int acceptorCount, - BiFunction, AcceptorEventHandler> acceptorEventHandlerFunction, + public NioGroup(ThreadFactory acceptorThreadFactory, int acceptorCount, + Function, AcceptorEventHandler> acceptorEventHandlerFunction, ThreadFactory socketSelectorThreadFactory, int socketSelectorCount, - Function socketEventHandlerFunction) throws IOException { + Supplier socketEventHandlerFunction) throws IOException { acceptors = new ArrayList<>(acceptorCount); socketSelectors = new ArrayList<>(socketSelectorCount); try { for (int i = 0; i < socketSelectorCount; ++i) { - SocketSelector selector = new SocketSelector(socketEventHandlerFunction.apply(logger)); + SocketSelector selector = new SocketSelector(socketEventHandlerFunction.get()); socketSelectors.add(selector); } startSelectors(socketSelectors, socketSelectorThreadFactory); @@ -73,7 +71,7 @@ public NioGroup(Logger logger, ThreadFactory acceptorThreadFactory, int acceptor for (int i = 0; i < acceptorCount; ++i) { SocketSelector[] childSelectors = this.socketSelectors.toArray(new SocketSelector[this.socketSelectors.size()]); Supplier selectorSupplier = new RoundRobinSupplier<>(childSelectors); - AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(logger, selectorSupplier)); + AcceptingSelector acceptor = new AcceptingSelector(acceptorEventHandlerFunction.apply(selectorSupplier)); acceptors.add(acceptor); } startSelectors(acceptors, acceptorThreadFactory); diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java index cacee47e96196..2a730c932e09c 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java @@ -19,9 +19,6 @@ package org.elasticsearch.nio; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; - import java.io.IOException; import java.nio.channels.SelectionKey; import java.util.function.BiConsumer; @@ -31,13 +28,6 @@ */ public class SocketEventHandler extends EventHandler { - private final Logger logger; - - public SocketEventHandler(Logger logger) { - super(logger); - this.logger = logger; - } - /** * This method is called when a NioSocketChannel is successfully registered. It should only be called * once per channel. @@ -62,7 +52,7 @@ protected void handleRegistration(SocketChannelContext context) throws IOExcepti * @param exception that occurred */ protected void registrationException(SocketChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", context.getChannel()), exception); +// logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -85,7 +75,7 @@ protected void handleConnect(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void connectException(SocketChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", context.getChannel()), exception); +// logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -106,7 +96,7 @@ protected void handleRead(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void readException(SocketChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", context.getChannel()), exception); +// logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -127,7 +117,7 @@ protected void handleWrite(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void writeException(SocketChannelContext context, Exception exception) { - logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", context.getChannel()), exception); +// logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -138,7 +128,7 @@ protected void writeException(SocketChannelContext context, Exception exception) * @param exception that occurred */ protected void listenerException(BiConsumer listener, Exception exception) { - logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); +// logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); } /** diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java index 50469b30acde9..221f44cf0dec3 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java @@ -50,7 +50,7 @@ public void setUpHandler() throws IOException { ArrayList selectors = new ArrayList<>(); selectors.add(mock(SocketSelector.class)); selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])); - handler = new AcceptorEventHandler(logger, selectorSupplier); + handler = new AcceptorEventHandler(selectorSupplier); channel = new NioServerSocketChannel(mock(ServerSocketChannel.class)); context = new DoNotRegisterContext(channel, mock(AcceptingSelector.class), mock(Consumer.class)); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java index cb8f0757fb924..05b84345f45c2 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/ESSelectorTests.java @@ -27,7 +27,6 @@ import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; @@ -81,7 +80,7 @@ public void testIOExceptionWhileSelect() throws IOException { this.selector.singleLoop(); - verify(handler).selectException(ioException); + verify(handler).selectorException(ioException); } public void testSelectorClosedIfOpenAndEventLoopNotRunning() throws IOException { diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java index 068527a525916..4d65eeed3dfa5 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java @@ -36,7 +36,7 @@ public class NioGroupTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - nioGroup = new NioGroup(logger, daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, AcceptorEventHandler::new, + nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, AcceptorEventHandler::new, daemonThreadFactory(Settings.EMPTY, "selector"), 1, SocketEventHandler::new); } @@ -71,7 +71,7 @@ public void testCanCloseTwice() throws IOException { public void testExceptionAtStartIsHandled() throws IOException { RuntimeException ex = new RuntimeException(); - CheckedRunnable ctor = () -> new NioGroup(logger, r -> {throw ex;}, 1, + CheckedRunnable ctor = () -> new NioGroup(r -> {throw ex;}, 1, AcceptorEventHandler::new, daemonThreadFactory(Settings.EMPTY, "selector"), 1, SocketEventHandler::new); RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run); assertSame(ex, runtimeException); diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java index a80563f7d74db..0f1cc697501ca 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java @@ -50,7 +50,7 @@ public void setUpHandler() throws IOException { exceptionHandler = mock(Consumer.class); readWriteHandler = mock(ReadWriteHandler.class); SocketSelector selector = mock(SocketSelector.class); - handler = new SocketEventHandler(logger); + handler = new SocketEventHandler(); rawChannel = mock(SocketChannel.class); channel = new NioSocketChannel(rawChannel); when(rawChannel.finishConnect()).thenReturn(true); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index bdbee715bd0cf..20deb3f709cfc 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -154,7 +154,7 @@ protected void doStart() { try { int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings); int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); - nioGroup = new NioGroup(logger, daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), workerCount, SocketEventHandler::new); channelFactory = new HttpChannelFactory(); diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 9d794f951c8d2..9b47a93469081 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -105,7 +105,7 @@ protected void doStart() { if (useNetworkServer) { acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); } - nioGroup = new NioGroup(logger, daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), SocketEventHandler::new); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 36e282f32959d..66c2fdd1815ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -96,7 +96,7 @@ protected void doStart() { if (useNetworkServer) { acceptorCount = 1; } - nioGroup = new NioGroup(logger, daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, + nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, TestingSocketEventHandler::new); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java index 2e2d8aa5adadc..875c359650850 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java @@ -19,7 +19,6 @@ package org.elasticsearch.transport.nio; -import org.apache.logging.log4j.Logger; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.SocketEventHandler; @@ -30,10 +29,6 @@ public class TestingSocketEventHandler extends SocketEventHandler { - public TestingSocketEventHandler(Logger logger) { - super(logger); - } - private Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); public void handleConnect(SocketChannelContext context) throws IOException { From 3bc5dade287c71d2444df75b6c4eb76c6edcf017 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 May 2018 12:32:20 -0600 Subject: [PATCH 2/3] Accept generic exception handlers --- libs/elasticsearch-nio/build.gradle | 15 -- .../licenses/log4j-api-2.9.1.jar.sha1 | 1 - .../licenses/log4j-api-LICENSE.txt | 202 ------------------ .../licenses/log4j-api-NOTICE.txt | 5 - .../nio/AcceptorEventHandler.java | 11 +- .../org/elasticsearch/nio/EventHandler.java | 13 +- .../elasticsearch/nio/SocketEventHandler.java | 15 +- .../org/elasticsearch/nio/SocketSelector.java | 4 +- .../nio/AcceptorEventHandlerTests.java | 10 +- .../org/elasticsearch/nio/NioGroupTests.java | 11 +- .../nio/SocketEventHandlerTests.java | 24 ++- .../nio/SocketSelectorTests.java | 4 +- .../http/nio/NioHttpServerTransport.java | 15 +- .../transport/nio/NioTransport.java | 12 +- .../elasticsearch/transport/TcpChannel.java | 9 + .../elasticsearch/transport/TcpTransport.java | 20 +- .../transport/nio/MockNioTransport.java | 11 +- .../nio/TestingSocketEventHandler.java | 5 + .../netty4/SecurityNetty4Transport.java | 8 +- .../transport/nio/SecurityNioTransport.java | 7 +- 20 files changed, 119 insertions(+), 283 deletions(-) delete mode 100644 libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 delete mode 100644 libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt delete mode 100644 libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt diff --git a/libs/elasticsearch-nio/build.gradle b/libs/elasticsearch-nio/build.gradle index 9cbd557a8ff8d..018874adf7082 100644 --- a/libs/elasticsearch-nio/build.gradle +++ b/libs/elasticsearch-nio/build.gradle @@ -62,18 +62,3 @@ forbiddenApisMain { // es-all is not checked as we connect and accept sockets signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')] } - -////JarHell is part of es core, which we don't want to pull in -//jarHell.enabled=false - -//thirdPartyAudit.excludes = [ -// 'org/osgi/framework/AdaptPermission', -// 'org/osgi/framework/AdminPermission', -// 'org/osgi/framework/Bundle', -// 'org/osgi/framework/BundleActivator', -// 'org/osgi/framework/BundleContext', -// 'org/osgi/framework/BundleEvent', -// 'org/osgi/framework/SynchronousBundleListener', -// 'org/osgi/framework/wiring/BundleWire', -// 'org/osgi/framework/wiring/BundleWiring' -//] diff --git a/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 b/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 deleted file mode 100644 index e1a89fadfed95..0000000000000 --- a/libs/elasticsearch-nio/licenses/log4j-api-2.9.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -7a2999229464e7a324aa503c0a52ec0f05efe7bd \ No newline at end of file diff --git a/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt b/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt deleted file mode 100644 index 6279e5206de13..0000000000000 --- a/libs/elasticsearch-nio/licenses/log4j-api-LICENSE.txt +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 1999-2005 The Apache Software Foundation - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt b/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt deleted file mode 100644 index 0375732360047..0000000000000 --- a/libs/elasticsearch-nio/licenses/log4j-api-NOTICE.txt +++ /dev/null @@ -1,5 +0,0 @@ -Apache log4j -Copyright 2007 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java index 791e0032c88bf..f3aab9c9be125 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/AcceptorEventHandler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.channels.SelectionKey; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -30,7 +31,8 @@ public class AcceptorEventHandler extends EventHandler { private final Supplier selectorSupplier; - public AcceptorEventHandler(Supplier selectorSupplier) { + public AcceptorEventHandler(Supplier selectorSupplier, Consumer exceptionHandler) { + super(exceptionHandler); this.selectorSupplier = selectorSupplier; } @@ -54,8 +56,7 @@ protected void handleRegistration(ServerChannelContext context) throws IOExcepti * @param exception that occurred */ protected void registrationException(ServerChannelContext context, Exception exception) { - // TODO: Call exception handler that does not close channel -// logger.error(new ParameterizedMessage("failed to register server channel: {}", context.getChannel()), exception); + context.handleException(exception); } /** @@ -75,8 +76,6 @@ protected void acceptChannel(ServerChannelContext context) throws IOException { * @param exception that occurred */ protected void acceptException(ServerChannelContext context, Exception exception) { - // TODO: Call exception handler that does not close channel -// logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}", -// context.getChannel()), exception); + context.handleException(exception); } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 95fd4aa25b2a1..cb4d43af4fdc3 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -21,9 +21,16 @@ import java.io.IOException; import java.nio.channels.Selector; +import java.util.function.Consumer; public abstract class EventHandler { + protected final Consumer exceptionHandler; + + protected EventHandler(Consumer exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + /** * This method handles an IOException that was thrown during a call to {@link Selector#select(long)} or * {@link Selector#close()}. @@ -31,7 +38,7 @@ public abstract class EventHandler { * @param exception the exception */ protected void selectorException(IOException exception) { -// logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception); + exceptionHandler.accept(exception); } /** @@ -65,9 +72,7 @@ protected void handleClose(ChannelContext context) { * @param exception that occurred */ protected void closeException(ChannelContext channel, Exception exception) { - // TODO: Check that this exception handler should be called channel.handleException(exception); -// logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", context.getChannel()), exception); } /** @@ -79,8 +84,6 @@ protected void closeException(ChannelContext channel, Exception exception) { * @param exception that was thrown */ protected void genericChannelException(ChannelContext channel, Exception exception) { - // TODO: Check that this exception handler should be called channel.handleException(exception); -// logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel.getChannel()), exception); } } diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java index 2a730c932e09c..b486243f2197d 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketEventHandler.java @@ -21,13 +21,17 @@ import java.io.IOException; import java.nio.channels.SelectionKey; -import java.util.function.BiConsumer; +import java.util.function.Consumer; /** * Event handler designed to handle events from non-server sockets */ public class SocketEventHandler extends EventHandler { + public SocketEventHandler(Consumer exceptionHandler) { + super(exceptionHandler); + } + /** * This method is called when a NioSocketChannel is successfully registered. It should only be called * once per channel. @@ -52,7 +56,6 @@ protected void handleRegistration(SocketChannelContext context) throws IOExcepti * @param exception that occurred */ protected void registrationException(SocketChannelContext context, Exception exception) { -// logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -75,7 +78,6 @@ protected void handleConnect(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void connectException(SocketChannelContext context, Exception exception) { -// logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -96,7 +98,6 @@ protected void handleRead(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void readException(SocketChannelContext context, Exception exception) { -// logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", context.getChannel()), exception); context.handleException(exception); } @@ -117,18 +118,16 @@ protected void handleWrite(SocketChannelContext context) throws IOException { * @param exception that occurred */ protected void writeException(SocketChannelContext context, Exception exception) { -// logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", context.getChannel()), exception); context.handleException(exception); } /** * This method is called when a listener attached to a channel operation throws an exception. * - * @param listener that was called * @param exception that occurred */ - protected void listenerException(BiConsumer listener, Exception exception) { -// logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); + protected void listenerException(Exception exception) { + exceptionHandler.accept(exception); } /** diff --git a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java index b1a3a08f02ddf..88b3cef41cd01 100644 --- a/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java +++ b/libs/elasticsearch-nio/src/main/java/org/elasticsearch/nio/SocketSelector.java @@ -143,7 +143,7 @@ public void executeListener(BiConsumer listener, V value) { try { listener.accept(value, null); } catch (Exception e) { - eventHandler.listenerException(listener, e); + eventHandler.listenerException(e); } } @@ -159,7 +159,7 @@ public void executeFailedListener(BiConsumer listener, Excepti try { listener.accept(null, exception); } catch (Exception e) { - eventHandler.listenerException(listener, e); + eventHandler.listenerException(e); } } diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java index 221f44cf0dec3..a162a8e234c21 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/AcceptorEventHandlerTests.java @@ -50,7 +50,7 @@ public void setUpHandler() throws IOException { ArrayList selectors = new ArrayList<>(); selectors.add(mock(SocketSelector.class)); selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new SocketSelector[selectors.size()])); - handler = new AcceptorEventHandler(selectorSupplier); + handler = new AcceptorEventHandler(selectorSupplier, mock(Consumer.class)); channel = new NioServerSocketChannel(mock(ServerSocketChannel.class)); context = new DoNotRegisterContext(channel, mock(AcceptingSelector.class), mock(Consumer.class)); @@ -99,6 +99,14 @@ public void testHandleAcceptCallsServerAcceptCallback() throws IOException { verify(serverChannelContext).acceptChannels(selectorSupplier); } + public void testAcceptExceptionCallsExceptionHandler() throws IOException { + ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); + IOException exception = new IOException(); + handler.acceptException(serverChannelContext, exception); + + verify(serverChannelContext).handleException(exception); + } + private class DoNotRegisterContext extends ServerChannelContext { diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java index 4d65eeed3dfa5..13ce2c136544e 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/NioGroupTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.function.Consumer; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; import static org.mockito.Mockito.mock; @@ -34,10 +35,12 @@ public class NioGroupTests extends ESTestCase { private NioGroup nioGroup; @Override + @SuppressWarnings("unchecked") public void setUp() throws Exception { super.setUp(); - nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, AcceptorEventHandler::new, - daemonThreadFactory(Settings.EMPTY, "selector"), 1, SocketEventHandler::new); + nioGroup = new NioGroup(daemonThreadFactory(Settings.EMPTY, "acceptor"), 1, + (s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"), 1, + () -> new SocketEventHandler(mock(Consumer.class))); } @Override @@ -69,10 +72,12 @@ public void testCanCloseTwice() throws IOException { nioGroup.close(); } + @SuppressWarnings("unchecked") public void testExceptionAtStartIsHandled() throws IOException { RuntimeException ex = new RuntimeException(); CheckedRunnable ctor = () -> new NioGroup(r -> {throw ex;}, 1, - AcceptorEventHandler::new, daemonThreadFactory(Settings.EMPTY, "selector"), 1, SocketEventHandler::new); + (s) -> new AcceptorEventHandler(s, mock(Consumer.class)), daemonThreadFactory(Settings.EMPTY, "selector"), + 1, () -> new SocketEventHandler(mock(Consumer.class))); RuntimeException runtimeException = expectThrows(RuntimeException.class, ctor::run); assertSame(ex, runtimeException); // ctor starts threads. So we are testing that a failure to construct will stop threads. Our thread diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java index 0f1cc697501ca..c85d9c0c5a8f8 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketEventHandlerTests.java @@ -36,7 +36,8 @@ public class SocketEventHandlerTests extends ESTestCase { - private Consumer exceptionHandler; + private Consumer channelExceptionHandler; + private Consumer genericExceptionHandler; private ReadWriteHandler readWriteHandler; private SocketEventHandler handler; @@ -47,15 +48,16 @@ public class SocketEventHandlerTests extends ESTestCase { @Before @SuppressWarnings("unchecked") public void setUpHandler() throws IOException { - exceptionHandler = mock(Consumer.class); + channelExceptionHandler = mock(Consumer.class); + genericExceptionHandler = mock(Consumer.class); readWriteHandler = mock(ReadWriteHandler.class); SocketSelector selector = mock(SocketSelector.class); - handler = new SocketEventHandler(); + handler = new SocketEventHandler(genericExceptionHandler); rawChannel = mock(SocketChannel.class); channel = new NioSocketChannel(rawChannel); when(rawChannel.finishConnect()).thenReturn(true); - context = new DoNotRegisterContext(channel, selector, exceptionHandler, new TestSelectionKey(0), readWriteHandler); + context = new DoNotRegisterContext(channel, selector, channelExceptionHandler, new TestSelectionKey(0), readWriteHandler); channel.setContext(context); handler.handleRegistration(context); @@ -96,7 +98,7 @@ public void testRegisterWithPendingWritesAddsOP_CONNECTAndOP_READAndOP_WRITEInte public void testRegistrationExceptionCallsExceptionHandler() throws IOException { CancelledKeyException exception = new CancelledKeyException(); handler.registrationException(context, exception); - verify(exceptionHandler).accept(exception); + verify(channelExceptionHandler).accept(exception); } public void testConnectDoesNotRemoveOP_CONNECTInterestIfIncomplete() throws IOException { @@ -114,7 +116,7 @@ public void testConnectRemovesOP_CONNECTInterestIfComplete() throws IOException public void testConnectExceptionCallsExceptionHandler() throws IOException { IOException exception = new IOException(); handler.connectException(context, exception); - verify(exceptionHandler).accept(exception); + verify(channelExceptionHandler).accept(exception); } public void testHandleReadDelegatesToContext() throws IOException { @@ -130,13 +132,13 @@ public void testHandleReadDelegatesToContext() throws IOException { public void testReadExceptionCallsExceptionHandler() { IOException exception = new IOException(); handler.readException(context, exception); - verify(exceptionHandler).accept(exception); + verify(channelExceptionHandler).accept(exception); } public void testWriteExceptionCallsExceptionHandler() { IOException exception = new IOException(); handler.writeException(context, exception); - verify(exceptionHandler).accept(exception); + verify(channelExceptionHandler).accept(exception); } public void testPostHandlingCallWillCloseTheChannelIfReady() throws IOException { @@ -192,6 +194,12 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException { assertEquals(SelectionKey.OP_READ, key.interestOps()); } + public void testListenerExceptionCallsGenericExceptionHandler() throws IOException { + RuntimeException listenerException = new RuntimeException(); + handler.listenerException(listenerException); + verify(genericExceptionHandler).accept(listenerException); + } + private class DoNotRegisterContext extends BytesChannelContext { private final TestSelectionKey selectionKey; diff --git a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java index a68f5c05dad5a..78911f2028953 100644 --- a/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java +++ b/libs/elasticsearch-nio/src/test/java/org/elasticsearch/nio/SocketSelectorTests.java @@ -297,7 +297,7 @@ public void testExecuteListenerWillHandleException() throws Exception { socketSelector.executeListener(listener, null); - verify(eventHandler).listenerException(listener, exception); + verify(eventHandler).listenerException(exception); } public void testExecuteFailedListenerWillHandleException() throws Exception { @@ -307,6 +307,6 @@ public void testExecuteFailedListenerWillHandleException() throws Exception { socketSelector.executeFailedListener(listener, ioException); - verify(eventHandler).listenerException(listener, exception); + verify(eventHandler).listenerException(exception); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 20deb3f709cfc..06f581d7ab701 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -155,8 +155,9 @@ protected void doStart() { int acceptorCount = NIO_HTTP_ACCEPTOR_COUNT.get(settings); int workerCount = NIO_HTTP_WORKER_COUNT.get(settings); nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), - workerCount, SocketEventHandler::new); + (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), + daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), workerCount, + () -> new SocketEventHandler(this::nonChannelExceptionCaught)); channelFactory = new HttpChannelFactory(); this.boundAddress = createBoundHttpAddress(); @@ -265,6 +266,10 @@ protected void exceptionCaught(NioSocketChannel channel, Exception cause) { } } + protected void nonChannelExceptionCaught(Exception ex) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex); + } + private void closeChannels(List channels) { List> futures = new ArrayList<>(channels.size()); @@ -312,8 +317,10 @@ public NioSocketChannel createChannel(SocketSelector selector, SocketChannel cha @Override public NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel); - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, NioHttpServerTransport.this::acceptChannel, - (e) -> {}); + Consumer exceptionHandler = (e) -> logger.error(() -> + new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer acceptor = NioHttpServerTransport.this::acceptChannel; + ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); return nioChannel; } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 9b47a93469081..93b424d34b7ef 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -106,8 +107,9 @@ protected void doStart() { acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); } nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), - NioTransport.NIO_WORKER_COUNT.get(settings), SocketEventHandler::new); + (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), + daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), + () -> new SocketEventHandler(this::nonChannelExceptionCaught)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = channelFactory(clientProfileSettings, true); @@ -193,8 +195,10 @@ public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel @Override public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, NioTransport.this::acceptChannel, - (e) -> {}); + Consumer exceptionHandler = (e) -> logger.error(() -> + new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer acceptor = NioTransport.this::acceptChannel; + ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); return nioChannel; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 42f1417d79b32..1a022ee9f4856 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -107,6 +107,15 @@ public interface TcpChannel extends Releasable { */ void sendMessage(BytesReference reference, ActionListener listener); + /** + * Closes the channel without blocking. + * + * @param channel to close + */ + static void closeChannel(C channel) { + closeChannel(channel, false); + } + /** * Closes the channel. * diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 033b72d04d985..8287c7cbdd83c 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -983,7 +983,7 @@ protected final void doStop() { protected void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); return; } @@ -991,20 +991,20 @@ protected void onException(TcpChannel channel, Exception e) { logger.trace(() -> new ParameterizedMessage( "close connection exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (isConnectException(e)) { logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (e instanceof BindException) { logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (e instanceof CancelledKeyException) { logger.trace(() -> new ParameterizedMessage( "cancelled key exception caught on transport layer [{}], disconnecting from relevant node", channel), e); // close the channel as safe measure, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (e instanceof TcpTransport.HttpOnTransportException) { // in case we are able to return data, serialize the exception content and sent it back to the client if (channel.isOpen()) { @@ -1012,13 +1012,13 @@ protected void onException(TcpChannel channel, Exception e) { final SendMetricListener closeChannel = new SendMetricListener(message.length()) { @Override protected void innerInnerOnResponse(Void v) { - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } @Override protected void innerOnFailure(Exception e) { logger.debug("failed to send message to httpOnTransport channel", e); - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } }; internalSendMessage(channel, message, closeChannel); @@ -1026,10 +1026,14 @@ protected void innerOnFailure(Exception e) { } else { logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", channel), e); // close the channel, which will cause a node to be disconnected if relevant - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } } + protected void nonChannelExceptionCaught(Exception ex) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex); + } + protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accept channel set once"; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 66c2fdd1815ca..e684502f2876b 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; @@ -52,6 +53,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; @@ -97,8 +99,9 @@ protected void doStart() { acceptorCount = 1; } nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - AcceptorEventHandler::new, daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), - 2, TestingSocketEventHandler::new); + (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), + daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, + () -> new TestingSocketEventHandler(this::nonChannelExceptionCaught)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = new MockTcpChannelFactory(clientProfileSettings, "client"); @@ -172,8 +175,10 @@ public MockSocketChannel createChannel(SocketSelector selector, SocketChannel ch @Override public MockServerChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector); + Consumer exceptionHandler = (e) -> logger.error(() -> + new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); ServerChannelContext context = new ServerChannelContext(nioServerChannel, this, selector, MockNioTransport.this::acceptChannel, - (e) -> {}); + exceptionHandler); nioServerChannel.setContext(context); return nioServerChannel; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java index 875c359650850..810e42010224c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java @@ -26,11 +26,16 @@ import java.util.Collections; import java.util.Set; import java.util.WeakHashMap; +import java.util.function.Consumer; public class TestingSocketEventHandler extends SocketEventHandler { private Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); + public TestingSocketEventHandler(Consumer exceptionHandler) { + super(exceptionHandler); + } + public void handleConnect(SocketChannelContext context) throws IOException { assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected"; super.handleConnect(context); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index a7ef1f0c02f4f..d897d55e5fdc4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -111,7 +111,7 @@ protected ChannelHandler getClientChannelInitializer() { protected void onException(TcpChannel channel, Exception e) { if (!lifecycle.started()) { // just close and ignore - we are already stopped and just need to make sure we release all resources - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (SSLExceptionHelper.isNotSslRecordException(e)) { if (logger.isTraceEnabled()) { logger.trace( @@ -119,21 +119,21 @@ protected void onException(TcpChannel channel, Exception e) { } else { logger.warn("received plaintext traffic on an encrypted channel, closing connection {}", channel); } - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (SSLExceptionHelper.isCloseDuringHandshakeException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e); } else { logger.warn("connection {} closed during handshake", channel); } - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else if (SSLExceptionHelper.isReceivedCertificateUnknownException(e)) { if (logger.isTraceEnabled()) { logger.trace(new ParameterizedMessage("client did not trust server's certificate, closing connection {}", channel), e); } else { logger.warn("client did not trust this server's certificate, closing connection {}", channel); } - TcpChannel.closeChannel(channel, false); + TcpChannel.closeChannel(channel); } else { super.onException(channel, e); } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 0f511af6b57d8..1c9d779c2cc37 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.security.transport.nio; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; @@ -137,8 +138,10 @@ public TcpNioSocketChannel createChannel(SocketSelector selector, SocketChannel @Override public TcpNioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel channel) throws IOException { TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel); - ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, SecurityNioTransport.this::acceptChannel, - (e) -> {}); + Consumer exceptionHandler = (e) -> logger.error(() -> + new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e); + Consumer acceptor = SecurityNioTransport.this::acceptChannel; + ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler); nioChannel.setContext(context); return nioChannel; } From c2d456cd86928487fa318644551f6b2d9ef9baee Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 21 May 2018 12:34:29 -0600 Subject: [PATCH 3/3] Add doc --- .../org/elasticsearch/transport/nio/NioTransport.java | 4 ++-- .../java/org/elasticsearch/transport/TcpTransport.java | 10 ++++++++-- .../elasticsearch/transport/nio/MockNioTransport.java | 4 ++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 93b424d34b7ef..2ef49d7791234 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -107,9 +107,9 @@ protected void doStart() { acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); } nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), + (s) -> new AcceptorEventHandler(s, this::onNonChannelException), daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), NioTransport.NIO_WORKER_COUNT.get(settings), - () -> new SocketEventHandler(this::nonChannelExceptionCaught)); + () -> new SocketEventHandler(this::onNonChannelException)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = channelFactory(clientProfileSettings, true); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8287c7cbdd83c..04a882f3e8b45 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1030,8 +1030,14 @@ protected void innerOnFailure(Exception e) { } } - protected void nonChannelExceptionCaught(Exception ex) { - logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex); + /** + * Exception handler for exceptions that are not associated with a specific channel. + * + * @param exception the exception + */ + protected void onNonChannelException(Exception exception) { + logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), + exception); } protected void serverAcceptedChannel(TcpChannel channel) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index e684502f2876b..9481f60d93384 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -99,9 +99,9 @@ protected void doStart() { acceptorCount = 1; } nioGroup = new NioGroup(daemonThreadFactory(this.settings, TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount, - (s) -> new AcceptorEventHandler(s, this::nonChannelExceptionCaught), + (s) -> new AcceptorEventHandler(s, this::onNonChannelException), daemonThreadFactory(this.settings, TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, - () -> new TestingSocketEventHandler(this::nonChannelExceptionCaught)); + () -> new TestingSocketEventHandler(this::onNonChannelException)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = new MockTcpChannelFactory(clientProfileSettings, "client");