diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 822c0181ae7e9..38c897b3be241 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -66,7 +66,6 @@ public class NioTransport extends TcpTransport { intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); protected final OpenChannels openChannels = new OpenChannels(logger); - private final Consumer contextSetter; private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); @@ -77,7 +76,6 @@ public class NioTransport extends TcpTransport { public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); - contextSetter = (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(this)), new TcpWriteContext(c)); } @Override @@ -89,7 +87,7 @@ public long getNumOpenServerConnections() { protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { ChannelFactory channelFactory = this.profileToChannelFactory.get(name); AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings)); - return channelFactory.openNioServerSocketChannel(name, address, selector); + return channelFactory.openNioServerSocketChannel(address, selector); } @Override @@ -119,8 +117,9 @@ protected void doStart() { } } + Consumer clientContextSetter = getContextSetter("client-socket"); clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors); - clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter); + clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), clientContextSetter); if (NetworkService.NETWORK_SERVER.get(settings)) { int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings); @@ -142,7 +141,9 @@ protected void doStart() { // loop through all profiles and start them up, special handling for default one for (ProfileSettings profileSettings : profileSettings) { - profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, contextSetter)); + String profileName = profileSettings.profileName; + Consumer contextSetter = getContextSetter(profileName); + profileToChannelFactory.putIfAbsent(profileName, new ChannelFactory(profileSettings, contextSetter)); bindServer(profileSettings); } } @@ -174,4 +175,8 @@ protected SocketEventHandler getSocketEventHandler() { final void exceptionCaught(NioSocketChannel channel, Exception exception) { onException(channel, exception); } + + private Consumer getContextSetter(String profileName) { + return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c)); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java index 2cb59ed95d1c5..1260546d34cab 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java @@ -26,15 +26,17 @@ public class TcpReadHandler { + private final String profile; private final NioTransport transport; - public TcpReadHandler(NioTransport transport) { + public TcpReadHandler(String profile, NioTransport transport) { + this.profile = profile; this.transport = transport; } public void handleMessage(BytesReference reference, NioSocketChannel channel, int messageBytesLength) { try { - transport.messageReceived(reference, channel, channel.getProfile(), channel.getRemoteAddress(), messageBytesLength); + transport.messageReceived(reference, channel, profile, channel.getRemoteAddress(), messageBytesLength); } catch (IOException e) { handleException(channel, e); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index a2924eff56bcb..7743fe0d83c2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -56,13 +56,11 @@ public abstract class AbstractNioChannel closeContext = new CompletableFuture<>(); private final ESSelector selector; private SelectionKey selectionKey; - AbstractNioChannel(String profile, S socketChannel, ESSelector selector) throws IOException { - this.profile = profile; + AbstractNioChannel(S socketChannel, ESSelector selector) throws IOException { this.socketChannel = socketChannel; this.localAddress = (InetSocketAddress) socketChannel.getLocalAddress(); this.selector = selector; @@ -78,11 +76,6 @@ public InetSocketAddress getLocalAddress() { return localAddress; } - @Override - public String getProfile() { - return profile; - } - /** * Schedules a channel to be closed by the selector event loop with which it is registered. *

diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 8d739fd72778e..84385de062681 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio.channel; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.AcceptingSelector; @@ -64,33 +63,51 @@ public ChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer listener) @Override public String toString() { return "NioServerSocketChannel{" + - "profile=" + getProfile() + - ", localAddress=" + getLocalAddress() + + "localAddress=" + getLocalAddress() + '}'; } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java index fb2d940348a16..d0c3d9c3330d9 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java @@ -41,8 +41,8 @@ public class NioSocketChannel extends AbstractNioChannel { private ReadContext readContext; private Exception connectException; - public NioSocketChannel(String profile, SocketChannel socketChannel, SocketSelector selector) throws IOException { - super(profile, socketChannel, selector); + public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException { + super(socketChannel, selector); this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress(); this.socketSelector = selector; } @@ -169,8 +169,7 @@ public void addConnectListener(ActionListener listener) { @Override public String toString() { return "NioSocketChannel{" + - "profile=" + getProfile() + - ", localAddress=" + getLocalAddress() + + "localAddress=" + getLocalAddress() + ", remoteAddress=" + remoteAddress + '}'; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java index abca0295f5835..3f23531407cb0 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java @@ -23,13 +23,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.DoNotRegisterServerChannel; -import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.ReadContext; import org.elasticsearch.transport.nio.channel.WriteContext; import org.junit.Before; -import org.mockito.ArgumentCaptor; import java.io.IOException; import java.nio.channels.SelectionKey; @@ -67,7 +65,7 @@ public void setUpHandler() throws IOException { handler = new AcceptorEventHandler(logger, openChannels, new RoundRobinSelectorSupplier(selectors), acceptedChannelCallback); AcceptingSelector selector = mock(AcceptingSelector.class); - channel = new DoNotRegisterServerChannel("", mock(ServerSocketChannel.class), channelFactory, selector); + channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector); channel.register(); } @@ -88,7 +86,7 @@ public void testHandleRegisterSetsOP_ACCEPTInterest() { } public void testHandleAcceptCallsChannelFactory() throws IOException { - NioSocketChannel childChannel = new NioSocketChannel("", mock(SocketChannel.class), socketSelector); + NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class), socketSelector); when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel); handler.acceptChannel(channel); @@ -100,7 +98,7 @@ public void testHandleAcceptCallsChannelFactory() throws IOException { @SuppressWarnings("unchecked") public void testHandleAcceptAddsToOpenChannelsAndIsRemovedOnClose() throws IOException { SocketChannel rawChannel = SocketChannel.open(); - NioSocketChannel childChannel = new NioSocketChannel("", rawChannel, socketSelector); + NioSocketChannel childChannel = new NioSocketChannel(rawChannel, socketSelector); childChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java index abbe023b97c0b..cd4e70ab3acb0 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java @@ -57,7 +57,7 @@ public void setUpHandler() throws IOException { SocketSelector socketSelector = mock(SocketSelector.class); handler = new SocketEventHandler(logger, exceptionHandler, mock(OpenChannels.class)); rawChannel = mock(SocketChannel.class); - channel = new DoNotRegisterChannel("", rawChannel, socketSelector); + channel = new DoNotRegisterChannel(rawChannel, socketSelector); readContext = mock(ReadContext.class); when(rawChannel.finishConnect()).thenReturn(true); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java index 50770f459cd75..f6bcf26a02c2f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java @@ -76,14 +76,12 @@ public void ensureClosed() throws IOException { public void testAcceptChannel() throws IOException { NioServerSocketChannel serverChannel = mock(NioServerSocketChannel.class); when(rawChannelFactory.acceptNioChannel(serverChannel)).thenReturn(rawChannel); - when(serverChannel.getProfile()).thenReturn("parent-profile"); NioSocketChannel channel = channelFactory.acceptNioChannel(serverChannel, socketSelector); verify(socketSelector).scheduleForRegistration(channel); assertEquals(socketSelector, channel.getSelector()); - assertEquals("parent-profile", channel.getProfile()); assertEquals(rawChannel, channel.getRawChannel()); } @@ -106,7 +104,6 @@ public void testOpenChannel() throws IOException { verify(socketSelector).scheduleForRegistration(channel); assertEquals(socketSelector, channel.getSelector()); - assertEquals("client-socket", channel.getProfile()); assertEquals(rawChannel, channel.getRawChannel()); } @@ -124,13 +121,11 @@ public void testOpenServerChannel() throws IOException { InetSocketAddress address = mock(InetSocketAddress.class); when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel); - String profile = "profile"; - NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(profile, address, acceptingSelector); + NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(address, acceptingSelector); verify(acceptingSelector).scheduleForRegistration(channel); assertEquals(acceptingSelector, channel.getSelector()); - assertEquals(profile, channel.getProfile()); assertEquals(rawServerChannel, channel.getRawChannel()); } @@ -139,7 +134,7 @@ public void testOpenedServerChannelRejected() throws IOException { when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel); doThrow(new IllegalStateException()).when(acceptingSelector).scheduleForRegistration(any()); - expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel("", address, acceptingSelector)); + expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel(address, acceptingSelector)); assertFalse(rawServerChannel.isOpen()); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java index 70496da8a491d..f1f6ffb9f1138 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterChannel.java @@ -28,8 +28,8 @@ public class DoNotRegisterChannel extends NioSocketChannel { - public DoNotRegisterChannel(String profile, SocketChannel socketChannel, SocketSelector selector) throws IOException { - super(profile, socketChannel, selector); + public DoNotRegisterChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException { + super(socketChannel, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java index 783bd6fc5fa18..073f2acf384da 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/DoNotRegisterServerChannel.java @@ -20,7 +20,6 @@ package org.elasticsearch.transport.nio.channel; import org.elasticsearch.transport.nio.AcceptingSelector; -import org.elasticsearch.transport.nio.ESSelector; import org.elasticsearch.transport.nio.utils.TestSelectionKey; import java.io.IOException; @@ -29,9 +28,9 @@ public class DoNotRegisterServerChannel extends NioServerSocketChannel { - public DoNotRegisterServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, - AcceptingSelector selector) throws IOException { - super(profile, channel, channelFactory, selector); + public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) + throws IOException { + super(channel, channelFactory, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 93d905c806810..9c01f5edc6106 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -101,7 +101,7 @@ private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { private DoNotCloseServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) throws IOException { - super(profile, channel, channelFactory, selector); + super(channel, channelFactory, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 1fb32d0f6e164..e3053a3e73a3c 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -71,7 +71,7 @@ public void testClose() throws Exception { AtomicBoolean isClosed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); openChannels.clientChannelOpened(socketChannel); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); socketChannel.addCloseListener(new ActionListener() { @@ -107,7 +107,7 @@ public void onFailure(Exception e) { public void testConnectSucceeds() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenReturn(true); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, rawChannel, selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); @@ -123,7 +123,7 @@ public void testConnectSucceeds() throws Exception { public void testConnectFails() throws Exception { SocketChannel rawChannel = mock(SocketChannel.class); when(rawChannel.finishConnect()).thenThrow(new ConnectException()); - NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, rawChannel, selector); + NioSocketChannel socketChannel = new DoNotCloseChannel(rawChannel, selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); selector.scheduleForRegistration(socketChannel); @@ -139,8 +139,8 @@ public void testConnectFails() throws Exception { private class DoNotCloseChannel extends DoNotRegisterChannel { - private DoNotCloseChannel(String profile, SocketChannel channel, SocketSelector selector) throws IOException { - super(profile, channel, selector); + private DoNotCloseChannel(SocketChannel channel, SocketSelector selector) throws IOException { + super(channel, selector); } @Override diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java index 2b4db98010238..2dc0b32ae5bea 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java @@ -52,8 +52,6 @@ public void init() throws IOException { messageLength = randomInt(96) + 4; channel = mock(NioSocketChannel.class); readContext = new TcpReadContext(channel, handler); - - when(channel.getProfile()).thenReturn(PROFILE); } public void testSuccessfulRead() throws IOException {