Skip to content

Commit

Permalink
Move ElectionStrategy from interface to class
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jun 21, 2019
1 parent 6907d06 commit e284426
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,31 @@
import org.elasticsearch.cluster.node.DiscoveryNode;

/**
* Allows plugging in a custom election strategy. Note that, in order to guarantee safety of the system, custom election strategies should
* only be more restrictive than the behavior that's provided by {@link DefaultElectionStrategy}.
* Allows plugging in a custom election strategy, restricting the notion of an election quorum.
*/
public interface ElectionStrategy {
public class ElectionStrategy {

public static final ElectionStrategy DEFAULT_INSTANCE = new ElectionStrategy();

protected ElectionStrategy() {

}

/**
* Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
*/
boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes);

class DefaultElectionStrategy implements ElectionStrategy {

public static final ElectionStrategy INSTANCE = new DefaultElectionStrategy();
public final boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes) {
return joinVotes.isQuorum(lastCommittedConfiguration) &&
joinVotes.isQuorum(lastAcceptedConfiguration) &&
isCustomElectionQuorum(localNode, localCurrentTerm, localAcceptedTerm, localAcceptedVersion, lastCommittedConfiguration,
lastAcceptedConfiguration, joinVotes);
}

@Override
public boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes) {
return joinVotes.isQuorum(lastCommittedConfiguration) && joinVotes.isQuorum(lastAcceptedConfiguration);
}
protected boolean isCustomElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm,
long localAcceptedVersion, VotingConfiguration lastCommittedConfiguration,
VotingConfiguration lastAcceptedConfiguration, VoteCollection joinVotes) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
hostProviders.put("file", () -> new FileBasedSeedHostsProvider(configFile));
final Map<String, ElectionStrategy> electionStrategies = new HashMap<>();
electionStrategies.put(DEFAULT_ELECTION_STRATEGY, ElectionStrategy.DefaultElectionStrategy.INSTANCE);
electionStrategies.put(DEFAULT_ELECTION_STRATEGY, ElectionStrategy.DEFAULT_INSTANCE);
for (DiscoveryPlugin plugin : plugins) {
plugin.getSeedHostProviders(transportService, networkService).forEach((key, value) -> {
if (hostProviders.put(key, value) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ default Map<String, Supplier<SeedHostsProvider>> getSeedHostProviders(TransportS
*/
default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() { return null; }

/**
* Allows plugging in election strategies (see {@link ElectionStrategy}) that define a customized notion of an election quorum.
*/
default Map<String, ElectionStrategy> getElectionStrategies() {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

public class ClusterFormationFailureHelperTests extends ESTestCase {

private static final ElectionStrategy electionStrategy = ElectionStrategy.DefaultElectionStrategy.INSTANCE;
private static final ElectionStrategy electionStrategy = ElectionStrategy.DEFAULT_INSTANCE;

public void testScheduling() {
final long expectedDelayMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,12 @@ public void testVoteCollection() {
public void testSafety() {
new CoordinationStateTestCluster(IntStream.range(0, randomIntBetween(1, 5))
.mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
.collect(Collectors.toList()), ElectionStrategy.DefaultElectionStrategy.INSTANCE)
.collect(Collectors.toList()), ElectionStrategy.DEFAULT_INSTANCE)
.runRandomly();
}

public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) {
return new CoordinationState(localNode, storage, ElectionStrategy.DefaultElectionStrategy.INSTANCE);
return new CoordinationState(localNode, storage, ElectionStrategy.DEFAULT_INSTANCE);
}

public static ClusterState clusterState(long term, long version, DiscoveryNode localNode, VotingConfiguration lastCommittedConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ transportService, writableRegistry(),
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, s -> {}, ElectionStrategy.DefaultElectionStrategy.INSTANCE);
random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public String toString() {
assert electionOccurred == false;
electionOccurred = true;
}, l -> {
}, ElectionStrategy.DefaultElectionStrategy.INSTANCE); // TODO need tests that check that the max term seen is updated
}, ElectionStrategy.DEFAULT_INSTANCE); // TODO need tests that check that the max term seen is updated
preVoteCollector.update(getLocalPreVoteResponse(), null);
}

Expand Down Expand Up @@ -234,7 +234,7 @@ public void testPrevotingIndicatesElectionSuccess() {
startAndRunCollector(votingNodes);

final CoordinationState coordinationState = new CoordinationState(localNode,
new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)), ElectionStrategy.DefaultElectionStrategy.INSTANCE);
new InMemoryPersistedState(currentTerm, makeClusterState(votingNodes)), ElectionStrategy.DEFAULT_INSTANCE);

final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class MockNode {
ClusterState initialState = CoordinationStateTests.clusterState(0L, 0L, localNode,
CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, CoordinationMetaData.VotingConfiguration.EMPTY_CONFIG, 0L);
coordinationState = new CoordinationState(localNode, new InMemoryPersistedState(0L, initialState),
ElectionStrategy.DefaultElectionStrategy.INSTANCE);
ElectionStrategy.DEFAULT_INSTANCE);
}

final DiscoveryNode localNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ public void start(ClusterState initialState) {
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DefaultElectionStrategy.INSTANCE);
new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {},
ElectionStrategy.DefaultElectionStrategy.INSTANCE);
ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ public Settings additionalSettings() {
return Settings.builder().put(DiscoveryModule.ELECTION_STRATEGY_SETTING.getKey(), VOTING_ONLY_ELECTION_STRATEGY).build();
}

static class VotingOnlyNodeElectionStrategy extends ElectionStrategy.DefaultElectionStrategy {
static class VotingOnlyNodeElectionStrategy extends ElectionStrategy {

@Override
public boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
VoteCollection joinVotes) {
public boolean isCustomElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm,
long localAcceptedVersion, VotingConfiguration lastCommittedConfiguration,
VotingConfiguration lastAcceptedConfiguration, VoteCollection joinVotes) {
// if local node is voting only, have additional checks on election quorum definition
if (isVotingOnlyNode(localNode)) {
// if all votes are from voting only nodes, do not elect as master (no need to transfer state)
Expand All @@ -177,9 +177,7 @@ public boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm,
return false;
}
}
// fall back to default election quorum definition
return super.isElectionQuorum(localNode, localCurrentTerm, localAcceptedTerm, localAcceptedVersion,
lastCommittedConfiguration, lastAcceptedConfiguration, joinVotes);
return true;
}
}

Expand Down

0 comments on commit e284426

Please sign in to comment.