Skip to content

Commit

Permalink
Remove internal channel tracking in transports (#27711)
Browse files Browse the repository at this point in the history
This commit attempts to continue unifying the logic between different
transport implementations. As transports call a `TcpTransport` callback
when a new channel is accepted, there is no need to internally track
channels accepted. Instead there is a set of accepted channels in
`TcpTransport`. This set is used for metrics and shutting down channels.
  • Loading branch information
Tim-Brooks authored Dec 8, 2017
1 parent f50f99e commit d1acb76
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 177 deletions.
28 changes: 12 additions & 16 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

protected volatile TransportService transportService;
// node id to actual channel
protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private volatile TransportService transportService;

protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());

protected final KeyedLock<String> connectionLock = new KeyedLock<>();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final NamedWriteableRegistry namedWriteableRegistry;

// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
protected final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
protected final boolean compress;
protected volatile BoundTransportAddress boundAddress;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
protected final ConnectionProfile defaultConnectionProfile;

Expand Down Expand Up @@ -438,7 +438,7 @@ public boolean allChannelsOpen() {
}

@Override
public void close() throws IOException {
public void close() {
if (closed.compareAndSet(false, true)) {
try {
if (lifecycle.stopped()) {
Expand Down Expand Up @@ -582,7 +582,7 @@ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectio
}

@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
Expand All @@ -602,6 +602,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
connectionFutures.add(connectFuture);
TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
channels.add(channel);
} catch (Exception e) {
// If there was an exception when attempting to instantiate the raw channels, we close all of the channels
Expand Down Expand Up @@ -1041,6 +1042,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
boolean addedOnThisCall = acceptedChannels.add(channel);
assert addedOnThisCall : "Channel should only be added to accept channel set once";
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
}

/**
Expand Down Expand Up @@ -1738,15 +1740,9 @@ private void closeAndCallback(final Exception e) {
}
}

/**
* Returns count of currently open connections
*/
protected abstract long getNumOpenServerConnections();

@Override
public final TransportStats getStats() {
return new TransportStats(
getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
transmittedBytesMetric.sum());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
return new FakeChannel(messageCaptor);
}

@Override
public long getNumOpenServerConnections() {
return 0;
}

@Override
public NodeChannels getConnection(DiscoveryNode node) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ public class Netty4Transport extends TcpTransport {
protected final int workerCount;
protected final ByteSizeValue receivePredictorMin;
protected final ByteSizeValue receivePredictorMax;
// package private for testing
volatile Netty4OpenChannelsHandler serverOpenChannels;
protected volatile Bootstrap bootstrap;
protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();

Expand All @@ -132,8 +130,6 @@ protected void doStart() {
try {
bootstrap = createBootstrap();
if (NetworkService.NETWORK_SERVER.get(settings)) {
final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
for (ProfileSettings profileSettings : profileSettings) {
createServerBootstrap(profileSettings);
bindServer(profileSettings);
Expand Down Expand Up @@ -242,12 +238,6 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
}

@Override
public long getNumOpenServerConnections() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return channels == null ? 0 : channels.numberOfOpenChannels();
}

@Override
protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
throws IOException {
Expand Down Expand Up @@ -294,7 +284,7 @@ ScheduledPing getPing() {
@Override
@SuppressForbidden(reason = "debug")
protected void stopInternal() {
Releasables.close(serverOpenChannels, () -> {
Releasables.close(() -> {
final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size());
for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) {
serverBootstrapCloseFutures.add(
Expand Down Expand Up @@ -349,7 +339,6 @@ protected void initChannel(Channel ch) throws Exception {
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
serverAcceptedChannel(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
}
});
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);

if (channel.eventLoop().isShutdown()) {
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
Expand All @@ -105,4 +105,12 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
public Channel getLowLevelChannel() {
return channel;
}

@Override
public String toString() {
return "NettyTcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {

protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);

protected int channelsPerNodeConnection() {
return 13;
}

@Override
@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -2345,6 +2349,24 @@ public String executor() {
}
}

public void testAcceptedChannelCount() throws Exception {
assertBusy(() -> {
TransportStats transportStats = serviceA.transport.getStats();
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
});
assertBusy(() -> {
TransportStats transportStats = serviceB.transport.getStats();
assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
});

serviceA.close();

assertBusy(() -> {
TransportStats transportStats = serviceB.transport.getStats();
assertEquals(0, transportStats.getServerOpen());
});
}

public void testTransportStatsWithException() throws Exception {
MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
CountDownLatch receivedLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,6 @@ private void configureSocket(Socket socket) throws SocketException {
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}

@Override
public long getNumOpenServerConnections() {
return 1;
}

public final class MockChannel implements Closeable, TcpChannel {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,12 @@ public NioShutdown(Logger logger) {
this.logger = logger;
}

void orderlyShutdown(OpenChannels openChannels, ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {

// Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections
openChannels.closeServerChannels();
void orderlyShutdown(ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {

for (AcceptingSelector acceptor : acceptors) {
shutdownSelector(acceptor);
}

openChannels.close();

for (SocketSelector selector : socketSelectors) {
shutdownSelector(selector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
Expand Down Expand Up @@ -70,7 +69,6 @@ public class NioTransport extends TcpTransport {
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);

private final OpenChannels openChannels = new OpenChannels(logger);
private final PageCacheRecycler pageCacheRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
Expand All @@ -86,27 +84,17 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net
this.pageCacheRecycler = pageCacheRecycler;
}

@Override
public long getNumOpenServerConnections() {
return openChannels.serverChannelsCount();
}

@Override
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector);
openChannels.serverChannelOpened(serverChannel);
serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel)));
return serverChannel;
return channelFactory.openNioServerSocketChannel(address, selector);
}

@Override
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException {
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
openChannels.clientChannelOpened(channel);
channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
channel.addConnectListener(connectListener);
return channel;
}
Expand Down Expand Up @@ -175,7 +163,7 @@ protected void doStart() {
@Override
protected void stopInternal() {
NioShutdown nioShutdown = new NioShutdown(logger);
nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors);
nioShutdown.orderlyShutdown(acceptors, socketSelectors);

profileToChannelFactory.clear();
socketSelectors.clear();
Expand All @@ -202,8 +190,6 @@ private Consumer<NioSocketChannel> getContextSetter(String profileName) {

private void acceptChannel(NioSocketChannel channel) {
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
openChannels.acceptedChannelOpened(tcpChannel);
tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
serverAcceptedChannel(tcpChannel);

}
Expand Down
Loading

0 comments on commit d1acb76

Please sign in to comment.