Skip to content

Commit

Permalink
Use InputStreams rather than XContent for serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jun 8, 2024
1 parent c83706a commit 830b002
Show file tree
Hide file tree
Showing 16 changed files with 1,090 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -325,6 +326,21 @@ public static Diff<ClusterBlocks> readDiffFrom(StreamInput in) throws IOExceptio
return AbstractDiffable.readDiffFrom(ClusterBlocks::readFrom, in);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterBlocks that = (ClusterBlocks) o;
return Objects.equals(global, that.global)
&& Objects.equals(indicesBlocks, that.indicesBlocks)
&& Objects.equals(levelHolders, that.levelHolders);
}

@Override
public int hashCode() {
return Objects.hash(global, indicesBlocks, levelHolders);
}

/**
* An immutable level holder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,29 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent {
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}


public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
Expand All @@ -70,7 +73,6 @@ public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, Index
);
}


@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.gateway.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -43,7 +41,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file.
public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata, diff and other attributes as part of manifest required for state publication
public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata, diff and other attributes as part of manifest
// required for state publication

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
Expand All @@ -67,7 +66,9 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField UPLOADED_TRANSIENT_SETTINGS_METADATA = new ParseField("uploaded_transient_settings_metadata");
private static final ParseField UPLOADED_DISCOVERY_NODES_METADATA = new ParseField("uploaded_discovery_nodes_metadata");
private static final ParseField UPLOADED_CLUSTER_BLOCKS_METADATA = new ParseField("uploaded_cluster_blocks_metadata");
private static final ParseField UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA = new ParseField("uploaded_hashes_of_consistent_settings_metadata");
private static final ParseField UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA = new ParseField(
"uploaded_hashes_of_consistent_settings_metadata"
);
private static final ParseField UPLOADED_CLUSTER_STATE_CUSTOM_METADATA = new ParseField("uploaded_cluster_state_custom_metadata");
private static final ParseField DIFF_MANIFEST = new ParseField("diff_manifest");

Expand Down Expand Up @@ -99,8 +100,7 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields
}

private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) {
return manifestV2Builder(fields)
.routingTableVersion(routingTableVersion(fields))
return manifestV2Builder(fields).routingTableVersion(routingTableVersion(fields))
.indicesRouting(indicesRouting(fields))
.discoveryNodesMetadata(discoveryNodesMetadata(fields))
.clusterBlocksMetadata(clusterBlocksMetadata(fields))
Expand Down Expand Up @@ -318,7 +318,7 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
UPLOADED_CLUSTER_STATE_CUSTOM_METADATA
);
parser.declareObject(
ConstructingObjectParser.constructorArg(),
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ClusterStateDiffManifest.fromXContent(p),
DIFF_MANIFEST
);
Expand Down Expand Up @@ -543,10 +543,26 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.routingTableVersion = in.readLong();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.metadataVersion = in.readLong();
this.uploadedDiscoveryNodesMetadata = new UploadedMetadataAttribute(in);
this.uploadedClusterBlocksMetadata = new UploadedMetadataAttribute(in);
this.uploadedTransientSettingsMetadata = new UploadedMetadataAttribute(in);
this.uploadedHashesOfConsistentSettings = new UploadedMetadataAttribute(in);
if (in.readBoolean()) {
this.uploadedDiscoveryNodesMetadata = new UploadedMetadataAttribute(in);
} else {
this.uploadedDiscoveryNodesMetadata = null;
}
if (in.readBoolean()) {
this.uploadedClusterBlocksMetadata = new UploadedMetadataAttribute(in);
} else {
this.uploadedClusterBlocksMetadata = null;
}
if (in.readBoolean()) {
this.uploadedTransientSettingsMetadata = new UploadedMetadataAttribute(in);
} else {
this.uploadedTransientSettingsMetadata = null;
}
if (in.readBoolean()) {
this.uploadedHashesOfConsistentSettings = new UploadedMetadataAttribute(in);
} else {
this.uploadedHashesOfConsistentSettings = null;
}
this.uploadedClusterStateCustomMap = Collections.unmodifiableMap(
in.readMap(StreamInput::readString, UploadedMetadataAttribute::new)
);
Expand All @@ -572,6 +588,7 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.uploadedTransientSettingsMetadata = null;
this.uploadedHashesOfConsistentSettings = null;
this.uploadedClusterStateCustomMap = null;
}
}

public static Builder builder() {
Expand Down Expand Up @@ -695,10 +712,30 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(routingTableVersion);
out.writeCollection(indicesRouting);
out.writeLong(metadataVersion);
uploadedDiscoveryNodesMetadata.writeTo(out);
uploadedClusterBlocksMetadata.writeTo(out);
uploadedTransientSettingsMetadata.writeTo(out);
uploadedHashesOfConsistentSettings.writeTo(out);
if (uploadedDiscoveryNodesMetadata != null) {
out.writeBoolean(true);
uploadedDiscoveryNodesMetadata.writeTo(out);
} else {
out.writeBoolean(false);
}
if (uploadedClusterBlocksMetadata != null) {
out.writeBoolean(true);
uploadedClusterBlocksMetadata.writeTo(out);
} else {
out.writeBoolean(false);
}
if (uploadedTransientSettingsMetadata != null) {
out.writeBoolean(true);
uploadedTransientSettingsMetadata.writeTo(out);
} else {
out.writeBoolean(false);
}
if (uploadedHashesOfConsistentSettings != null) {
out.writeBoolean(true);
uploadedHashesOfConsistentSettings.writeTo(out);
} else {
out.writeBoolean(false);
}
out.writeMap(uploadedClusterStateCustomMap, StreamOutput::writeString, (o, v) -> v.writeTo(o));
} else if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeInt(codecVersion);
Expand All @@ -716,30 +753,31 @@ public boolean equals(Object o) {
return false;
}
final ClusterMetadataManifest that = (ClusterMetadataManifest) o;
return Objects.equals(indices, that.indices)
&& clusterTerm == that.clusterTerm
&& stateVersion == that.stateVersion
&& Objects.equals(clusterUUID, that.clusterUUID)
&& Objects.equals(stateUUID, that.stateUUID)
&& Objects.equals(opensearchVersion, that.opensearchVersion)
&& Objects.equals(nodeId, that.nodeId)
&& Objects.equals(committed, that.committed)
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(routingTableVersion, that.routingTableVersion)
&& Objects.equals(indicesRouting, that.indicesRouting)
&& Objects.equals(uploadedCoordinationMetadata, that.uploadedCoordinationMetadata)
&& Objects.equals(uploadedSettingsMetadata, that.uploadedSettingsMetadata)
&& Objects.equals(uploadedTemplatesMetadata, that.uploadedTemplatesMetadata)
&& Objects.equals(uploadedCustomMetadataMap, that.uploadedCustomMetadataMap)
&& Objects.equals(metadataVersion, that.metadataVersion)
&& Objects.equals(uploadedDiscoveryNodesMetadata, that.uploadedDiscoveryNodesMetadata)
&& Objects.equals(uploadedClusterBlocksMetadata, that.uploadedClusterBlocksMetadata)
&& Objects.equals(uploadedTransientSettingsMetadata, that.uploadedTransientSettingsMetadata)
&& Objects.equals(uploadedHashesOfConsistentSettings, that.uploadedHashesOfConsistentSettings)
&& Objects.equals(uploadedClusterStateCustomMap, that.uploadedClusterStateCustomMap);
boolean ret = Objects.equals(indices, that.indices);
ret = ret && clusterTerm == that.clusterTerm;
ret = ret && stateVersion == that.stateVersion;
ret = ret && Objects.equals(clusterUUID, that.clusterUUID);
ret = ret && Objects.equals(stateUUID, that.stateUUID);
ret = ret && Objects.equals(opensearchVersion, that.opensearchVersion);
ret = ret && Objects.equals(nodeId, that.nodeId);
ret = ret && Objects.equals(committed, that.committed);
ret = ret && Objects.equals(previousClusterUUID, that.previousClusterUUID);
ret = ret && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted);
ret = ret && Objects.equals(globalMetadataFileName, that.globalMetadataFileName);
ret = ret && Objects.equals(codecVersion, that.codecVersion);
ret = ret && Objects.equals(routingTableVersion, that.routingTableVersion);
ret = ret && Objects.equals(indicesRouting, that.indicesRouting);
ret = ret && Objects.equals(uploadedCoordinationMetadata, that.uploadedCoordinationMetadata);
ret = ret && Objects.equals(uploadedSettingsMetadata, that.uploadedSettingsMetadata);
ret = ret && Objects.equals(uploadedTemplatesMetadata, that.uploadedTemplatesMetadata);
ret = ret && Objects.equals(uploadedCustomMetadataMap, that.uploadedCustomMetadataMap);
ret = ret && Objects.equals(metadataVersion, that.metadataVersion);
ret = ret && Objects.equals(uploadedDiscoveryNodesMetadata, that.uploadedDiscoveryNodesMetadata);
ret = ret && Objects.equals(uploadedClusterBlocksMetadata, that.uploadedClusterBlocksMetadata);
ret = ret && Objects.equals(uploadedTransientSettingsMetadata, that.uploadedTransientSettingsMetadata);
ret = ret && Objects.equals(uploadedHashesOfConsistentSettings, that.uploadedHashesOfConsistentSettings);
ret = ret && Objects.equals(uploadedClusterStateCustomMap, that.uploadedClusterStateCustomMap);
return ret;
}

@Override
Expand Down Expand Up @@ -1233,6 +1271,19 @@ public static UploadedMetadataAttribute fromXContent(XContentParser parser) thro
return PARSER.parse(parser, null, parser.currentName());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UploadedMetadataAttribute that = (UploadedMetadataAttribute) o;
return Objects.equals(attributeName, that.attributeName) && Objects.equals(uploadedFilename, that.uploadedFilename);
}

@Override
public int hashCode() {
return Objects.hash(attributeName, uploadedFilename);
}

@Override
public String toString() {
return "UploadedMetadataAttribute{"
Expand Down
Loading

0 comments on commit 830b002

Please sign in to comment.