Remove internal channel tracking in transports #27711

Merged

28 changes: 12 additions & 16 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -195,22 +195,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
     protected final NetworkService networkService;
     protected final Set<ProfileSettings> profileSettings;

-    protected volatile TransportService transportService;
-    // node id to actual channel
-    protected final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
+    private volatile TransportService transportService;

-    protected final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
+    private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
+    // node id to actual channel
+    private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
     private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
     private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());

-    protected final KeyedLock<String> connectionLock = new KeyedLock<>();
+    private final KeyedLock<String> connectionLock = new KeyedLock<>();
     private final NamedWriteableRegistry namedWriteableRegistry;

     // this lock is here to make sure we close this transport and disconnect all the client nodes
     // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
-    protected final ReadWriteLock closeLock = new ReentrantReadWriteLock();
+    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
     protected final boolean compress;
-    protected volatile BoundTransportAddress boundAddress;
+    private volatile BoundTransportAddress boundAddress;
     private final String transportName;
     protected final ConnectionProfile defaultConnectionProfile;

@@ -438,7 +438,7 @@ public boolean allChannelsOpen() {
     }

     @Override
-    public void close() throws IOException {
+    public void close() {
         if (closed.compareAndSet(false, true)) {
             try {
                 if (lifecycle.stopped()) {
@@ -582,7 +582,7 @@ protected ConnectionProfile resolveConnectionProfile(ConnectionProfile connectio
     }

     @Override
-    public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
+    public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
         if (node == null) {
             throw new ConnectTransportException(null, "can't open connection to a null node");
         }
@@ -602,6 +602,7 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
                     PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
                     connectionFutures.add(connectFuture);
                     TcpChannel channel = initiateChannel(node, connectionProfile.getConnectTimeout(), connectFuture);
+                    logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", channel));
                     channels.add(channel);
                 } catch (Exception e) {
                     // If there was an exception when attempting to instantiate the raw channels, we close all of the channels
@@ -1041,6 +1042,7 @@ protected void serverAcceptedChannel(TcpChannel channel) {
         boolean addedOnThisCall = acceptedChannels.add(channel);
         assert addedOnThisCall : "Channel should only be added to accept channel set once";
         channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
+        logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
     }

     /**
@@ -1738,15 +1740,9 @@ private void closeAndCallback(final Exception e) {
         }
     }

-    /**
-     * Returns count of currently open connections
-     */
-    protected abstract long getNumOpenServerConnections();
-
     @Override
     public final TransportStats getStats() {
-        return new TransportStats(
-            getNumOpenServerConnections(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
+        return new TransportStats(acceptedChannels.size(), readBytesMetric.count(), readBytesMetric.sum(), transmittedBytesMetric.count(),
             transmittedBytesMetric.sum());
     }

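With the abstract getNumOpenServerConnections() hook gone, channel accounting lives entirely in the acceptedChannels set that TcpTransport already maintains: serverAcceptedChannel(...) adds each accepted channel and registers a close listener that removes it again, and getStats() just reports the set's size. A minimal standalone sketch of that pattern, with simplified names rather than the actual Elasticsearch classes:

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: the interface and class names below are made up for this sketch.
interface Channel {
    void addCloseListener(Runnable onClose); // assumed close-callback hook
}

class AcceptedChannelTracker {
    private final Set<Channel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());

    // Called once per server-accepted channel (mirrors serverAcceptedChannel(...) above).
    void serverAcceptedChannel(Channel channel) {
        boolean added = acceptedChannels.add(channel);
        assert added : "Channel should only be added to accept channel set once";
        // Remove the channel when it closes so size() stays an accurate open-channel count.
        channel.addCloseListener(() -> acceptedChannels.remove(channel));
    }

    // What getStats() now reports instead of calling an abstract per-transport hook.
    long openServerChannels() {
        return acceptedChannels.size();
    }
}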
@@ -191,11 +191,6 @@ protected FakeChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeo
         return new FakeChannel(messageCaptor);
     }

-    @Override
-    public long getNumOpenServerConnections() {
-        return 0;
-    }
-
     @Override
     public NodeChannels getConnection(DiscoveryNode node) {
         int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
@@ -104,8 +104,6 @@ public class Netty4Transport extends TcpTransport {
     protected final int workerCount;
     protected final ByteSizeValue receivePredictorMin;
     protected final ByteSizeValue receivePredictorMax;
-    // package private for testing
-    volatile Netty4OpenChannelsHandler serverOpenChannels;
     protected volatile Bootstrap bootstrap;
     protected final Map<String, ServerBootstrap> serverBootstraps = newConcurrentMap();

@@ -132,8 +130,6 @@ protected void doStart() {
         try {
             bootstrap = createBootstrap();
             if (NetworkService.NETWORK_SERVER.get(settings)) {
-                final Netty4OpenChannelsHandler openChannels = new Netty4OpenChannelsHandler(logger);
-                this.serverOpenChannels = openChannels;
                 for (ProfileSettings profileSettings : profileSettings) {
                     createServerBootstrap(profileSettings);
                     bindServer(profileSettings);
@@ -242,12 +238,6 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
     }

-    @Override
-    public long getNumOpenServerConnections() {
-        Netty4OpenChannelsHandler channels = serverOpenChannels;
-        return channels == null ? 0 : channels.numberOfOpenChannels();
-    }
-
     @Override
     protected NettyTcpChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> listener)
         throws IOException {
@@ -294,7 +284,7 @@ ScheduledPing getPing() {
     @Override
     @SuppressForbidden(reason = "debug")
     protected void stopInternal() {
-        Releasables.close(serverOpenChannels, () -> {
+        Releasables.close(() -> {
            final List<Tuple<String, Future<?>>> serverBootstrapCloseFutures = new ArrayList<>(serverBootstraps.size());
            for (final Map.Entry<String, ServerBootstrap> entry : serverBootstraps.entrySet()) {
                serverBootstrapCloseFutures.add(
@@ -349,7 +339,6 @@ protected void initChannel(Channel ch) throws Exception {
             ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
             serverAcceptedChannel(nettyTcpChannel);
             ch.pipeline().addLast("logging", new ESLoggingHandler());
-            ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
             ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
             ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
         }
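On the Netty side, the dedicated open_channels pipeline handler disappears for the same reason: initChannel hands the accepted channel to serverAcceptedChannel(...), and removal rides on the channel's close notification. A rough sketch of wiring a Netty channel's closeFuture to such a set, assuming plain Netty 4 types (the helper class name is made up):

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;

// Hypothetical helper: tracks accepted Netty channels without a counting handler in the pipeline.
class NettyAcceptedChannels {
    private final Set<Channel> accepted = Collections.newSetFromMap(new ConcurrentHashMap<>());

    void onAccepted(Channel channel) {
        accepted.add(channel);
        // closeFuture() completes when the channel closes; drop it from the set then,
        // so openCount() mirrors the number of live accepted connections.
        channel.closeFuture().addListener(future -> accepted.remove(channel));
    }

    int openCount() {
        return accepted.size();
    }
}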
@@ -96,7 +96,7 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
             }
         });
         channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);

         if (channel.eventLoop().isShutdown()) {
             listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
         }
@@ -105,4 +105,12 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
     public Channel getLowLevelChannel() {
         return channel;
     }
+
+    @Override
+    public String toString() {
+        return "NettyTcpChannel{" +
+            "localAddress=" + getLocalAddress() +
+            ", remoteAddress=" + channel.remoteAddress() +
+            '}';
+    }
 }
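The toString() added above is mainly there so the new trace statements in TcpTransport print something useful for the {} placeholder. The same lazy-logging idiom with plain Log4j 2 types, as a self-contained illustration (the logger name and the anonymous stand-in for a channel are arbitrary):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;

public class TraceLogExample {
    private static final Logger logger = LogManager.getLogger(TraceLogExample.class);

    public static void main(String[] args) {
        Object channel = new Object() {
            @Override
            public String toString() {
                // Stands in for NettyTcpChannel#toString(): local and remote addresses.
                return "NettyTcpChannel{localAddress=..., remoteAddress=...}";
            }
        };
        // The Supplier form defers building the message until TRACE is actually enabled.
        Supplier<ParameterizedMessage> message =
            () -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel);
        logger.trace(message);
    }
}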
@@ -109,6 +109,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {

     protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake);

+    protected int channelsPerNodeConnection() {
+        return 13;
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -2345,6 +2349,24 @@ public String executor() {
             }
         }

+    public void testAcceptedChannelCount() throws Exception {
+        assertBusy(() -> {
+            TransportStats transportStats = serviceA.transport.getStats();
+            assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
+        });
+        assertBusy(() -> {
+            TransportStats transportStats = serviceB.transport.getStats();
+            assertEquals(channelsPerNodeConnection(), transportStats.getServerOpen());
+        });
+
+        serviceA.close();
+
+        assertBusy(() -> {
+            TransportStats transportStats = serviceB.transport.getStats();
+            assertEquals(0, transportStats.getServerOpen());
+        });
+    }
+
     public void testTransportStatsWithException() throws Exception {
         MockTransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
         CountDownLatch receivedLatch = new CountDownLatch(1);
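testAcceptedChannelCount polls with assertBusy because channel accounting is asynchronous: a channel only leaves the accepted set once its close listener has fired, so the reported counts converge rather than change atomically. (The 13 returned by channelsPerNodeConnection() presumably corresponds to the sum of the per-type connection counts in the default ConnectionProfile; that breakdown is an assumption, only the total is asserted here.) A minimal stand-in for the assertBusy(...) polling used above, not its real implementation:

import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Poll a counter until it reaches the expected value or a timeout elapses,
// e.g. WaitForCount.await(() -> stats.getServerOpen(), 13, 10_000).
final class WaitForCount {
    static void await(LongSupplier actual, long expected, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (actual.getAsLong() != expected) {
            if (System.nanoTime() > deadline) {
                throw new AssertionError("expected " + expected + " but was " + actual.getAsLong());
            }
            Thread.sleep(10); // back off briefly before re-checking
        }
    }
}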
@@ -217,11 +217,6 @@ private void configureSocket(Socket socket) throws SocketException {
         socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
     }

-    @Override
-    public long getNumOpenServerConnections() {
-        return 1;
-    }
-
     public final class MockChannel implements Closeable, TcpChannel {
         private final AtomicBoolean isOpen = new AtomicBoolean(true);
         private final InetSocketAddress localAddress;
@@ -34,17 +34,12 @@ public NioShutdown(Logger logger) {
         this.logger = logger;
     }

-    void orderlyShutdown(OpenChannels openChannels, ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {
-
-        // Start by closing the server channels. Once these are closed, we are guaranteed to no accept new connections
-        openChannels.closeServerChannels();
+    void orderlyShutdown(ArrayList<AcceptingSelector> acceptors, ArrayList<SocketSelector> socketSelectors) {

         for (AcceptingSelector acceptor : acceptors) {
             shutdownSelector(acceptor);
         }

-        openChannels.close();
-
         for (SocketSelector selector : socketSelectors) {
             shutdownSelector(selector);
         }
@@ -35,7 +35,6 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.Transports;
-import org.elasticsearch.transport.nio.channel.NioChannel;
 import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
 import org.elasticsearch.transport.nio.channel.NioSocketChannel;
 import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
@@ -70,7 +69,6 @@ public class NioTransport extends TcpTransport {
     public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
         intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);

-    private final OpenChannels openChannels = new OpenChannels(logger);
     private final PageCacheRecycler pageCacheRecycler;
     private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
     private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
@@ -86,27 +84,17 @@ public NioTransport(Settings settings, ThreadPool threadPool, NetworkService net
         this.pageCacheRecycler = pageCacheRecycler;
     }

-    @Override
-    public long getNumOpenServerConnections() {
-        return openChannels.serverChannelsCount();
-    }
-
     @Override
     protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
         TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
         AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
-        TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector);
-        openChannels.serverChannelOpened(serverChannel);
-        serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel)));
-        return serverChannel;
+        return channelFactory.openNioServerSocketChannel(address, selector);
     }

     @Override
     protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
         throws IOException {
         TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
-        openChannels.clientChannelOpened(channel);
-        channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
         channel.addConnectListener(connectListener);
         return channel;
     }
@@ -175,7 +163,7 @@ protected void doStart() {
     @Override
     protected void stopInternal() {
         NioShutdown nioShutdown = new NioShutdown(logger);
-        nioShutdown.orderlyShutdown(openChannels, acceptors, socketSelectors);
+        nioShutdown.orderlyShutdown(acceptors, socketSelectors);

         profileToChannelFactory.clear();
         socketSelectors.clear();
@@ -202,8 +190,6 @@ private Consumer<NioSocketChannel> getContextSetter(String profileName) {

     private void acceptChannel(NioSocketChannel channel) {
         TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
-        openChannels.acceptedChannelOpened(tcpChannel);
-        tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
         serverAcceptedChannel(tcpChannel);

     }