Skip to content

Commit

Permalink
Enforce cluster UUIDs (elastic#37775)
Browse files Browse the repository at this point in the history
This commit adds join validation around cluster UUIDs, preventing a node from joining a cluster if it was
previously part of another cluster. The commit introduces a new flag to the cluster state,
clusterUUIDCommitted, which denotes whether the node has locked into a cluster with the given
UUID. When a cluster is committed, this flag is set to true, and subsequent cluster state updates
will keep the information about committal. Note that coordinating-only nodes are still free to switch
clusters at will (after restart), as they don't carry any persistent state.
  • Loading branch information
ywelsch authored Jan 29, 2019
1 parent 09a11a3 commit 3c9f703
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(),
applyCommit.getVersion());

persistedState.markLastAcceptedConfigAsCommitted();
persistedState.markLastAcceptedStateAsCommitted();
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -471,16 +471,32 @@ public interface PersistedState {
/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
* with the last committed configuration now corresponding to the last accepted configuration.
* with the last committed configuration now corresponding to the last accepted configuration, and the cluster uuid, if set,
* marked as committed.
*/
// NOTE(review): this is a rendered diff without +/- markers. The two signatures below appear to be the
// removed (…ConfigAsCommitted) and added (…StateAsCommitted) names of the same method — confirm against the repository.
default void markLastAcceptedConfigAsCommitted() {
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
// Created lazily: stays null unless one of the checks below finds something to update.
MetaData.Builder metaDataBuilder = null;
// Promote the last accepted voting configuration to "last committed" when the two differ.
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
// NOTE(review): the next two lines look like the removed variant (build and store the state immediately);
// the two lines after them look like the added variant (record the change in the shared builder) — confirm.
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
metaDataBuilder.coordinationMetaData(coordinationMetaData);
}
// if we receive a commit from a Zen1 master that has not recovered its state yet, the cluster uuid might not be known yet.
// The assertion therefore tolerates an unknown cluster uuid only when the state's term is ZEN1_BWC_TERM.
assert lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false ||
lastAcceptedState.term() == ZEN1_BWC_TERM :
"received cluster state with empty cluster uuid but not Zen1 BWC term: " + lastAcceptedState;
// Mark the cluster uuid as committed once it is known and has not been marked yet.
if (lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false &&
lastAcceptedState.metaData().clusterUUIDCommitted() == false) {
// Reuse the builder from the configuration update above if there is one, so both changes end up in the same metadata.
if (metaDataBuilder == null) {
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
}
metaDataBuilder.clusterUUIDCommitted(true);
}
// Store a new accepted state only if at least one of the updates above populated the builder.
if (metaDataBuilder != null) {
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.masterService = masterService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this.persistedStateSupplier = persistedStateSupplier;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();
Expand Down Expand Up @@ -281,7 +281,18 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
+ lastKnownLeader + ", rejecting");
}

if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) {
final ClusterState localState = coordinationState.get().getLastAcceptedState();

if (localState.metaData().clusterUUIDCommitted() &&
localState.metaData().clusterUUID().equals(publishRequest.getAcceptedState().metaData().clusterUUID()) == false) {
logger.warn("received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
sourceNode, publishRequest.getAcceptedState().metaData().clusterUUID(), localState.metaData().clusterUUID());
throw new CoordinationStateRejectedException("received cluster state from " + sourceNode +
" with a different cluster uuid " + publishRequest.getAcceptedState().metaData().clusterUUID() +
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
}

if (publishRequest.getAcceptedState().term() > localState.term()) {
// only do join validation if we have not accepted state from this master yet
onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
}
Expand Down Expand Up @@ -653,6 +664,7 @@ public void invariant() {
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
: preVoteCollector + " vs " + getPreVoteResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class JoinHelper {

Expand All @@ -84,7 +85,7 @@ public class JoinHelper {
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();

public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
this.masterService = masterService;
Expand Down Expand Up @@ -132,6 +133,13 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
(request, channel, task) -> {
final ClusterState localState = currentStateSupplier.get();
if (localState.metaData().clusterUUIDCommitted() &&
localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) {
throw new CoordinationStateRejectedException("join validation on cluster state" +
" with a different cluster uuid " + request.getState().metaData().clusterUUID() +
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
}
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
channel.sendResponse(Empty.INSTANCE);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
private static final Logger logger = LogManager.getLogger(MetaData.class);

public static final String ALL = "_all";
public static final String UNKNOWN_CLUSTER_UUID = "_na_";

public enum XContentContext {
/* Custom metadata should be returned as part of API call */
Expand Down Expand Up @@ -159,6 +160,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

private final String clusterUUID;
private final boolean clusterUUIDCommitted;
private final long version;

private final CoordinationMetaData coordinationMetaData;
Expand All @@ -179,12 +181,13 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust

private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;

MetaData(String clusterUUID, long version, CoordinationMetaData coordinationMetaData,
MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData,
Settings transientSettings, Settings persistentSettings,
ImmutableOpenMap<String, IndexMetaData> indices, ImmutableOpenMap<String, IndexTemplateMetaData> templates,
ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices,
SortedMap<String, AliasOrIndex> aliasAndIndexLookup) {
this.clusterUUID = clusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.version = version;
this.coordinationMetaData = coordinationMetaData;
this.transientSettings = transientSettings;
Expand Down Expand Up @@ -218,6 +221,14 @@ public String clusterUUID() {
return this.clusterUUID;
}

/**
 * Reports whether the node holding this cluster state has locked into the cluster identified by {@link #clusterUUID()}.
 * A node that has locked in will not accept any cluster state that carries a different cluster UUID.
 *
 * @return {@code true} if the cluster UUID has been marked as committed, {@code false} otherwise
 */
public boolean clusterUUIDCommitted() {
    return clusterUUIDCommitted;
}

/**
* Returns the merged transient and persistent settings.
*/
Expand Down Expand Up @@ -757,6 +768,12 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2
if (!metaData1.templates.equals(metaData2.templates())) {
return false;
}
if (!metaData1.clusterUUID.equals(metaData2.clusterUUID)) {
return false;
}
if (metaData1.clusterUUIDCommitted != metaData2.clusterUUIDCommitted) {
return false;
}
// Check if any persistent metadata needs to be saved
int customCount1 = 0;
for (ObjectObjectCursor<String, Custom> cursor : metaData1.customs) {
Expand Down Expand Up @@ -798,6 +815,7 @@ private static class MetaDataDiff implements Diff<MetaData> {

private long version;
private String clusterUUID;
private boolean clusterUUIDCommitted;
private CoordinationMetaData coordinationMetaData;
private Settings transientSettings;
private Settings persistentSettings;
Expand All @@ -807,6 +825,7 @@ private static class MetaDataDiff implements Diff<MetaData> {

MetaDataDiff(MetaData before, MetaData after) {
clusterUUID = after.clusterUUID;
clusterUUIDCommitted = after.clusterUUIDCommitted;
version = after.version;
coordinationMetaData = after.coordinationMetaData;
transientSettings = after.transientSettings;
Expand All @@ -818,8 +837,11 @@ private static class MetaDataDiff implements Diff<MetaData> {

MetaDataDiff(StreamInput in) throws IOException {
clusterUUID = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
clusterUUIDCommitted = in.readBoolean();
}
version = in.readLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) { //TODO revisit after Zen2 BWC is implemented
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData = new CoordinationMetaData(in);
} else {
coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
Expand All @@ -836,6 +858,9 @@ private static class MetaDataDiff implements Diff<MetaData> {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterUUID);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(clusterUUIDCommitted);
}
out.writeLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData.writeTo(out);
Expand All @@ -851,6 +876,7 @@ public void writeTo(StreamOutput out) throws IOException {
public MetaData apply(MetaData part) {
Builder builder = builder();
builder.clusterUUID(clusterUUID);
builder.clusterUUIDCommitted(clusterUUIDCommitted);
builder.version(version);
builder.coordinationMetaData(coordinationMetaData);
builder.transientSettings(transientSettings);
Expand All @@ -866,6 +892,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder();
builder.version = in.readLong();
builder.clusterUUID = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
builder.clusterUUIDCommitted = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
builder.coordinationMetaData(new CoordinationMetaData(in));
}
Expand All @@ -891,6 +920,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeString(clusterUUID);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeBoolean(clusterUUIDCommitted);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
coordinationMetaData.writeTo(out);
}
Expand Down Expand Up @@ -930,6 +962,7 @@ public static Builder builder(MetaData metaData) {
public static class Builder {

private String clusterUUID;
private boolean clusterUUIDCommitted;
private long version;

private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
Expand All @@ -941,7 +974,7 @@ public static class Builder {
private final ImmutableOpenMap.Builder<String, Custom> customs;

public Builder() {
clusterUUID = "_na_";
clusterUUID = UNKNOWN_CLUSTER_UUID;
indices = ImmutableOpenMap.builder();
templates = ImmutableOpenMap.builder();
customs = ImmutableOpenMap.builder();
Expand All @@ -950,6 +983,7 @@ public Builder() {

public Builder(MetaData metaData) {
this.clusterUUID = metaData.clusterUUID;
this.clusterUUIDCommitted = metaData.clusterUUIDCommitted;
this.coordinationMetaData = metaData.coordinationMetaData;
this.transientSettings = metaData.transientSettings;
this.persistentSettings = metaData.persistentSettings;
Expand Down Expand Up @@ -1125,8 +1159,13 @@ public Builder clusterUUID(String clusterUUID) {
return this;
}

/**
 * Sets the flag recording whether the cluster UUID held by this builder is committed.
 *
 * @param clusterUUIDCommitted the value of the committed flag to store in this builder
 * @return this builder, to allow call chaining
 */
public Builder clusterUUIDCommitted(final boolean clusterUUIDCommitted) {
    this.clusterUUIDCommitted = clusterUUIDCommitted;
    return this;
}

public Builder generateClusterUuidIfNeeded() {
if (clusterUUID.equals("_na_")) {
if (clusterUUID.equals(UNKNOWN_CLUSTER_UUID)) {
clusterUUID = UUIDs.randomBase64UUID();
}
return this;
Expand Down Expand Up @@ -1182,8 +1221,9 @@ public MetaData build() {
String[] allOpenIndicesArray = allOpenIndices.toArray(new String[allOpenIndices.size()]);
String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]);

return new MetaData(clusterUUID, version, coordinationMetaData, transientSettings, persistentSettings, indices.build(),
templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, aliasAndIndexLookup);
return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings,
indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray,
aliasAndIndexLookup);
}

private SortedMap<String, AliasOrIndex> buildAliasAndIndexLookup() {
Expand Down Expand Up @@ -1226,6 +1266,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon

builder.field("version", metaData.version());
builder.field("cluster_uuid", metaData.clusterUUID);
builder.field("cluster_uuid_committed", metaData.clusterUUIDCommitted);

builder.startObject("cluster_coordination");
metaData.coordinationMetaData().toXContent(builder, params);
Expand Down Expand Up @@ -1324,6 +1365,8 @@ public static MetaData fromXContent(XContentParser parser) throws IOException {
builder.version = parser.longValue();
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
builder.clusterUUID = parser.text();
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
builder.clusterUUIDCommitted = parser.booleanValue();
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
Expand Down
Loading

0 comments on commit 3c9f703

Please sign in to comment.