Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETWORKING: Make RemoteClusterConn. Lazy Resolve DNS (#32764) #32976

Merged
merged 1 commit into from
Aug 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -48,9 +49,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key, Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
);
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

Expand All @@ -65,18 +77,20 @@ protected RemoteClusterAware(Settings settings) {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}

protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
List<String> addresses = concreteSetting.get(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
});
}
return nodes;
}));
Expand Down Expand Up @@ -128,7 +142,7 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);

/**
* Registers this instance to listen to updates on the cluster settings.
Expand All @@ -138,27 +152,35 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
(namespace, value) -> {});
}

private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, parsePort(remoteHost));
}

private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
throw new IllegalArgumentException("failed to parse port", e);
}
}

private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -88,7 +89,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
Expand All @@ -103,7 +104,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand Down Expand Up @@ -131,7 +132,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
connectHandler.connect(connectListener);
}
Expand Down Expand Up @@ -460,15 +461,15 @@ protected void doRun() {
});
}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
Expand Down Expand Up @@ -558,11 +559,11 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<DiscoveryNode> seedNodes;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
private final CancellableThreads cancellableThreads;

SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
Expand Down Expand Up @@ -694,7 +695,8 @@ public void handleResponse(NodesInfoResponse response) {
}
}
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
seedNodes.stream().map(sup -> sup.get().getAddress()).collect(Collectors.toList()),
new ArrayList<>(httpAddresses),
maxNumRemoteConnections, connectedNodes.size(),
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionInfo);
Expand All @@ -711,7 +713,6 @@ public String executor() {
}
});
}

}

int getNumNodesConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
Expand All @@ -41,7 +42,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -116,7 +116,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, List<Supplier<DiscoveryNode>>> seeds,
ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
Expand All @@ -126,7 +127,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
} else {
CountDown countDown = new CountDown(seeds.size());
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
for (Map.Entry<String, List<Supplier<DiscoveryNode>>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
Expand Down Expand Up @@ -311,16 +312,17 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}

protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}

void updateRemoteCluster(
final String clusterAlias,
final List<InetSocketAddress> addresses,
final List<String> addresses,
final ActionListener<Void> connectionListener) {
final List<DiscoveryNode> nodes = addresses.stream().map(address -> {
final TransportAddress transportAddress = new TransportAddress(address);
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () -> {
final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
Expand All @@ -335,7 +337,7 @@ void updateRemoteCluster(
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
Map<String, List<Supplier<DiscoveryNode>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
Expand Down
Loading