Skip to content

Commit

Permalink
Use correct cluster state version for node fault detection (#30810)
Browse files Browse the repository at this point in the history
Since its introduction in ES 1.4, node fault detection has been using the wrong cluster state version to send
as part of the ping request, by using always the constant -1 (ClusterState.UNKNOWN_VERSION). This can, in an
unfortunate series of events, lead to a situation where a previous stale master can regain its authority and
revert the cluster to an older state.

This commit makes NodesFaultDetection use the correct current cluster state for sending ping requests, avoiding
the situation where a stale master possibly forces a newer master to step down and rejoin the stale one.
  • Loading branch information
ywelsch committed May 23, 2018
1 parent df5789e commit 34b1b0b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;

Expand All @@ -66,13 +67,16 @@ public void onPingReceived(PingRequest pingRequest) {}

private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();

private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
private final Supplier<ClusterState> clusterStateSupplier;

private volatile DiscoveryNode localNode;

public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
Supplier<ClusterState> clusterStateSupplier, ClusterName clusterName) {
super(settings, threadPool, transportService, clusterName);

this.clusterStateSupplier = clusterStateSupplier;

logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
pingRetryCount);

Expand Down Expand Up @@ -208,15 +212,18 @@ private boolean running() {
return NodeFD.this.equals(nodesFD.get(node));
}

private PingRequest newPingRequest() {
return new PingRequest(node, clusterName, localNode, clusterStateSupplier.get().version());
}

@Override
public void run() {
if (!running()) {
return;
}
final PingRequest pingRequest = new PingRequest(node, clusterName, localNode, clusterStateVersion);
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
.withTimeout(pingRetryTimeout).build();
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new TransportResponseHandler<PingResponse>() {
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
return new PingResponse();
Expand Down Expand Up @@ -254,7 +261,7 @@ public void handleException(TransportException exp) {
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t

this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());
this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,19 @@ public void testNodesFaultDetectionConnectOnDisconnect() throws InterruptedExcep
final Settings pingSettings = Settings.builder()
.put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
.put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong())
.nodes(buildNodesForA(true)).build();
NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(),
threadPool, serviceA, clusterState.getClusterName());
threadPool, serviceA, () -> clusterState, clusterState.getClusterName());
nodesFDA.setLocalNode(nodeA);
NodesFaultDetection nodesFDB = new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(),
threadPool, serviceB, clusterState.getClusterName());
threadPool, serviceB, () -> clusterState, clusterState.getClusterName());
nodesFDB.setLocalNode(nodeB);
final CountDownLatch pingSent = new CountDownLatch(1);
nodesFDB.addListener(new NodesFaultDetection.Listener() {
@Override
public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) {
assertThat(pingRequest.clusterStateVersion(), equalTo(clusterState.version()));
pingSent.countDown();
}
});
Expand Down

0 comments on commit 34b1b0b

Please sign in to comment.