Skip to content

Commit

Permalink
Use a dedicated ConnectionManger for RemoteClusterConnection
Browse files Browse the repository at this point in the history
This change introduces a dedicated ConnectionManager for every RemoteClusterConnection
such that there is not state shared with the TransportService internal ConnectionManager.
All connections to a remote cluster are isolated from the TransportService but still uses
the TransportService and it's internal properties like the Transport, tracing and internal
listener actions on disconnects etc.
This allows a remote cluster connection to have a different lifecycle than a local cluster connection,
also local discovery code doesn't get notified if there is a disconnect on from a remote cluster and
each connection can use it's own dedicated connection profile which allows to have a reduced set of
connections per cluster without conflicting with the local cluster.

Closes elastic#31835
  • Loading branch information
s1monw committed Aug 20, 2018
1 parent e3700a9 commit 24c1572
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -62,6 +63,7 @@ public class ConnectionManager implements Closeable {
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

Expand All @@ -83,7 +85,9 @@ public ConnectionManager(Settings settings, Transport transport, ThreadPool thre
}

public void addListener(TransportConnectionListener listener) {
this.connectionListener.listeners.add(listener);
if (connectionListener.listeners.contains(listener) == false) {
this.connectionListener.listeners.add(listener);
}
}

public void removeListener(TransportConnectionListener listener) {
Expand Down Expand Up @@ -186,45 +190,50 @@ public void disconnectFromNode(DiscoveryNode node) {
}
}

public int connectedNodeCount() {
/**
* Returns the number of nodes this manager is connected to.
*/
public int size() {
return connectedNodes.size();
}

@Override
public void close() {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);
if (closed.compareAndSet(false, true)) {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);

// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});
});

try {
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} finally {
lifecycle.moveToClosed();
}
} finally {
lifecycle.moveToClosed();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -80,16 +81,17 @@
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {

private final TransportService transportService;
private final ConnectionManager connectionManager;
private final ConnectionProfile remoteProfile;
private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;

/**
* Creates a new {@link RemoteClusterConnection}
Expand All @@ -101,9 +103,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
Expand All @@ -122,7 +124,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
this.threadPool = transportService.threadPool;
this.connectionManager = connectionManager;
connectionManager.addListener(this);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
}

/**
Expand Down Expand Up @@ -183,8 +189,9 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {

private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterSearchShardsResponse>() {

@Override
Expand Down Expand Up @@ -219,12 +226,16 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
public ClusterStateResponse read(StreamInput in) throws IOException {
ClusterStateResponse response = new ClusterStateResponse();
response.readFrom(in);
return response;
}

@Override
Expand Down Expand Up @@ -261,11 +272,11 @@ public String executor() {
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
if (transportService.nodeConnected(remoteClusterNode)) {
return transportService.getConnection(remoteClusterNode);
if (connectionManager.nodeConnected(remoteClusterNode)) {
return connectionManager.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = connectedNodes.getAny();
Transport.Connection connection = transportService.getConnection(discoveryNode);
DiscoveryNode discoveryNode = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(discoveryNode);
return new ProxyConnection(connection, remoteClusterNode);
}

Expand Down Expand Up @@ -317,33 +328,18 @@ public Version getVersion() {
}

Transport.Connection getConnection() {
return transportService.getConnection(getAnyConnectedNode());
return connectionManager.getConnection(getAnyConnectedNode());
}

@Override
public void close() throws IOException {
connectHandler.close();
IOUtils.close(connectHandler, connectionManager);
}

public boolean isClosed() {
return connectHandler.isClosed();
}

private ConnectionProfile getRemoteProfile(ClusterName name) {
// we can only compare the cluster name to make a decision if we should use a remote profile
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
// rather smallish optimization on the connection layer under certain situations where remote clusters
// have the same name as the local one is minor here.
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
if (this.localClusterName.equals(name)) {
return null;
} else {
return remoteProfile;
}
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
Expand Down Expand Up @@ -387,7 +383,7 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
final boolean runConnect;
final Collection<ActionListener<Void>> toNotify;
final ActionListener<Void> listener = connectListener == null ? null :
ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext());
ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
synchronized (queue) {
if (listener != null && queue.offer(listener) == false) {
listener.onFailure(new RejectedExecutionException("connect queue is full"));
Expand Down Expand Up @@ -415,7 +411,6 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
}

private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
ThreadPool threadPool = transportService.getThreadPool();
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
executor.submit(new AbstractRunnable() {
@Override
Expand Down Expand Up @@ -452,13 +447,13 @@ protected void doRun() {
maybeConnect();
}
});
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
}
});
}

private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
Expand All @@ -467,7 +462,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
Expand All @@ -482,7 +477,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
Expand Down Expand Up @@ -524,7 +519,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
// ISE if we fail the handshake with an version incompatible node
if (seedNodes.hasNext()) {
logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, manager, listener);
} else {
listener.onFailure(ex);
}
Expand Down Expand Up @@ -552,7 +547,6 @@ final boolean isClosed() {
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
Expand All @@ -561,7 +555,6 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
this.listener = listener;
this.seedNodes = seedNodes;
Expand Down Expand Up @@ -592,8 +585,8 @@ public void handleResponse(ClusterStateResponse response) {
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
// connected
connectionManager.connectToNode(node, remoteProfile,
transportService.connectionValidator(node)); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
Expand All @@ -609,7 +602,7 @@ public void handleResponse(ClusterStateResponse response) {
listener.onFailure(ex); // we got canceled - fail the listener and step out
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}

Expand All @@ -620,7 +613,7 @@ public void handleException(TransportException exp) {
IOUtils.closeWhileHandlingException(connection);
} finally {
// once the connection is closed lets try the next node
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}

Expand Down Expand Up @@ -715,4 +708,8 @@ private synchronized void ensureIteratorAvailable() {
}
}
}

ConnectionManager getConnectionManager() {
return connectionManager;
}
}
Loading

0 comments on commit 24c1572

Please sign in to comment.