Skip to content

Commit

Permalink
Discovery: back port #7558 to 1.x and add bwc protections of the new …
Browse files Browse the repository at this point in the history
…ping on master gone introduced in #7493

The change in #7558 adds a flag to PingResponse. However, when unicast discovery is used,  this extra flag can not be serialized by the very initial pings as they do not know yet what node version they ping (i.e., they have to default to 1.0.0, which excludes changing the serialization format). This commit bypasses this problem by adding a dedicated action which only exist on nodes of version 1.4 or up. Nodes first try to ping this endpoint using 1.4.0 as a serialization version. If that fails they fall back to the pre 1.4.0 action. This is optimal if all nodes are on 1.4.0 or higher, with a small down side if the cluster has mixed versions - but this is a temporary state.

Further two bwc protections are added:
1) Disable the preference to nodes who previously joined the cluster if some of the pings are on version < 1.4.0
2) Disable the rejoin on master gone functionality if some nodes in the cluster or version < 1.4.0

Closes #7694
  • Loading branch information
bleskes committed Sep 16, 2014
1 parent d9ea628 commit e5de47d
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 79 deletions.
9 changes: 8 additions & 1 deletion src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ public static Version smallest(Version version1, Version version2) {
return version1.id < version2.id ? version1 : version2;
}

/**
* Returns the largest version between the 2.
*/
public static Version largest(Version version1, Version version2) {
return version1.id > version2.id ? version1 : version2;
}

/**
* Returns the version given its string representation, current version if the argument is null or empty
*/
Expand Down Expand Up @@ -426,7 +433,7 @@ public static Version fromString(String version) {

return fromId(major + minor + revision + build);

} catch(NumberFormatException e) {
} catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse version " + version, e);
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,16 @@ public ClusterState execute(ClusterState currentState) {
processNewClusterStates.drainTo(pendingNewClusterStates);
logger.trace("removed [{}] pending cluster states", pendingNewClusterStates.size());

if (rejoinOnMasterGone) {
boolean rejoin = true;
if (!rejoinOnMasterGone) {
logger.debug("not rejoining cluster due to rejoinOnMasterGone [{}]", rejoinOnMasterGone);
rejoin = false;
} else if (discoveryNodes.smallestNonClientNodeVersion().before(Version.V_1_4_0_Beta1)) {
logger.debug("not rejoining cluster due a minimum node version of [{}]", discoveryNodes.smallestNonClientNodeVersion());
rejoin = false;
}

if (rejoin) {
return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
}

Expand Down Expand Up @@ -991,13 +1000,22 @@ private DiscoveryNode findMaster() {
joinedOnceActiveNodes.add(localNode);
}
}

Version minimumPingVersion = localNode.version();
for (ZenPing.PingResponse pingResponse : pingResponses) {
activeNodes.add(pingResponse.node());
if (pingResponse.hasJoinedOnce()) {
minimumPingVersion = Version.smallest(pingResponse.node().version(), minimumPingVersion);
if (pingResponse.hasJoinedOnce() != null && pingResponse.hasJoinedOnce()) {
assert pingResponse.node().getVersion().onOrAfter(Version.V_1_4_0_Beta1) : "ping version [" + pingResponse.node().version() + "]< 1.4.0 while having hasJoinedOnce == true";
joinedOnceActiveNodes.add(pingResponse.node());
}
}

if (minimumPingVersion.before(Version.V_1_4_0_Beta1)) {
logger.trace("ignoring joined once flags in ping responses, minimum ping version [{}]", minimumPingVersion);
joinedOnceActiveNodes.clear();
}

if (pingMasters.isEmpty()) {
if (electMaster.hasEnoughMasterNodes(activeNodes)) {
// we give preference to nodes who have previously already joined the cluster. Those will
Expand Down
30 changes: 18 additions & 12 deletions src/main/java/org/elasticsearch/discovery/zen/ping/ZenPing.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -58,7 +59,8 @@ public static class PingResponse implements Streamable {

private DiscoveryNode master;

private boolean hasJoinedOnce;
@Nullable
private Boolean hasJoinedOnce;

private PingResponse() {
}
Expand Down Expand Up @@ -90,8 +92,9 @@ public DiscoveryNode master() {
return master;
}

/** true if the joined has successfully joined the cluster before */
public boolean hasJoinedOnce() {
/** true if the node has successfully joined the cluster before, null for nodes with a <1.4.0 version */
@Nullable
public Boolean hasJoinedOnce() {
return hasJoinedOnce;
}

Expand All @@ -108,14 +111,12 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
master = readNode(in);
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
this.hasJoinedOnce = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
if (in.readBoolean()) {
this.hasJoinedOnce = in.readBoolean();
}
} else {
// As of 1.4.0 we prefer to elect nodes which have previously successfully joined the cluster.
// Nodes before 1.4.0 do not take this into consideration. If pre<1.4.0 node elects it self as master
// based on the pings, we need to make sure we do the same. We therefore can not demote it
// and thus mark it as if it has previously joined.
this.hasJoinedOnce = true;
this.hasJoinedOnce = null;
}

}
Expand All @@ -130,8 +131,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
master.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(hasJoinedOnce);
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
if (hasJoinedOnce != null) {
out.writeBoolean(true);
out.writeBoolean(hasJoinedOnce);
} else {
out.writeBoolean(false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -62,6 +59,18 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen

public static final String ACTION_NAME = "internal:discovery/zen/unicast";


/**
* when pinging the initial configured target hosts, we do not know their version. We therefore use
* the lowest possible version (i.e., 1.0.0) for serializing information on the wire. As of 1.4, we needed to extend
* the information sent in a ping, to prefer nodes which have previously joined the cluster during master election.
* This information is only needed if all the cluster is on version 1.4 or up. To bypass this issue we introduce
* a second action name which is guaranteed to exist only on nodes from version 1.4.0 and up. Using this action,
* we can safely use 1.4.0 as a serialization format. If this fails with a {@link ActionNotFoundTransportException}
* we know we speak to a node with <1.4 version, and fall back to use {@link #ACTION_NAME}.
*/
public static final String ACTION_NAME_GTE_1_4 = "internal:discovery/zen/unicast_gte_1_4";

public static final int LIMIT_PORTS_COUNT = 1;

private final ThreadPool threadPool;
Expand Down Expand Up @@ -127,7 +136,9 @@ public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService
}
this.configuredTargetNodes = configuredTargetNodes.toArray(new DiscoveryNode[configuredTargetNodes.size()]);

transportService.registerHandler(ACTION_NAME, new UnicastPingRequestHandler());
UnicastPingRequestHandler unicastPingHanlder = new UnicastPingRequestHandler();
transportService.registerHandler(ACTION_NAME, unicastPingHanlder);
transportService.registerHandler(ACTION_NAME_GTE_1_4, unicastPingHanlder);
}

@Override
Expand Down Expand Up @@ -334,7 +345,12 @@ public void run() {
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
if (nodeFoundByAddress) {
// we're good - we know what version to use for serialization
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
} else {
sendPingRequestTo14NodeWithFallback(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
}
} else {
// connect took too long, just log it and bail
latch.countDown();
Expand Down Expand Up @@ -366,6 +382,46 @@ public void run() {
}
}

/** See {@link #ACTION_NAME_GTE_1_4} for an explanation for why this needed */
private void sendPingRequestTo14NodeWithFallback(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] sending to {}, using >=1.4.0 serialization", id, nodeToSend);
DiscoveryNode actualNodeToSend = new DiscoveryNode(nodeToSend.name(), nodeToSend.id(), nodeToSend.getHostName(), nodeToSend.getHostAddress(),
nodeToSend.address(), nodeToSend.attributes(), Version.largest(nodeToSend.version(), Version.V_1_4_0_Beta1));
transportService.sendRequest(actualNodeToSend, ACTION_NAME_GTE_1_4, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {

@Override
public UnicastPingResponse newInstance() {
return new UnicastPingResponse();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(UnicastPingResponse response) {
handlePingResponse(response, id, nodeToSend, latch);
}

@Override
public void handleException(TransportException exp) {
if (ExceptionsHelper.unwrapCause(exp) instanceof ActionNotFoundTransportException) {
logger.trace("failed to ping {}, falling back to <=1.4.0 ping action", node);
sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend);
return;
}
latch.countDown();
if (exp instanceof ConnectTransportException) {
// ok, not connected...
logger.trace("failed to connect to {}", exp, nodeToSend);
} else {
logger.warn("failed to send ping to [{}]", exp, node);
}
}
});
}

private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
logger.trace("[{}] sending to {}", id, nodeToSend);
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
Expand All @@ -382,40 +438,7 @@ public String executor() {

@Override
public void handleResponse(UnicastPingResponse response) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
DiscoveryNodes discoveryNodes = contextProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (pingResponse.node().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
}
if (!pingResponse.clusterName().equals(clusterName)) {
// not part of the cluster
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value());
continue;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id);
} else {
PingResponse existingResponse = responses.get(pingResponse.node());
if (existingResponse == null) {
responses.put(pingResponse.node(), pingResponse);
} else {
// try and merge the best ping response for it, i.e. if the new one
// doesn't have the master node set, and the existing one does, then
// the existing one is better, so we keep it
// if both have a master or both have none, we prefer the latest ping
if (existingResponse.master() == null || pingResponse.master() != null) {
responses.put(pingResponse.node(), pingResponse);
}
}
}
}
} finally {
latch.countDown();
}
handlePingResponse(response, id, nodeToSend, latch);
}

@Override
Expand All @@ -431,6 +454,43 @@ public void handleException(TransportException exp) {
});
}

private void handlePingResponse(UnicastPingResponse response, int id, DiscoveryNode nodeToSend, CountDownLatch latch) {
logger.trace("[{}] received response from {}: {}", id, nodeToSend, Arrays.toString(response.pingResponses));
try {
DiscoveryNodes discoveryNodes = contextProvider.nodes();
for (PingResponse pingResponse : response.pingResponses) {
if (pingResponse.node().id().equals(discoveryNodes.localNodeId())) {
// that's us, ignore
continue;
}
if (!pingResponse.clusterName().equals(clusterName)) {
// not part of the cluster
logger.debug("[{}] filtering out response from {}, not same cluster_name [{}]", id, pingResponse.node(), pingResponse.clusterName().value());
continue;
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
if (responses == null) {
logger.warn("received ping response {} with no matching id [{}]", pingResponse, response.id);
} else {
PingResponse existingResponse = responses.get(pingResponse.node());
if (existingResponse == null) {
responses.put(pingResponse.node(), pingResponse);
} else {
// try and merge the best ping response for it, i.e. if the new one
// doesn't have the master node set, and the existing one does, then
// the existing one is better, so we keep it
// if both have a master or both have none, we prefer the latest ping
if (existingResponse.master() == null || pingResponse.master() != null) {
responses.put(pingResponse.node(), pingResponse);
}
}
}
}
} finally {
latch.countDown();
}
}

private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
if (lifecycle.stoppedOrClosed()) {
throw new ElasticsearchIllegalStateException("received ping request while stopped/closed");
Expand Down
Loading

0 comments on commit e5de47d

Please sign in to comment.