Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a dedicated ConnectionManager for RemoteClusterConnection #32988

Merged
merged 3 commits into from
Aug 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -62,6 +63,7 @@ public class ConnectionManager implements Closeable {
private final TimeValue pingSchedule;
private final ConnectionProfile defaultProfile;
private final Lifecycle lifecycle = new Lifecycle();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

Expand All @@ -83,7 +85,9 @@ public ConnectionManager(Settings settings, Transport transport, ThreadPool thre
}

public void addListener(TransportConnectionListener listener) {
this.connectionListener.listeners.add(listener);
if (connectionListener.listeners.contains(listener) == false) {
this.connectionListener.listeners.add(listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

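Since connectionListener.listeners is a CopyOnWriteArrayList, you can use addIfAbsent(listener) instead of the separate contains-then-add check; it performs the check and the insert as one atomic step, so it also has proper concurrency semantics.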

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed b6fca90

}
}

public void removeListener(TransportConnectionListener listener) {
Expand Down Expand Up @@ -186,45 +190,50 @@ public void disconnectFromNode(DiscoveryNode node) {
}
}

public int connectedNodeCount() {
/**
* Returns the number of nodes this manager is connected to.
*/
public int size() {
return connectedNodes.size();
}

@Override
public void close() {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);
if (closed.compareAndSet(false, true)) {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);

// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});
});

try {
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} finally {
lifecycle.moveToClosed();
}
} finally {
lifecycle.moveToClosed();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -80,30 +81,32 @@
final class RemoteClusterConnection extends AbstractComponent implements TransportConnectionListener, Closeable {

private final TransportService transportService;
private final ConnectionManager connectionManager;
private final ConnectionProfile remoteProfile;
private final ConnectedNodes connectedNodes;
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private final ThreadPool threadPool;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;

/**
* Creates a new {@link RemoteClusterConnection}
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param seedNodes a list of seed nodes to discover eligible nodes from
* @param transportService the local nodes transport service
* @param connectionManager the connection manager to use for this remote connection
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections,
Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
Expand All @@ -122,7 +125,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE
.getConcreteSettingForNamespace(clusterAlias).get(settings);
this.connectHandler = new ConnectHandler();
transportService.addConnectionListener(this);
this.threadPool = transportService.threadPool;
this.connectionManager = connectionManager;
connectionManager.addListener(this);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
connectionManager.addListener(transportService);
}

/**
Expand Down Expand Up @@ -183,8 +190,9 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {

private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterSearchShardsResponse>() {

@Override
Expand Down Expand Up @@ -219,12 +227,16 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
final DiscoveryNode node = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(node);
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
public ClusterStateResponse read(StreamInput in) throws IOException {
ClusterStateResponse response = new ClusterStateResponse();
response.readFrom(in);
return response;
}

@Override
Expand Down Expand Up @@ -261,11 +273,11 @@ public String executor() {
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
if (transportService.nodeConnected(remoteClusterNode)) {
return transportService.getConnection(remoteClusterNode);
if (connectionManager.nodeConnected(remoteClusterNode)) {
return connectionManager.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = connectedNodes.getAny();
Transport.Connection connection = transportService.getConnection(discoveryNode);
DiscoveryNode discoveryNode = getAnyConnectedNode();
Transport.Connection connection = connectionManager.getConnection(discoveryNode);
return new ProxyConnection(connection, remoteClusterNode);
}

Expand Down Expand Up @@ -317,33 +329,18 @@ public Version getVersion() {
}

Transport.Connection getConnection() {
return transportService.getConnection(getAnyConnectedNode());
return connectionManager.getConnection(getAnyConnectedNode());
}

@Override
public void close() throws IOException {
connectHandler.close();
IOUtils.close(connectHandler, connectionManager);
}

/**
 * Reports whether this connection has been closed, as tracked by its connect handler.
 *
 * @return the value of {@code connectHandler.isClosed()}
 */
public boolean isClosed() {
return connectHandler.isClosed();
}

/**
 * Chooses the connection profile to use for a node of the cluster with the given name.
 *
 * @param name the cluster name to compare against the local cluster name
 * @return {@code null} if {@code name} equals the local cluster name, otherwise the remote profile
 */
private ConnectionProfile getRemoteProfile(ClusterName name) {
    // The cluster name is the only thing we can compare here: a cluster UUID is not usable because we
    // may be connecting to the remote cluster before the remote node has joined its cluster and has a
    // UUID. When a remote cluster shares the local cluster's name we merely lose a rather small
    // connection-layer optimization, which is minor. The alternative would be to make the remote
    // infrastructure wait until a cluster has formed and gained a UUID before connecting; we accept
    // this simplification to keep things simple.
    final boolean sameNameAsLocalCluster = this.localClusterName.equals(name);
    return sameNameAsLocalCluster ? null : remoteProfile;
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
Expand Down Expand Up @@ -387,7 +384,7 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
final boolean runConnect;
final Collection<ActionListener<Void>> toNotify;
final ActionListener<Void> listener = connectListener == null ? null :
ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext());
ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext());
synchronized (queue) {
if (listener != null && queue.offer(listener) == false) {
listener.onFailure(new RejectedExecutionException("connect queue is full"));
Expand Down Expand Up @@ -415,7 +412,6 @@ private void connect(ActionListener<Void> connectListener, boolean forceRun) {
}

private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
ThreadPool threadPool = transportService.getThreadPool();
ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
executor.submit(new AbstractRunnable() {
@Override
Expand Down Expand Up @@ -452,13 +448,13 @@ protected void doRun() {
maybeConnect();
}
});
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
collectRemoteNodes(seedNodes.iterator(), transportService, connectionManager, listener);
}
});
}

private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
final TransportService transportService, final ConnectionManager manager, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
Expand All @@ -467,7 +463,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
Transport.Connection connection = manager.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
Expand All @@ -482,7 +478,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
manager.connectToNode(handshakeNode, remoteProfile, transportService.connectionValidator(handshakeNode));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
Expand Down Expand Up @@ -524,7 +520,7 @@ private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
// ISE if we fail the handshake with an version incompatible node
if (seedNodes.hasNext()) {
logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, manager, listener);
} else {
listener.onFailure(ex);
}
Expand Down Expand Up @@ -552,7 +548,6 @@ final boolean isClosed() {
/* This class handles the _state response from the remote cluster when sniffing nodes to connect to */
private class SniffClusterStateResponseHandler implements TransportResponseHandler<ClusterStateResponse> {

private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
Expand All @@ -561,7 +556,6 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
this.listener = listener;
this.seedNodes = seedNodes;
Expand Down Expand Up @@ -592,8 +586,8 @@ public void handleResponse(ClusterStateResponse response) {
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
// connected
connectionManager.connectToNode(node, remoteProfile,
transportService.connectionValidator(node)); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
Expand All @@ -609,7 +603,7 @@ public void handleResponse(ClusterStateResponse response) {
listener.onFailure(ex); // we got canceled - fail the listener and step out
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}

Expand All @@ -620,7 +614,7 @@ public void handleException(TransportException exp) {
IOUtils.closeWhileHandlingException(connection);
} finally {
// once the connection is closed lets try the next node
collectRemoteNodes(seedNodes, transportService, listener);
collectRemoteNodes(seedNodes, transportService, connectionManager, listener);
}
}

Expand Down Expand Up @@ -715,4 +709,8 @@ private synchronized void ensureIteratorAvailable() {
}
}
}

/**
 * Returns the {@link ConnectionManager} held by this remote cluster connection.
 *
 * @return the {@code connectionManager} field assigned in the constructor
 */
ConnectionManager getConnectionManager() {
return connectionManager;
}
}
Loading