Skip to content

Commit

Permalink
Notify onConnectionClosed rather than onNodeDisconnect to prune trans…
Browse files Browse the repository at this point in the history
…port handlers (#24639)

Today we prune transport handlers in TransportService when a node is disconnected.
This can cause connections to starve in the TransportService if the connection is
opened as a short living connection ie. without sharing the connection to a node
via registering in the transport itself. This change now moves to pruning based
on the connections cache key to ensure we notify handlers as soon as the connection
is closed for all connections not just for registered connections.

Relates to #24632
Relates to #24575
Relates to #24557
  • Loading branch information
s1monw authored May 12, 2017
1 parent 04e08f5 commit be2a6ce
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 28 deletions.
11 changes: 8 additions & 3 deletions core/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -357,8 +358,9 @@ public final class NodeChannels implements Connection {
private final DiscoveryNode node;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Version version;
private final Consumer<Connection> onClose;

public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile) {
public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile connectionProfile, Consumer<Connection> onClose) {
this.node = node;
this.channels = channels;
assert channels.length == connectionProfile.getNumConnections() : "expected channels size to be == "
Expand All @@ -369,13 +371,15 @@ public NodeChannels(DiscoveryNode node, Channel[] channels, ConnectionProfile co
typeMapping.put(type, handle);
}
version = node.getVersion();
this.onClose = onClose;
}

NodeChannels(NodeChannels channels, Version handshakeVersion) {
this.node = channels.node;
this.channels = channels.channels;
this.typeMapping = channels.typeMapping;
this.version = handshakeVersion;
this.onClose = channels.onClose;
}

@Override
Expand Down Expand Up @@ -408,6 +412,7 @@ public Channel channel(TransportRequestOptions.Type type) {
public synchronized void close() throws IOException {
if (closed.compareAndSet(false, true)) {
closeChannels(Arrays.stream(channels).filter(Objects::nonNull).collect(Collectors.toList()));
onClose.accept(this);
}
}

Expand Down Expand Up @@ -519,8 +524,8 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
final TimeValue handshakeTimeout = connectionProfile.getHandshakeTimeout() == null ?
connectTimeout : connectionProfile.getHandshakeTimeout();
final Version version = executeHandshake(node, channel, handshakeTimeout);
transportServiceAdapter.onConnectionOpened(node);
nodeChannels = new NodeChannels(nodeChannels, version);// clone the channels - we now have the correct version
transportServiceAdapter.onConnectionOpened(nodeChannels);
nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,13 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp
default Version getVersion() {
return getNode().getVersion();
}

/**
* Returns a key that this connection can be cached on. Delegating subclasses must delegate method call to
* the original connection.
*/
default Object getCacheKey() {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,14 @@ default void onNodeConnected(DiscoveryNode node) {}
*/
default void onNodeDisconnected(DiscoveryNode node) {}

/**
* Called once a node connection is closed. The connection might not have been registered in the
* transport as a shared connection to a specific node
*/
default void onConnectionClosed(Transport.Connection connection) {}

/**
* Called once a node connection is opened.
*/
default void onConnectionOpened(DiscoveryNode node) {}
default void onConnectionOpened(Transport.Connection connection) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ private <T extends TransportResponse> void sendRequestInternal(final Transport.C
}
Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
TransportResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection.getNode(), action, timeoutHandler));
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, connection, action, timeoutHandler));
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
// the caller. It will only notify if the toStop code hasn't done the work yet.
Expand Down Expand Up @@ -810,7 +810,7 @@ public TransportResponseHandler onResponseReceived(final long requestId) {
}
holder.cancelTimeout();
if (traceEnabled() && shouldTraceAction(holder.action())) {
traceReceivedResponse(requestId, holder.node(), holder.action());
traceReceivedResponse(requestId, holder.connection().getNode(), holder.action());
}
return holder.handler();
}
Expand Down Expand Up @@ -855,12 +855,12 @@ public void onNodeConnected(final DiscoveryNode node) {
}

@Override
public void onConnectionOpened(DiscoveryNode node) {
public void onConnectionOpened(Transport.Connection connection) {
// capture listeners before spawning the background callback so the following pattern won't trigger a call
// connectToNode(); connection is completed successfully
// addConnectionListener(); this listener shouldn't be called
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node)));
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
}

@Override
Expand All @@ -871,20 +871,28 @@ public void onNodeDisconnected(final DiscoveryNode node) {
connectionListener.onNodeDisconnected(node);
}
});
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
}
}

@Override
public void onConnectionClosed(Transport.Connection connection) {
try {
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
if (holder.connection().getCacheKey().equals(connection.getCacheKey())) {
final RequestHolder holderToNotify = clientHandlers.remove(entry.getKey());
if (holderToNotify != null) {
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(node,
holderToNotify.action())));
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
connection.getNode(), holderToNotify.action())));
}
}
}
} catch (EsRejectedExecutionException ex) {
logger.debug("Rejected execution on NodeDisconnected", ex);
logger.debug("Rejected execution on onConnectionClosed", ex);
}
}

Expand Down Expand Up @@ -929,13 +937,14 @@ public void run() {
if (holder != null) {
// add it to the timeout information holder, in case we are going to get a response later
long timeoutTime = System.currentTimeMillis();
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.node(), holder.action(), sentTime, timeoutTime));
timeoutInfoHandlers.put(requestId, new TimeoutInfoHolder(holder.connection().getNode(), holder.action(), sentTime,
timeoutTime));
// now that we have the information visible via timeoutInfoHandlers, we try to remove the request id
final RequestHolder removedHolder = clientHandlers.remove(requestId);
if (removedHolder != null) {
assert removedHolder == holder : "two different holder instances for request [" + requestId + "]";
removedHolder.handler().handleException(
new ReceiveTimeoutTransportException(holder.node(), holder.action(),
new ReceiveTimeoutTransportException(holder.connection().getNode(), holder.action(),
"request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
} else {
// response was processed, remove timeout info.
Expand Down Expand Up @@ -990,15 +999,15 @@ static class RequestHolder<T extends TransportResponse> {

private final TransportResponseHandler<T> handler;

private final DiscoveryNode node;
private final Transport.Connection connection;

private final String action;

private final TimeoutHandler timeoutHandler;

RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, TimeoutHandler timeoutHandler) {
RequestHolder(TransportResponseHandler<T> handler, Transport.Connection connection, String action, TimeoutHandler timeoutHandler) {
this.handler = handler;
this.node = node;
this.connection = connection;
this.action = action;
this.timeoutHandler = timeoutHandler;
}
Expand All @@ -1007,8 +1016,8 @@ public TransportResponseHandler<T> handler() {
return handler;
}

public DiscoveryNode node() {
return this.node;
public Transport.Connection connection() {
return this.connection;
}

public String action() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,8 @@ public void testResolveReuseExistingNodeConnections() throws ExecutionException,
// install a listener to check that no new connections are made
handleA.transportService.addConnectionListener(new TransportConnectionListener() {
@Override
public void onConnectionOpened(DiscoveryNode node) {
fail("should not open any connections. got [" + node + "]");
public void onConnectionOpened(Transport.Connection connection) {
fail("should not open any connections. got [" + connection.getNode() + "]");
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected void sendMessage(Object o, BytesReference reference, ActionListener li

@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new NodeChannels(node, new Object[profile.getNumConnections()], profile);
return new NodeChannels(node, new Object[profile.getNumConnections()], profile, c -> {});
}

@Override
Expand All @@ -220,7 +220,7 @@ public long serverOpen() {
@Override
public NodeChannels getConnection(DiscoveryNode node) {
return new NodeChannels(node, new Object[MockTcpTransport.LIGHT_PROFILE.getNumConnections()],
MockTcpTransport.LIGHT_PROFILE);
MockTcpTransport.LIGHT_PROFILE, c -> {});
}
};
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public long serverOpen() {
@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) {
final Channel[] channels = new Channel[profile.getNumConnections()];
final NodeChannels nodeChannels = new NodeChannels(node, channels, profile);
final NodeChannels nodeChannels = new NodeChannels(node, channels, profile, transportServiceAdapter::onConnectionClosed);
boolean success = false;
try {
final TimeValue connectTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,11 @@ public void sendRequest(long requestId, String action, TransportRequest request,
public void close() throws IOException {
connection.close();
}

@Override
public Object getCacheKey() {
return connection.getCacheKey();
}
}

public Transport getOriginalTransport() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2099,9 +2099,6 @@ public void handleException(TransportException exp) {

@Override
public String executor() {
if (1 == 1)
return "same";

return randomFrom(executors);
}
};
Expand All @@ -2111,4 +2108,59 @@ public String executor() {
latch.await();
}

public void testHandlerIsInvokedOnConnectionClose() throws IOException, InterruptedException {
List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
CollectionUtil.timSort(executors); // makes sure it's reproducible
TransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
serviceC.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
// do nothing
});
serviceC.start();
serviceC.acceptIncomingRequests();
CountDownLatch latch = new CountDownLatch(1);
TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}

@Override
public void handleResponse(TransportResponse response) {
try {
fail("no response expected");
} finally {
latch.countDown();
}
}

@Override
public void handleException(TransportException exp) {
try {
assertTrue(exp.getClass().toString(), exp instanceof NodeDisconnectedException);
} finally {
latch.countDown();
}
}

@Override
public String executor() {
return randomFrom(executors);
}
};
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections(1,
TransportRequestOptions.Type.BULK,
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE);
Transport.Connection connection = serviceB.openConnection(serviceC.getLocalNode(), builder.build());
serviceB.sendRequest(connection, "action", new TestRequest(randomFrom("fail", "pass")), TransportRequestOptions.EMPTY,
transportResponseHandler);
connection.close();
latch.await();
serviceC.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
@Override
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile) throws IOException {
final MockChannel[] mockChannels = new MockChannel[1];
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE,
transportServiceAdapter::onConnectionClosed); // we always use light here
boolean success = false;
final MockSocket socket = new MockSocket();
try {
Expand Down

0 comments on commit be2a6ce

Please sign in to comment.