From 728ba44a5c00c3a1303cdb92e74b741f793cb8b4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 7 Dec 2017 15:26:14 -0700 Subject: [PATCH] Remove internal channel tracking in transports This commit attempts to continue unifying the logic between different transport implementations. As transports call a `TcpTransport` callback when a new channel is accepted, there is no need to internally track channels accepted. Instead there is a set of accepted channels in `TcpTransport`. This set is used for metrics and shutting down channels. --- .../elasticsearch/transport/TcpTransport.java | 28 ++--- .../transport/TcpTransportTests.java | 5 - .../transport/netty4/Netty4Transport.java | 13 +- .../transport/netty4/NettyTcpChannel.java | 10 +- .../AbstractSimpleTransportTestCase.java | 22 ++++ .../transport/MockTcpTransport.java | 5 - .../transport/nio/NioShutdown.java | 7 +- .../transport/nio/NioTransport.java | 18 +-- .../transport/nio/OpenChannels.java | 116 ------------------ .../transport/MockTcpTransportTests.java | 5 + 10 files changed, 52 insertions(+), 177 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/OpenChannels.java diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 6b532a600a15d..5fbc45a804c80 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -195,22 +195,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final NetworkService networkService; protected final Set profileSettings; - protected volatile TransportService transportService; - // node id to actual channel - protected final ConcurrentMap connectedNodes = newConcurrentMap(); + private volatile TransportService transportService; - protected final ConcurrentMap profileBoundAddresses = newConcurrentMap(); + private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); + // node id to actual channel + private final ConcurrentMap connectedNodes = newConcurrentMap(); private final Map> serverChannels = newConcurrentMap(); private final Set acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>()); - protected final KeyedLock connectionLock = new KeyedLock<>(); + private final KeyedLock connectionLock = new KeyedLock<>(); private final NamedWriteableRegistry namedWriteableRegistry; // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) - protected final ReadWriteLock closeLock = new ReentrantReadWriteLock(); + private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); protected final boolean compress; - protected volatile BoundTransportAddress boundAddress; + private volatile BoundTransportAddress boundAddress; private final String transportName; protected final ConnectionProfile defaultConnectionProfile; @@ -438,7 +438,7 @@ public boolean allChannelsOpen() { } @Override - public void close() throws IOException { + public void close() { if (closed.compareAndSet(false, true)) { try { if (lifecycle.stopped()) { @@ -582,7 +582,7 @@ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectio } @Override - public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException { + public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { if (node == null) { throw new ConnectTransportException(null, "can't open connection to a null node"); } @@ -602,6 +602,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c PlainActionFuture connectFuture = PlainActionFuture.newFuture(); connectionFutures.add(connectFuture); TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture); + logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel)); channels.add(channel); } catch (Exception e) { // If there was an exception when attempting to instantiate the raw channels, we close all of the channels @@ -1041,6 +1042,7 @@ protected void serverAcceptedChannel(TcpChannel channel) { boolean addedOnThisCall = acceptedChannels.add(channel); assert addedOnThisCall : "Channel should only be added to accept channel set once"; channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel))); + logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel)); } /** @@ -1738,15 +1740,9 @@ private void closeAndCallback(final Exception e) { } } - /** - * Returns count of currently open connections - */ - protected abstract long getNumOpenServerConnections(); - @Override public final TransportStats getStats() { - return new TransportStats( - getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), + return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(), transmittedBytesMetric.sum()); } diff --git a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 5182951a0fd53..00871a373c9c2 100644 --- a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -190,11 +190,6 @@ protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo return new FakeChannel(messageCaptor); } - @Override - public long getNumOpenServerConnections() { - return 0; - } - @Override public NodeChannels getConnection(DiscoveryNode node) { int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 6343a24184370..87a09ad8ee3a0 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -104,8 +104,6 @@ public class Netty4Transport extends TcpTransport { protected final int workerCount; protected final ByteSizeValue receivePredictorMin; protected final ByteSizeValue receivePredictorMax; - // package private for testing - volatile Netty4OpenChannelsHandler serverOpenChannels; protected volatile Bootstrap bootstrap; protected final Map serverBootstraps = newConcurrentMap(); @@ -132,8 +130,6 @@ protected void doStart() { try { bootstrap = createBootstrap(); if (NetworkService.NETWORK_SERVER.get(settings)) { - final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger); - this.serverOpenChannels = openChannels; for (ProfileSettings profileSettings : profileSettings) { createServerBootstrap(profileSettings); bindServer(profileSettings); @@ -242,12 +238,6 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t)); } - @Override - public long getNumOpenServerConnections() { - Netty4OpenChannelsHandler channels = serverOpenChannels; - return channels == null ? 0 : channels.numberOfOpenChannels(); - } - @Override protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener listener) throws IOException { @@ -294,7 +284,7 @@ ScheduledPing getPing() { @Override @SuppressForbidden(reason = "debug") protected void stopInternal() { - Releasables.close(serverOpenChannels, () -> { + Releasables.close(() -> { final List>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size()); for (final Map.Entry entry : serverBootstraps.entrySet()) { serverBootstrapCloseFutures.add( @@ -349,7 +339,6 @@ protected void initChannel(Channel ch) throws Exception { ch.attr(CHANNEL_KEY).set(nettyTcpChannel); serverAcceptedChannel(nettyTcpChannel); ch.pipeline().addLast("logging", new ESLoggingHandler()); - ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index 3d71735a2a8e6..17c18f15ae15c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -96,7 +96,7 @@ public void sendMessage(BytesReference reference, ActionListener listener) } }); channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); - + if (channel.eventLoop().isShutdown()) { listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); } @@ -105,4 +105,12 @@ public void sendMessage(BytesReference reference, ActionListener listener) public Channel getLowLevelChannel() { return channel; } + + @Override + public String toString() { + return "NettyTcpChannel{" + + "localAddress=" + getLocalAddress() + + ", remoteAddress=" + channel.remoteAddress() + + '}'; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index a45411324b0fc..17423207d56c6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -109,6 +109,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); + protected int channelsPerNodeConnection() { + return 13; + } + @Override @Before public void setUp() throws Exception { @@ -2345,6 +2349,24 @@ public String executor() { } } + public void testAcceptedChannelCount() throws Exception { + assertBusy(() -> { + TransportStats transportStats = serviceA.transport.getStats(); + assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen()); + }); + assertBusy(() -> { + TransportStats transportStats = serviceB.transport.getStats(); + assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen()); + }); + + serviceA.close(); + + assertBusy(() -> { + TransportStats transportStats = serviceB.transport.getStats(); + assertEquals(0, transportStats.getServerOpen()); + }); + } + public void testTransportStatsWithException() throws Exception { MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true); CountDownLatch receivedLatch = new CountDownLatch(1); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index f6ec96d13df81..68f79b1cef779 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -217,11 +217,6 @@ private void configureSocket(Socket socket) throws SocketException { socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings)); } - @Override - public long getNumOpenServerConnections() { - return 1; - } - public final class MockChannel implements Closeable, TcpChannel { private final AtomicBoolean isOpen = new AtomicBoolean(true); private final InetSocketAddress localAddress; diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java index 320da6a88d186..3970e69b2c1d6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioShutdown.java @@ -34,17 +34,12 @@ public NioShutdown(Logger logger) { this.logger = logger; } - void orderlyShutdown(OpenChannels openChannels, ArrayList acceptors, ArrayList socketSelectors) { - - // Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections - openChannels.closeServerChannels(); + void orderlyShutdown(ArrayList acceptors, ArrayList socketSelectors) { for (AcceptingSelector acceptor : acceptors) { shutdownSelector(acceptor); } - openChannels.close(); - for (SocketSelector selector : socketSelectors) { shutdownSelector(selector); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index d1ab10fb5683a..acbf68080c187 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -33,7 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transports; -import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.transport.nio.channel.TcpChannelFactory; @@ -67,7 +66,6 @@ public class NioTransport extends TcpTransport { public static final Setting NIO_ACCEPTOR_COUNT = intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope); - private final OpenChannels openChannels = new OpenChannels(logger); private final ConcurrentMap profileToChannelFactory = newConcurrentMap(); private final ArrayList acceptors = new ArrayList<>(); private final ArrayList socketSelectors = new ArrayList<>(); @@ -80,27 +78,17 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); } - @Override - public long getNumOpenServerConnections() { - return openChannels.serverChannelsCount(); - } - @Override protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException { TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name); AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings)); - TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector); - openChannels.serverChannelOpened(serverChannel); - serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel))); - return serverChannel; + return channelFactory.openNioServerSocketChannel(address, selector); } @Override protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener connectListener) throws IOException { TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get()); - openChannels.clientChannelOpened(channel); - channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel))); channel.addConnectListener(connectListener); return channel; } @@ -169,7 +157,7 @@ protected void doStart() { @Override protected void stopInternal() { NioShutdown nioShutdown = new NioShutdown(logger); - nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors); + nioShutdown.orderlyShutdown(acceptors, socketSelectors); profileToChannelFactory.clear(); socketSelectors.clear(); @@ -190,8 +178,6 @@ private Consumer getContextSetter(String profileName) { private void acceptChannel(NioSocketChannel channel) { TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel; - openChannels.acceptedChannelOpened(tcpChannel); - tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel))); serverAcceptedChannel(tcpChannel); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/OpenChannels.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/OpenChannels.java deleted file mode 100644 index 12c12aaa48eb1..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/OpenChannels.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.transport.TcpChannel; -import org.elasticsearch.transport.nio.channel.NioChannel; -import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.NioSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel; -import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; - -public class OpenChannels implements Releasable { - - // TODO: Maybe set concurrency levels? - private final ConcurrentMap openClientChannels = newConcurrentMap(); - private final ConcurrentMap openAcceptedChannels = newConcurrentMap(); - private final ConcurrentMap openServerChannels = newConcurrentMap(); - - private final Logger logger; - - public OpenChannels(Logger logger) { - this.logger = logger; - } - - public void serverChannelOpened(TcpNioServerSocketChannel channel) { - boolean added = openServerChannels.putIfAbsent(channel, System.nanoTime()) == null; - if (added && logger.isTraceEnabled()) { - logger.trace("server channel opened: {}", channel); - } - } - - public long serverChannelsCount() { - return openServerChannels.size(); - } - - public void acceptedChannelOpened(TcpNioSocketChannel channel) { - boolean added = openAcceptedChannels.putIfAbsent(channel, System.nanoTime()) == null; - if (added && logger.isTraceEnabled()) { - logger.trace("accepted channel opened: {}", channel); - } - } - - public HashSet getAcceptedChannels() { - return new HashSet<>(openAcceptedChannels.keySet()); - } - - public void clientChannelOpened(TcpNioSocketChannel channel) { - boolean added = openClientChannels.putIfAbsent(channel, System.nanoTime()) == null; - if (added && logger.isTraceEnabled()) { - logger.trace("client channel opened: {}", channel); - } - } - - public Map getClientChannels() { - return openClientChannels; - } - - public void channelClosed(NioChannel channel) { - boolean removed; - if (channel instanceof NioServerSocketChannel) { - removed = openServerChannels.remove(channel) != null; - } else { - NioSocketChannel socketChannel = (NioSocketChannel) channel; - removed = openClientChannels.remove(socketChannel) != null; - if (removed == false) { - removed = openAcceptedChannels.remove(socketChannel) != null; - } - } - if (removed && logger.isTraceEnabled()) { - logger.trace("channel closed: {}", channel); - } - } - - public void closeServerChannels() { - TcpChannel.closeChannels(new ArrayList<>(openServerChannels.keySet()), true); - - openServerChannels.clear(); - } - - @Override - public void close() { - Stream channels = Stream.concat(openClientChannels.keySet().stream(), openAcceptedChannels.keySet().stream()); - TcpChannel.closeChannels(channels.collect(Collectors.toList()), true); - - openClientChannels.clear(); - openAcceptedChannels.clear(); - } -} diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 6844b55cadc3a..916e97ffd1211 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -55,6 +55,11 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T return mockTransportService; } + @Override + public int channelsPerNodeConnection() { + return 1; + } + @Override protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { final MockTcpTransport t = (MockTcpTransport) transport;