Skip to content

Commit

Permalink
Dianna's updates: always provide a 'reason' msg for node ineligibilit…
Browse files Browse the repository at this point in the history
…y; make some methods public for out-of-package testing
  • Loading branch information
DiannaHohensee committed Mar 22, 2024
1 parent ff72df6 commit 0df37d7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private final MasterServiceTaskQueue<NodeLeftExecutor.Task> nodeLeftQueue;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
private final NoMasterBlockService noMasterBlockService;
final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held
public final Object mutex = new Object(); // public to allow tests to call methods that assert that the mutex is held
private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier

Expand Down Expand Up @@ -540,15 +540,11 @@ private void startElection() {
if (mode == Mode.CANDIDATE) {
final var nodeEligibility = localNodeMayWinElection(getLastAcceptedState(), electionStrategy);
if (nodeEligibility.mayWin() == false) {
if (nodeEligibility.reason() == null) {
logger.trace("skip election as local node may not win it: {}", getLastAcceptedState().coordinationMetadata());
} else {
logger.info(
"skip election as local node may not win it ({}): {}",
nodeEligibility.reason(),
getLastAcceptedState().coordinationMetadata()
);
}
logger.info(
"skip election as local node may not win it ({}): {}",
nodeEligibility.reason(),
getLastAcceptedState().coordinationMetadata()
);
return;
}

Expand Down Expand Up @@ -838,7 +834,7 @@ public String toString() {
}
}

void becomeCandidate(String method) {
public void becomeCandidate(String method) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug(
"{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])",
Expand Down Expand Up @@ -1018,8 +1014,8 @@ long getCurrentTerm() {
}
}

// package-visible for testing
Mode getMode() {
// visible for testing
public Mode getMode() {
synchronized (mutex) {
return mode;
}
Expand Down Expand Up @@ -1273,8 +1269,12 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
metadataBuilder.coordinationMetadata(coordinationMetadata);

coordinationState.get().setInitialState(ClusterState.builder(currentState).metadata(metadataBuilder).build());
assert localNodeMayWinElection(getLastAcceptedState(), electionStrategy).mayWin()
: "initial state does not allow local node to win election: " + getLastAcceptedState().coordinationMetadata();
var nodeEligibility = localNodeMayWinElection(getLastAcceptedState(), electionStrategy);
assert nodeEligibility.mayWin()
: "initial state does not allow local node to win election, reason: "
+ nodeEligibility.reason()
+ " , metadata: "
+ getLastAcceptedState().coordinationMetadata();
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
return true;
Expand Down Expand Up @@ -1759,18 +1759,11 @@ public void run() {
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
final var nodeEligibility = localNodeMayWinElection(lastAcceptedState, electionStrategy);
if (nodeEligibility.mayWin() == false) {
if (nodeEligibility.reason() == null) {
logger.trace(
"skip prevoting as local node may not win election: {}",
lastAcceptedState.coordinationMetadata()
);
} else {
logger.info(
"skip prevoting as local node may not win election ({}): {}",
nodeEligibility.reason(),
lastAcceptedState.coordinationMetadata()
);
}
logger.info(
"skip prevoting as local node may not win election ({}): {}",
nodeEligibility.reason(),
lastAcceptedState.coordinationMetadata()
);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
* Custom additional quorum restrictions can be defined by implementing the {@link #satisfiesAdditionalQuorumConstraints} method.
*/
public abstract class ElectionStrategy {

public static final ElectionStrategy DEFAULT_INSTANCE = new ElectionStrategy() {
@Override
protected boolean satisfiesAdditionalQuorumConstraints(
Expand All @@ -34,6 +33,11 @@ protected boolean satisfiesAdditionalQuorumConstraints(
}
};

/**
 * Contains a result for whether a node may win an election and the reason if not.
 *
 * @param mayWin whether the node may win an election
 * @param reason explanation of why the node may not win an election
 */
public record NodeEligibility(boolean mayWin, String reason) {}

/**
* Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
*/
Expand Down Expand Up @@ -105,17 +109,17 @@ public void beforeCommit(long term, long version, ActionListener<Void> listener)
listener.onResponse(null);
}

// Whether a node may win elections
public record NodeEligibility(boolean mayWin, String reason) {}

public static final NodeEligibility NODE_MAY_WIN_ELECTION = new NodeEligibility(true, null);
public static final NodeEligibility NODE_MAY_NOT_WIN_ELECTION = new NodeEligibility(false, null);

/**
 * Decides whether {@code node} may win an election, judged against the given last-accepted cluster state.
 *
 * <p>The node may win if ANY of the following holds:
 * <ul>
 *   <li>its ID is in the last-committed voting configuration,</li>
 *   <li>its ID is in the last-accepted voting configuration,</li>
 *   <li>there is no voting-config exclusion for its ID.</li>
 * </ul>
 * It is ineligible only when all three fail at once, i.e. it is excluded from voting and appears in neither
 * voting configuration. Rejecting the node as soon as a single condition fails would wrongly disqualify, for
 * example, a node that is in the last-accepted configuration but not yet in the last-committed one.
 *
 * @param lastAcceptedState the cluster state whose voting configurations and exclusions are consulted
 * @param node              the node whose eligibility is being checked
 * @return a {@link NodeEligibility} that always carries a non-null reason: an empty string when the node may
 *         win, and a description of the cause when it may not
 */
public NodeEligibility nodeMayWinElection(ClusterState lastAcceptedState, DiscoveryNode node) {
    final String nodeId = node.getId();
    // The checks are OR-ed (and short-circuit) so that satisfying any one of them is sufficient.
    final boolean nodeMayWin = lastAcceptedState.getLastCommittedConfiguration().getNodeIds().contains(nodeId)
        || lastAcceptedState.getLastAcceptedConfiguration().getNodeIds().contains(nodeId)
        || lastAcceptedState.getVotingConfigExclusions().stream().noneMatch(vce -> vce.getNodeId().equals(nodeId));
    if (nodeMayWin) {
        return new NodeEligibility(true, "");
    }
    return new NodeEligibility(
        false,
        "node cannot win election, it is excluded from voting and is not part of the last committed or last accepted "
            + "voting configuration"
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public class Cluster implements Releasable {
@Nullable // null means construct a list from all the current nodes
private List<TransportAddress> seedHostsList;

/**
 * Convenience constructor: passes {@code initialNodeCount} through to the three-argument constructor,
 * supplying {@code true} and {@code Settings.EMPTY} for the remaining two arguments.
 */
Cluster(int initialNodeCount) {
public Cluster(int initialNodeCount) {
this(initialNodeCount, true, Settings.EMPTY);
}

Expand Down Expand Up @@ -360,7 +360,7 @@ List<ClusterNode> addNodes(int newNodesCount) {
return addedNodes;
}

/**
 * @return the number of entries currently held in {@code clusterNodes}
 */
int size() {
public int size() {
return clusterNodes.size();
}

Expand Down Expand Up @@ -753,7 +753,7 @@ private void stabilise(long stabilisationDurationMillis, boolean expectIdleJoinV
}
}

void bootstrapIfNecessary() {
public void bootstrapIfNecessary() {
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
Expand All @@ -766,7 +766,7 @@ void bootstrapIfNecessary() {
}
}

void runFor(long runDurationMillis, String description) {
public void runFor(long runDurationMillis, String description) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
logger.info("--> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);

Expand Down Expand Up @@ -849,7 +849,7 @@ ClusterNode getAnyNode() {
return getAnyNodeExcept();
}

ClusterNode getAnyNodeExcept(ClusterNode... clusterNodesToExclude) {
public ClusterNode getAnyNodeExcept(ClusterNode... clusterNodesToExclude) {
List<ClusterNode> filteredNodes = getAllNodesExcept(clusterNodesToExclude);
assert filteredNodes.isEmpty() == false;
return randomFrom(filteredNodes);
Expand Down Expand Up @@ -946,7 +946,7 @@ public final class ClusterNode {
private static final Logger logger = LogManager.getLogger(ClusterNode.class);

private final int nodeIndex;
Coordinator coordinator;
public Coordinator coordinator;
private final DiscoveryNode localNode;
final CoordinationState.PersistedState persistedState;
final Settings nodeSettings;
Expand Down Expand Up @@ -1378,7 +1378,7 @@ public void onFailure(Exception e) {
});
}

AckCollector submitUpdateTask(
public AckCollector submitUpdateTask(
String source,
UnaryOperator<ClusterState> clusterStateUpdate,
CoordinatorTestClusterStateUpdateTask taskListener
Expand Down Expand Up @@ -1450,7 +1450,7 @@ void onDisconnectEventFrom(ClusterNode clusterNode) {
transportService.disconnectFromNode(clusterNode.localNode);
}

/**
 * @return the cluster state currently reported by {@code clusterApplierService.state()}
 */
ClusterState getLastAppliedClusterState() {
public ClusterState getLastAppliedClusterState() {
return clusterApplierService.state();
}

Expand Down

0 comments on commit 0df37d7

Please sign in to comment.