Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add infrastructure to manage network connections outside of Transport/TransportService #22194

Merged
merged 4 commits into from
Dec 17, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand All @@ -47,7 +48,9 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
Expand All @@ -58,6 +61,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -68,8 +72,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -80,6 +84,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -328,10 +333,6 @@ protected void doRun() throws Exception {
sendPings(duration, TimeValue.timeValueMillis(duration.millis() / 2), sendPingsHandler, resolvedDiscoveryNodes);
sendPingsHandler.close();
listener.onPing(sendPingsHandler.pingCollection().toList());
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
}

@Override
Expand Down Expand Up @@ -359,7 +360,7 @@ public void onFailure(Exception e) {

class SendPingsHandler implements Releasable {
private final int id;
private final Set<DiscoveryNode> nodeToDisconnect = ConcurrentCollections.newConcurrentSet();
private final List<Transport.Connection> temporaryConnections = new CopyOnWriteArrayList<>();
private final PingCollection pingCollection;

private AtomicBoolean closed = new AtomicBoolean(false);
Expand All @@ -385,8 +386,18 @@ public PingCollection pingCollection() {
public void close() {
if (closed.compareAndSet(false, true)) {
receivedResponses.remove(id);
try {
IOUtils.close(temporaryConnections);
temporaryConnections.clear();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

public void addTemporaryConnection(Transport.Connection connection) {
temporaryConnections.add(connection);
}
}


Expand Down Expand Up @@ -455,7 +466,6 @@ void sendPings(
logger.trace("replacing {} with temp node {}", nodeToSend, tempNode);
nodeToSend = tempNode;
}
sendPingsHandler.nodeToDisconnect.add(nodeToSend);
}
// fork the connection to another thread
final DiscoveryNode finalNodeToSend = nodeToSend;
Expand All @@ -467,18 +477,20 @@ public void run() {
}
boolean success = false;
try {
// connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNodeAndHandshake(finalNodeToSend, timeout.getMillis());
Transport.Connection connection;
if (nodeFoundByAddress && transportService.nodeConnected(finalNodeToSend)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can this returns true? This code is only executed if further up !transportService.nodeConnected(nodeToSend) holds (or we created a temporary disco node which has a fresh id and is never connected).

I also wonder if we should use a temporary disco-node when nodeFoundByAddress == true and transportService.nodeConnected(nodeToSend) == false.

Last, regarding the existing comment on line 454

// if we find on the disco nodes a matching node by address, we are going to restore the connection
// anyhow down the line if its not connected...

Now that we are not reestablishing the full connection to a node anymore that's in the CS, I wonder what this will do when the node gets disconnected but not removed from the cluster state. When pinging completes (with a temporary connection) and the node stays in the cluster state that is published by the master, then the master will not reestablish the connection in ClusterService (as the node has not been removed on the CS and connectToNodes is only called on the newly added nodes).

Let's talk about this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch these are valid points. Seeing the PR already made me realize it's time to re-write this class as it has grown too tangled. I hoped I can do it after @s1monw pushed this PR in, but I'm not sure it's worth it anymore to push through and fix all of this.

@s1monw how about you push just the transport layer changes and I will use them in a follow up PR to clean up this class?

Re node connections -

You are right too that this is not good and not optimal. Since the nodes was never removed from the cluster state, it was also never removed from NodeConnectionService (I think). IMO this is just too complex (and there is nodeFD and node joins that play a role here too, on the master). I suggest we change NodeConnectionService to always work on the ClusterState.nodes() directly rather than trying to be overly heroic and use the deltas. This will give a hard validation of every CS. This should be very cheep tests. We can do it as another follow - potentially before we do the unicast zen ping work.

How does that sound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will just keep the transport layer changes here...

logger.trace("[{}] reusing existing ping connection to {}", sendPingsHandler.id(), finalNodeToSend);
connection = transportService.getConnection(finalNodeToSend);
} else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNode(finalNodeToSend);
// connect to the node, see if we manage to do it, if not, bail
logger.trace("[{}] open ping connection to {}", sendPingsHandler.id(), finalNodeToSend);
connection = transportService.openConnection(finalNodeToSend, ConnectionProfile.LIGHT_PROFILE);
sendPingsHandler.addTemporaryConnection(connection);
transportService.handshake(connection, timeout.millis());
}
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
sendPingRequestToNode(() -> connection, sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
} else {
// connect took too long, just log it and bail
latch.countDown();
Expand Down Expand Up @@ -508,7 +520,9 @@ public void run() {
}
});
} else {
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
final DiscoveryNode finalNodeToSend = nodeToSend;
sendPingRequestToNode(() -> transportService.getConnection(finalNodeToSend),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question - why not just get the connection from the transport service right here? does the supplier buy as something? maybe we can even try to get it first and create a new temporary only if it fails

            Transport.Connection existingConnection = null;
            if (transportService.nodeConnected(nodeToSend)) {
                try {
                    existingConnection = transportService.getConnection(nodeToSend);
                } catch (NodeNotConnectedException e) {
                    // it's ok we'll create a temp connection
                }
            }

            if (existingConnection != null) {
                sendPingRequestToNode(existingConnection, sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
            } else {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yet, it buys us to not duplicate the exception handling.

sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
}
}
if (waitTime != null) {
Expand All @@ -520,11 +534,22 @@ public void run() {
}
}

private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest,
final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
private void sendPingRequestToNode(final Supplier<Transport.Connection> connection, final int id, final TimeValue timeout,
final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node,
final DiscoveryNode nodeToSend) {
logger.trace("[{}] sending to {}", id, nodeToSend);
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder()
.withTimeout((long) (timeout.millis() * 1.25)).build(), new TransportResponseHandler<UnicastPingResponse>() {
TransportRequestOptions options = TransportRequestOptions.builder()
.withTimeout((long) (timeout.millis() * 1.25)).build();
Consumer<Exception> handleException = (exp) -> {
latch.countDown();
if (exp instanceof ConnectTransportException) {
// ok, not connected...
logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp);
} else {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
}
};
TransportResponseHandler<UnicastPingResponse> handler = new TransportResponseHandler<UnicastPingResponse>() {

@Override
public UnicastPingResponse newInstance() {
Expand Down Expand Up @@ -563,15 +588,15 @@ public void handleResponse(UnicastPingResponse response) {

@Override
public void handleException(TransportException exp) {
latch.countDown();
if (exp instanceof ConnectTransportException) {
// ok, not connected...
logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to connect to {}", nodeToSend), exp);
} else {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send ping to [{}]", node), exp);
}
handleException.accept(exp);
}
});
};
try {
transportService.sendRequest(connection.get(), ACTION_NAME, pingRequest, options, handler);
} catch (Exception e) {
// connection.get() might barf - we have to handle this
handleException.accept(e);
}
}

private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}

/**
* Returns <code>true</code> iff the given node is already connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(localNode) || transport.nodeConnected(node);
}
Expand All @@ -312,32 +315,32 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
}

/**
* Lightly connect to the specified node, returning updated node
* information. The handshake will fail if the cluster name on the
* target node mismatches the local cluster name and
* {@code checkClusterName} is {@code true}.
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
* responsibility to close the connection once it goes out of scope.
* @param node the node to connect to
* @param profile the connection profile to use
*/
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
if (node.equals(localNode)) {
return localNodeConnection;
} else {
return transport.openConnection(node, profile);
}
}

/**
* Executes a high-level handshake using the given connection
* and returns the discovery node of the node the connection
* was established with. The handshake will fail if the cluster
* name on the target node mismatches the local cluster name.
*
* @param node the node to connect to
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @return the connected node
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode connectToNodeAndHandshake(
final DiscoveryNode node,
final long handshakeTimeout) throws IOException {
if (node.equals(localNode)) {
return localNode;
}
DiscoveryNode handshakeNode;
try (Transport.Connection connection = transport.openConnection(node, ConnectionProfile.LIGHT_PROFILE)) {
handshakeNode = handshake(connection, handshakeTimeout);
}
connectToNode(node, ConnectionProfile.LIGHT_PROFILE);
return handshakeNode;
}

private DiscoveryNode handshake(
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
final HandshakeResponse response;
Expand Down Expand Up @@ -465,7 +468,7 @@ public final <T extends TransportResponse> void sendRequest(final DiscoveryNode
}
}

final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
Expand All @@ -477,7 +480,7 @@ final <T extends TransportResponse> void sendRequest(final Transport.Connection
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
* @throws NodeNotConnectedException if the given node is not connected
*/
private Transport.Connection getConnection(DiscoveryNode node) {
public Transport.Connection getConnection(DiscoveryNode node) {
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
return localNodeConnection;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
Expand Down Expand Up @@ -571,7 +572,7 @@ private NetworkHandle startServices(
final BiFunction<Settings, Version, Transport> supplier) {
final Transport transport = supplier.apply(settings, version);
final TransportService transportService =
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
transportService.start();
transportService.acceptIncomingRequests();
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ public void testConnectToNodeLight() throws IOException {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
DiscoveryNode connectedNode =
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
assertNotNull(connectedNode);

// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
assertNotNull(connectedNode);
// the name and version should be updated
assertEquals(connectedNode.getName(), "TS_B");
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

}

public void testMismatchedClusterName() {
Expand All @@ -133,8 +134,11 @@ public void testMismatchedClusterName() {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
Expand All @@ -150,8 +154,11 @@ public void testIncompatibleVersions() {
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
discoveryNode, timeout));
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)) {
handleA.transportService.handshake(connection, timeout);
}
});
assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}
Expand Down
Loading