Skip to content

Commit

Permalink
TransportService.connectToNode should validate remote node ID (#22828)
Browse files Browse the repository at this point in the history
 #22194 gave us the ability to open low level temporary connections to remote node based on their address. With this use case out of the way, actual full blown connections should validate the node on the other side, making sure we speak to who we think we speak to. This helps in case where multiple nodes are started on the same host and a quick node restart causes them to swap addresses, which in turn can cause confusion down the road.
  • Loading branch information
bleskes committed Feb 9, 2017
1 parent 43e2382 commit e173483
Show file tree
Hide file tree
Showing 28 changed files with 593 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -46,6 +48,8 @@
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.PlainTransportFuture;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -401,51 +405,37 @@ protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (DiscoveryNode listedNode : listedNodes) {
if (!transportService.nodeConnected(listedNode)) {
try {
// its a listed node, light connect to it...
logger.trace("connecting to listed node [{}]", listedNode);
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
} catch (Exception e) {
logger.info(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
newFilteredNodes.add(listedNode);
continue;
}
}
try {
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
}).txGet();
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
handler);
final LivenessResponse livenessResponse = handler.txGet();
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else if (livenessResponse.getDiscoveryNode() != null) {
} else {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
} else {
// although we asked for one node, our target may not have completed
// initialization yet and doesn't have cluster nodes
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
newNodes.add(listedNode);
}
} catch (ConnectTransportException e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
} catch (Exception e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
hostFailureListener.onNodeDisconnected(listedNode, e);
}
}

Expand All @@ -470,78 +460,91 @@ protected void doSample() {

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
for (final DiscoveryNode listedNode : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
try {
if (!transportService.nodeConnected(listedNode)) {
try {
try {
for (final DiscoveryNode nodeToPing : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

/**
* we try to reuse existing connections but if needed we will open a temporary connection
* that will be closed at the end of the execution.
*/
Transport.Connection connectionToClose = null;

@Override
public void onAfter() {
IOUtils.closeWhileHandlingException(connectionToClose);
}

// if its one of the actual nodes we will talk to, not to listed nodes, fully connect
if (nodes.contains(listedNode)) {
logger.trace("connecting to cluster node [{}]", listedNode);
transportService.connectToNode(listedNode);
} else {
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
}
} catch (Exception e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
latch.countDown();
return;
@Override
public void onFailure(Exception e) {
latch.countDown();
if (e instanceof ConnectTransportException) {
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} else {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
}
}

@Override
protected void doRun() throws Exception {
Transport.Connection pingConnection = null;
if (nodes.contains(nodeToPing)) {
try {
pingConnection = transportService.getConnection(nodeToPing);
} catch (NodeNotConnectedException e) {
// will use a temp connection
}
}
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
if (pingConnection == null) {
logger.trace("connecting to cluster node [{}]", nodeToPing);
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
pingConnection = connectionToClose;
}
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(listedNode, response);
latch.countDown();
}
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
latch.countDown();
}

@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
}
finally {
latch.countDown();
hostFailureListener.onNodeDisconnected(listedNode, e);
}
});
} catch (Exception e) {
logger.info(
(Supplier<?>)() -> new ParameterizedMessage(
"failed to get local cluster state info for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
latch.countDown();
hostFailureListener.onNodeDisconnected(listedNode, e);
}
});
}
}
});
}

try {
});
}
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,24 @@ public DiscoveryNode(String nodeName, String nodeId, String ephemeralId, String
/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
Set<Role> roles = getRolesFromSettings(settings);

return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
}

/** extract node roles from the given settings */
public static Set<Role> getRolesFromSettings(Settings settings) {
Set<Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
roles.add(Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
roles.add(Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
roles.add(Role.DATA);
}

return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
return roles;
}

/**
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/org/elasticsearch/common/CheckedBiConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import java.util.function.BiConsumer;

/**
* A {@link BiConsumer}-like interface which allows throwing checked exceptions.
*/
@FunctionalInterface
public interface CheckedBiConsumer<T, U, E extends Exception> {
void accept(T t, U u) throws E;
}
Loading

0 comments on commit e173483

Please sign in to comment.