Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransportService.connectToNode should validate remote node ID #22828

Merged
merged 37 commits into from
Feb 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7110121
Always handshake on Transport#connectToNode
bleskes Jan 25, 2017
b9ff025
better handling of timeout piping
bleskes Jan 25, 2017
3b2dda0
lint
bleskes Jan 25, 2017
1a9d05c
one more connection resolving
bleskes Jan 25, 2017
567ab19
better exception class
bleskes Jan 25, 2017
84e2c77
rewrote client connection handling. Fingers crossed.
bleskes Jan 25, 2017
2afc9d3
fix TaskManagerTestCase
bleskes Jan 25, 2017
248045e
fix TransportClientHeadersTests
bleskes Jan 26, 2017
af03e24
liveness response always contains local node - we block incoming requ…
bleskes Jan 26, 2017
8bb0fbc
fix testNodeConnectWithDifferentNodeId
bleskes Jan 26, 2017
35107e3
fix PublishClusterStateActionTests
bleskes Jan 26, 2017
627dcec
onResponseSent is not called on local node
bleskes Jan 26, 2017
8e40796
fix TransportActionProxyTests
bleskes Jan 26, 2017
51b2ea7
doh
bleskes Jan 26, 2017
2096317
concurrent connect on another thread
bleskes Jan 26, 2017
efd6ea4
tribes
bleskes Jan 26, 2017
7810c4f
don't send rejoin on CS thread (we connect to node)
bleskes Jan 26, 2017
a864ab6
Merge remote-tracking branch 'upstream/master' into transport_validat…
bleskes Jan 26, 2017
e7c1260
linting
bleskes Jan 26, 2017
48b7fd2
fix AbstractSimpleTransportTestCase
bleskes Jan 26, 2017
5bebb32
test for connection profile resolving
bleskes Jan 27, 2017
5a5f2ef
Fix new networking tests
bleskes Jan 27, 2017
23e1637
linting
bleskes Jan 27, 2017
c215478
fix Netty4ScheduledPingTests
bleskes Jan 27, 2017
26c17bc
better handling of connection closing in client
bleskes Jan 27, 2017
1415e48
minor feedback
bleskes Jan 27, 2017
61c99ac
Merge remote-tracking branch 'upstream/master' into transport_validat…
bleskes Feb 6, 2017
5f9b5ac
feedback
bleskes Feb 6, 2017
3c2ee90
Merge remote-tracking branch 'upstream/master' into transport_validat…
bleskes Feb 6, 2017
b74dbfb
fix CancellableTasksTests
bleskes Feb 6, 2017
402da3b
handshake response back to public
bleskes Feb 6, 2017
71ea969
add thread interrupt flag
bleskes Feb 7, 2017
24dd6b3
Merge remote-tracking branch 'upstream/master' into transport_validat…
bleskes Feb 7, 2017
401c332
feedback
bleskes Feb 7, 2017
533b245
Merge remote-tracking branch 'upstream/master' into transport_validat…
bleskes Feb 7, 2017
2cc9f66
assert busy in testAdapterSendReceiveCallbacks
bleskes Feb 7, 2017
6aabf6b
move connection close to inner try
bleskes Feb 7, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -38,6 +39,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -46,6 +48,8 @@
import org.elasticsearch.transport.FutureTransportResponseHandler;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.PlainTransportFuture;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -401,51 +405,37 @@ protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (DiscoveryNode listedNode : listedNodes) {
if (!transportService.nodeConnected(listedNode)) {
try {
// its a listed node, light connect to it...
logger.trace("connecting to listed node [{}]", listedNode);
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
} catch (Exception e) {
logger.info(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], removed from nodes list", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
newFilteredNodes.add(listedNode);
continue;
}
}
try {
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
}).txGet();
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
handler);
final LivenessResponse livenessResponse = handler.txGet();
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else if (livenessResponse.getDiscoveryNode() != null) {
} else {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
} else {
// although we asked for one node, our target may not have completed
// initialization yet and doesn't have cluster nodes
logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode);
newNodes.add(listedNode);
}
} catch (ConnectTransportException e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
} catch (Exception e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
hostFailureListener.onNodeDisconnected(listedNode, e);
}
}

Expand All @@ -470,78 +460,91 @@ protected void doSample() {

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
for (final DiscoveryNode listedNode : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() {
@Override
public void run() {
try {
if (!transportService.nodeConnected(listedNode)) {
try {
try {
for (final DiscoveryNode nodeToPing : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {

/**
* we try to reuse existing connections but if needed we will open a temporary connection
* that will be closed at the end of the execution.
*/
Transport.Connection connectionToClose = null;

@Override
public void onAfter() {
IOUtils.closeWhileHandlingException(connectionToClose);
}

// if its one of the actual nodes we will talk to, not to listed nodes, fully connect
if (nodes.contains(listedNode)) {
logger.trace("connecting to cluster node [{}]", listedNode);
transportService.connectToNode(listedNode);
} else {
// its a listed node, light connect to it...
logger.trace("connecting to listed node (light) [{}]", listedNode);
transportService.connectToNode(listedNode, LISTED_NODES_PROFILE);
}
} catch (Exception e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
latch.countDown();
return;
@Override
public void onFailure(Exception e) {
latch.countDown();
if (e instanceof ConnectTransportException) {
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", nodeToPing), e);
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} else {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state info for {}, disconnecting...", nodeToPing), e);
}
}

@Override
protected void doRun() throws Exception {
Transport.Connection pingConnection = null;
if (nodes.contains(nodeToPing)) {
try {
pingConnection = transportService.getConnection(nodeToPing);
} catch (NodeNotConnectedException e) {
// will use a temp connection
}
}
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
if (pingConnection == null) {
logger.trace("connecting to cluster node [{}]", nodeToPing);
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
pingConnection = connectionToClose;
}
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {

@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(listedNode, response);
latch.countDown();
}
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
latch.countDown();
}

@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
}
finally {
latch.countDown();
hostFailureListener.onNodeDisconnected(listedNode, e);
}
});
} catch (Exception e) {
logger.info(
(Supplier<?>)() -> new ParameterizedMessage(
"failed to get local cluster state info for {}, disconnecting...", listedNode), e);
transportService.disconnectFromNode(listedNode);
latch.countDown();
hostFailureListener.onNodeDisconnected(listedNode, e);
}
});
}
}
});
}

try {
});
}
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,24 @@ public DiscoveryNode(String nodeName, String nodeId, String ephemeralId, String
/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
Set<Role> roles = getRolesFromSettings(settings);

return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
}

/** extract node roles from the given settings */
public static Set<Role> getRolesFromSettings(Settings settings) {
Set<Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
roles.add(Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
roles.add(Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
roles.add(Role.DATA);
}

return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeId, publishAddress, attributes, roles, Version.CURRENT);
return roles;
}

/**
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/org/elasticsearch/common/CheckedBiConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import java.util.function.BiConsumer;

/**
* A {@link BiConsumer}-like interface which allows throwing checked exceptions.
*/
@FunctionalInterface
public interface CheckedBiConsumer<T, U, E extends Exception> {
void accept(T t, U u) throws E;
}
Loading