From f37eb1b403cdfe391f625a9013e35f3c4ee26323 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 20 Nov 2017 12:20:42 -0700 Subject: [PATCH] Remove tcp profile from low level nio channel (#27441) This is related to #27260. Currently every nio channel has a profile field. Profile is a concept that only relates to the tcp transport. Http channels will not have profiles. This commit moves the profile from the nio channel to the read context. The context is the level that protocol specific features and logic should live. --- .../transport/nio/NioTransport.java | 15 +++-- .../transport/nio/TcpReadHandler.java | 6 +- .../nio/channel/AbstractNioChannel.java | 9 +-- .../transport/nio/channel/ChannelFactory.java | 58 +++++++++++++------ .../transport/nio/channel/NioChannel.java | 4 -- .../nio/channel/NioServerSocketChannel.java | 9 ++- .../nio/channel/NioSocketChannel.java | 7 +-- .../nio/AcceptorEventHandlerTests.java | 8 +-- .../nio/SocketEventHandlerTests.java | 2 +- .../nio/channel/ChannelFactoryTests.java | 9 +-- .../nio/channel/DoNotRegisterChannel.java | 4 +- .../channel/DoNotRegisterServerChannel.java | 7 +-- .../channel/NioServerSocketChannelTests.java | 2 +- .../nio/channel/NioSocketChannelTests.java | 10 ++-- .../nio/channel/TcpReadContextTests.java | 2 - 15 files changed, 79 insertions(+), 73 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 822c0181ae7e9..38c897b3be241 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -66,7 +66,6 @@ public class NioTransport extends TcpTransport { intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); protected final OpenChannels openChannels = new OpenChannels(logger); - private final Consumer contextSetter; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); @@ -77,7 +76,6 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - contextSetter = (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(this)), new TcpWriteContext(c)); } @Override @@ -89,7 +87,7 @@ public long getNumOpenServerConnections() { protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { ChannelFactory channelFactory = this.profileToChannelFactory.get(name); AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings)); - return channelFactory.openNioServerSocketChannel(name, address, selector); + return channelFactory.openNioServerSocketChannel(address, selector); } @Override @@ -119,8 +117,9 @@ protected void doStart() { } } + Consumer clientContextSetter = getContextSetter("client-socket"); clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); - clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter); + clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), clientContextSetter); if (NetworkService.NETWORK_SERVER.get(settings)) { int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); @@ -142,7 +141,9 @@ protected void doStart() { // loop through all profiles and start them up, special handling for default one for (ProfileSettings profileSettings : profileSettings) { - profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, contextSetter)); + String profileName = profileSettings.profileName; + Consumer contextSetter = getContextSetter(profileName); + profileToChannelFactory.putIfAbsent(profileName, new ChannelFactory(profileSettings, contextSetter)); bindServer(profileSettings); } } @@ -174,4 +175,8 @@ protected SocketEventHandler getSocketEventHandler() { final void exceptionCaught(NioSocketChannel channel, Exception exception) { onException(channel, exception); } + + private Consumer getContextSetter(String profileName) { + return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c)); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java index 2cb59ed95d1c5..1260546d34cab 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java @@ -26,15 +26,17 @@ public class TcpReadHandler { + private final String profile; private final NioTransport transport; - public TcpReadHandler(NioTransport transport) { + public TcpReadHandler(String profile, NioTransport transport) { + this.profile = profile; this.transport = transport; } public void handleMessage(BytesReference reference, NioSocketChannel channel, int messageBytesLength) { try { - transport.messageReceived(reference, channel, channel.getProfile(), channel.getRemoteAddress(), messageBytesLength); + transport.messageReceived(reference, channel, profile, channel.getRemoteAddress(), messageBytesLength); } catch (IOException e) { handleException(channel, e); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index a2924eff56bcb..7743fe0d83c2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -56,13 +56,11 @@ public abstract class AbstractNioChannel closeContext = new CompletableFuture<>(); private final ESSelector selector; private SelectionKey selectionKey; - AbstractNioChannel(String profile, S socketChannel, ESSelector selector) throws IOException { - this.profile = profile; + AbstractNioChannel(S socketChannel, ESSelector selector) throws IOException { this.socketChannel = socketChannel; this.localAddress = (InetSocketAddress) socketChannel.getLocalAddress(); this.selector = selector; @@ -78,11 +76,6 @@ public InetSocketAddress getLocalAddress() { return localAddress; } - @Override - public String getProfile() { - return profile; - } - /** * Schedules a channel to be closed by the selector event loop with which it is registered. *

diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 8d739fd72778e..84385de062681 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio.channel; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.AcceptingSelector; @@ -64,33 +63,51 @@ public ChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer listener) @Override public String toString() { return "NioServerSocketChannel{" + - "profile=" + getProfile() + - ", localAddress=" + getLocalAddress() + + "localAddress=" + getLocalAddress() + '}'; } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index fb2d940348a16..d0c3d9c3330d9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -41,8 +41,8 @@ public class NioSocketChannel extends AbstractNioChannel { private ReadContext readContext; private Exception connectException; - public NioSocketChannel(String profile, SocketChannel socketChannel, SocketSelector selector) throws IOException { - super(profile, socketChannel, selector); + public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException { + super(socketChannel, selector); this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress(); this.socketSelector = selector; } @@ -169,8 +169,7 @@ public void addConnectListener(ActionListener listener) { @Override public String toString() { return "NioSocketChannel{" + - "profile=" + getProfile() + - ", localAddress=" + getLocalAddress() + + "localAddress=" + getLocalAddress() + ", remoteAddress=" + remoteAddress + '}'; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java index abca0295f5835..3f23531407cb0 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java @@ -23,13 +23,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.DoNotRegisterServerChannel; -import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.ReadContext; import org.elasticsearch.transport.nio.channel.WriteContext; import org.junit.Before; -import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.channels.SelectionKey; @@ -67,7 +65,7 @@ public void setUpHandler() throws IOException { handler = new AcceptorEventHandler(logger, openChannels, new RoundRobinSelectorSupplier(selectors), acceptedChannelCallback); AcceptingSelector selector = mock(AcceptingSelector.class); - channel = new DoNotRegisterServerChannel("", mock(ServerSocketChannel.class), channelFactory, selector); + channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector); channel.register(); } @@ -88,7 +86,7 @@ public void testHandleRegisterSetsOP_ACCEPTInterest() { } public void testHandleAcceptCallsChannelFactory() throws IOException { - NioSocketChannel childChannel = new NioSocketChannel("", mock(SocketChannel.class), socketSelector); + NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class), socketSelector); when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel); handler.acceptChannel(channel); @@ -100,7 +98,7 @@ public void testHandleAcceptCallsChannelFactory() throws IOException { @SuppressWarnings("unchecked") public void testHandleAcceptAddsToOpenChannelsAndIsRemovedOnClose() throws IOException { SocketChannel rawChannel = SocketChannel.open(); - NioSocketChannel childChannel = new NioSocketChannel("", rawChannel, socketSelector); + NioSocketChannel childChannel = new NioSocketChannel(rawChannel, socketSelector); childChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java index abbe023b97c0b..cd4e70ab3acb0 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java @@ -57,7 +57,7 @@ public void setUpHandler() throws IOException { SocketSelector socketSelector = mock(SocketSelector.class); handler = new SocketEventHandler(logger, exceptionHandler, mock(OpenChannels.class)); rawChannel = mock(SocketChannel.class); - channel = new DoNotRegisterChannel("", rawChannel, socketSelector); + channel = new DoNotRegisterChannel(rawChannel, socketSelector); readContext = mock(ReadContext.class); when(rawChannel.finishConnect()).thenReturn(true); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java index 50770f459cd75..f6bcf26a02c2f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java @@ -76,14 +76,12 @@ public void ensureClosed() throws IOException { public void testAcceptChannel() throws IOException { NioServerSocketChannel serverChannel = mock(NioServerSocketChannel.class); when(rawChannelFactory.acceptNioChannel(serverChannel)).thenReturn(rawChannel); - when(serverChannel.getProfile()).thenReturn("parent-profile"); NioSocketChannel channel = channelFactory.acceptNioChannel(serverChannel, socketSelector); verify(socketSelector).scheduleForRegistration(channel); assertEquals(socketSelector, channel.getSelector()); - assertEquals("parent-profile", channel.getProfile()); assertEquals(rawChannel, channel.getRawChannel()); } @@ -106,7 +104,6 @@ public void testOpenChannel() throws IOException { verify(socketSelector).scheduleForRegistration(channel); assertEquals(socketSelector, channel.getSelector()); - assertEquals("client-socket", channel.getProfile()); assertEquals(rawChannel, channel.getRawChannel()); } @@ -124,13 +121,11 @@ public void testOpenServerChannel() throws IOException { InetSocketAddress address = mock(InetSocketAddress.class); when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel); - String profile = "profile"; - NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(profile, address, acceptingSelector); + NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(address, acceptingSelector); verify(acceptingSelector).scheduleForRegistration(channel); assertEquals(acceptingSelector, channel.getSelector()); - assertEquals(profile, channel.getProfile()); assertEquals(rawServerChannel, channel.getRawChannel()); } @@ -139,7 +134,7 @@ public void testOpenedServerChannelRejected() throws IOException { when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel); doThrow(new IllegalStateException()).when(acceptingSelector).scheduleForRegistration(any()); - expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel("", address, acceptingSelector)); + expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel(address, acceptingSelector)); assertFalse(rawServerChannel.isOpen()); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java index 70496da8a491d..f1f6ffb9f1138 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java @@ -28,8 +28,8 @@ public class DoNotRegisterChannel extends NioSocketChannel { - public DoNotRegisterChannel(String profile, SocketChannel socketChannel, SocketSelector selector) throws IOException { - super(profile, socketChannel, selector); + public DoNotRegisterChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException { + super(socketChannel, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java index 783bd6fc5fa18..073f2acf384da 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.ESSelector; import org.elasticsearch.transport.nio.utils.TestSelectionKey; import java.io.IOException; @@ -29,9 +28,9 @@ public class DoNotRegisterServerChannel extends NioServerSocketChannel { - public DoNotRegisterServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, - AcceptingSelector selector) throws IOException { - super(profile, channel, channelFactory, selector); + public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) + throws IOException { + super(channel, channelFactory, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 93d905c806810..9c01f5edc6106 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -101,7 +101,7 @@ private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { private DoNotCloseServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) throws IOException { - super(profile, channel, channelFactory, selector); + super(channel, channelFactory, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 1fb32d0f6e164..e3053a3e73a3c 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -71,7 +71,7 @@ public void testClose() throws Exception { AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); openChannels.clientChannelOpened(socketChannel); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); socketChannel.addCloseListener(new ActionListener() { @@ -107,7 +107,7 @@ public void onFailure(Exception e) { public void testConnectSucceeds() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenReturn(true); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, rawChannel, selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); @@ -123,7 +123,7 @@ public void testConnectSucceeds() throws Exception { public void testConnectFails() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenThrow(new ConnectException()); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, rawChannel, selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); @@ -139,8 +139,8 @@ public void testConnectFails() throws Exception { private class DoNotCloseChannel extends DoNotRegisterChannel { - private DoNotCloseChannel(String profile, SocketChannel channel, SocketSelector selector) throws IOException { - super(profile, channel, selector); + private DoNotCloseChannel(SocketChannel channel, SocketSelector selector) throws IOException { + super(channel, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 2b4db98010238..2dc0b32ae5bea 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -52,8 +52,6 @@ public void init() throws IOException { messageLength = randomInt(96) + 4; channel = mock(NioSocketChannel.class); readContext = new TcpReadContext(channel, handler); - - when(channel.getProfile()).thenReturn(PROFILE); } public void testSuccessfulRead() throws IOException {