Skip to content

Commit

Permalink
Upload global cluster state to remote store (#10404)
Browse files Browse the repository at this point in the history
* Upload global cluster state to remote store

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel authored Oct 18, 2023
1 parent 8276cbd commit 4632f4b
Show file tree
Hide file tree
Showing 9 changed files with 617 additions and 39 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10541](https://github.com/opensearch-project/OpenSearch/pull/10541))
- [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
- Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -121,4 +122,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@
*/
public class ClusterMetadataManifest implements Writeable, ToXContentFragment {

public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");
private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version");
private static final ParseField NODE_ID_FIELD = new ParseField("node_id");
private static final ParseField COMMITTED_FIELD = new ParseField("committed");
private static final ParseField CODEC_VERSION_FIELD = new ParseField("codec_version");
private static final ParseField GLOBAL_METADATA_FIELD = new ParseField("global_metadata");
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid");
private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed");
Expand Down Expand Up @@ -84,7 +89,33 @@ private static boolean clusterUUIDCommitted(Object[] fields) {
return (boolean) fields[9];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER = new ConstructingObjectParser<>(
private static int codecVersion(Object[] fields) {
return (int) fields[10];
}

private static String globalMetadataFileName(Object[] fields) {
return (String) fields[11];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
term(fields),
version(fields),
clusterUUID(fields),
stateUUID(fields),
opensearchVersion(fields),
nodeId(fields),
committed(fields),
CODEC_V0,
null,
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields)
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V1 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
term(fields),
Expand All @@ -94,29 +125,45 @@ private static boolean clusterUUIDCommitted(Object[] fields) {
opensearchVersion(fields),
nodeId(fields),
committed(fields),
codecVersion(fields),
globalMetadataFileName(fields),
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields)
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V1;

static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
PARSER.declareObjectArray(
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
parser.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD);
parser.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
parser.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD);
parser.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_FIELD
);
PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED);
parser.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID);
parser.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED);

if (codec_version >= CODEC_V1) {
parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
}
}

private final int codecVersion;
private final String globalMetadataFileName;
private final List<UploadedIndexMetadata> indices;
private final long clusterTerm;
private final long stateVersion;
Expand Down Expand Up @@ -168,6 +215,14 @@ public boolean isClusterUUIDCommitted() {
return clusterUUIDCommitted;
}

public int getCodecVersion() {
return codecVersion;
}

public String getGlobalMetadataFileName() {
return globalMetadataFileName;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -176,6 +231,8 @@ public ClusterMetadataManifest(
Version opensearchVersion,
String nodeId,
boolean committed,
int codecVersion,
String globalMetadataFileName,
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted
Expand All @@ -187,6 +244,8 @@ public ClusterMetadataManifest(
this.opensearchVersion = opensearchVersion;
this.nodeId = nodeId;
this.committed = committed;
this.codecVersion = codecVersion;
this.globalMetadataFileName = globalMetadataFileName;
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
Expand All @@ -203,6 +262,13 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.previousClusterUUID = in.readString();
this.clusterUUIDCommitted = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
}
}

public static Builder builder() {
Expand Down Expand Up @@ -231,6 +297,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endArray();
builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID());
builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted());
if (onOrAfterCodecVersion(CODEC_V1)) {
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
return builder;
}

Expand All @@ -246,6 +316,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(indices);
out.writeString(previousClusterUUID);
out.writeBoolean(clusterUUIDCommitted);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
}
}

@Override
Expand All @@ -266,12 +340,16 @@ public boolean equals(Object o) {
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(committed, that.committed)
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted);
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion);
}

@Override
public int hashCode() {
return Objects.hash(
codecVersion,
globalMetadataFileName,
indices,
clusterTerm,
stateVersion,
Expand All @@ -290,8 +368,16 @@ public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

public boolean onOrAfterCodecVersion(int codecVersion) {
return this.codecVersion >= codecVersion;
}

public static ClusterMetadataManifest fromXContentV0(XContentParser parser) throws IOException {
return PARSER_V0.parse(parser, null);
}

public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
return CURRENT_PARSER.parse(parser, null);
}

/**
Expand All @@ -301,6 +387,8 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws
*/
public static class Builder {

private String globalMetadataFileName;
private int codecVersion;
private List<UploadedIndexMetadata> indices;
private long clusterTerm;
private long stateVersion;
Expand All @@ -317,6 +405,16 @@ public Builder indices(List<UploadedIndexMetadata> indices) {
return this;
}

public Builder codecVersion(int codecVersion) {
this.codecVersion = codecVersion;
return this;
}

public Builder globalMetadataFileName(String globalMetadataFileName) {
this.globalMetadataFileName = globalMetadataFileName;
return this;
}

public Builder clusterTerm(long clusterTerm) {
this.clusterTerm = clusterTerm;
return this;
Expand Down Expand Up @@ -378,6 +476,8 @@ public Builder(ClusterMetadataManifest manifest) {
this.opensearchVersion = manifest.opensearchVersion;
this.nodeId = manifest.nodeId;
this.committed = manifest.committed;
this.globalMetadataFileName = manifest.globalMetadataFileName;
this.codecVersion = manifest.codecVersion;
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
Expand All @@ -392,6 +492,8 @@ public ClusterMetadataManifest build() {
opensearchVersion,
nodeId,
committed,
codecVersion,
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted
Expand Down
Loading

0 comments on commit 4632f4b

Please sign in to comment.