Skip to content

Commit

Permalink
Use InputStreams rather than XContent for serialization
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Jun 7, 2024
1 parent 661b44a commit 9d73439
Show file tree
Hide file tree
Showing 16 changed files with 996 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -325,6 +326,21 @@ public static Diff<ClusterBlocks> readDiffFrom(StreamInput in) throws IOExceptio
return AbstractDiffable.readDiffFrom(ClusterBlocks::readFrom, in);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterBlocks that = (ClusterBlocks) o;
return Objects.equals(global, that.global)
&& Objects.equals(indicesBlocks, that.indicesBlocks)
&& Objects.equals(levelHolders, that.levelHolders);
}

@Override
public int hashCode() {
return Objects.hash(global, indicesBlocks, levelHolders);
}

/**
* An immutable level holder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,29 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent {
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}


public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(
RoutingTable before,
RoutingTable after
) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
Expand All @@ -70,7 +73,6 @@ public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, Index
);
}


@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.gateway.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -67,7 +65,9 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField UPLOADED_TRANSIENT_SETTINGS_METADATA = new ParseField("uploaded_transient_settings_metadata");
private static final ParseField UPLOADED_DISCOVERY_NODES_METADATA = new ParseField("uploaded_discovery_nodes_metadata");
private static final ParseField UPLOADED_CLUSTER_BLOCKS_METADATA = new ParseField("uploaded_cluster_blocks_metadata");
private static final ParseField UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA = new ParseField("uploaded_hashes_of_consistent_settings_metadata");
private static final ParseField UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA = new ParseField(
"uploaded_hashes_of_consistent_settings_metadata"
);
private static final ParseField UPLOADED_CLUSTER_STATE_CUSTOM_METADATA = new ParseField("uploaded_cluster_state_custom_metadata");
private static final ParseField DIFF_MANIFEST = new ParseField("diff_manifest");

Expand Down Expand Up @@ -99,8 +99,7 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields
}

private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) {
return manifestV2Builder(fields)
.routingTableVersion(routingTableVersion(fields))
return manifestV2Builder(fields).routingTableVersion(routingTableVersion(fields))
.indicesRouting(indicesRouting(fields))
.discoveryNodesMetadata(discoveryNodesMetadata(fields))
.clusterBlocksMetadata(clusterBlocksMetadata(fields))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.core.xcontent.XContentParserUtils.parseStringList;

public class ClusterStateDiffManifest implements ToXContentObject {
private static final String FROM_STATE_UUID_FIELD = "from_state_uuid";
Expand Down Expand Up @@ -80,7 +78,8 @@ public class ClusterStateDiffManifest implements ToXContentObject {
discoveryNodesUpdated = state.nodes().delta(previousState.nodes()).hasChanges();
customMetadataUpdated = new ArrayList<>();
for (String custom : state.metadata().customs().keySet()) {
if (!previousState.metadata().customs().containsKey(custom) || !state.metadata().customs().get(custom).equals(previousState.metadata().customs().get(custom))) {
if (!previousState.metadata().customs().containsKey(custom)
|| !state.metadata().customs().get(custom).equals(previousState.metadata().customs().get(custom))) {
customMetadataUpdated.add(custom);
}
}
Expand All @@ -91,14 +90,16 @@ public class ClusterStateDiffManifest implements ToXContentObject {
}
}

DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = RemoteRoutingTableService.getIndicesRoutingMapDiff(previousState.getRoutingTable(),
state.getRoutingTable());
DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> routingTableDiff = RemoteRoutingTableService
.getIndicesRoutingMapDiff(previousState.getRoutingTable(), state.getRoutingTable());

indicesRoutingUpdated = new ArrayList<>();
routingTableDiff.getUpserts().forEach((k,v) -> indicesRoutingUpdated.add(k));
routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k));

indicesRoutingDeleted = routingTableDiff.getDeletes();
hashesOfConsistentSettingsUpdated = !state.metadata().hashesOfConsistentSettings().equals(previousState.metadata().hashesOfConsistentSettings());
hashesOfConsistentSettingsUpdated = !state.metadata()
.hashesOfConsistentSettings()
.equals(previousState.metadata().hashesOfConsistentSettings());
clusterStateCustomUpdated = new ArrayList<>();
clusterStateCustomDeleted = new ArrayList<>();
for (String custom : state.customs().keySet()) {
Expand Down Expand Up @@ -126,8 +127,8 @@ public ClusterStateDiffManifest(
List<String> indicesDeleted,
boolean clusterBlocksUpdated,
boolean discoveryNodesUpdated,
List<String>indicesRoutingUpdated,
List<String>indicesRoutingDeleted,
List<String> indicesRoutingUpdated,
List<String> indicesRoutingDeleted,
boolean hashesOfConsistentSettingsUpdated,
List<String> clusterStateCustomUpdated,
List<String> clusterStateCustomDeleted
Expand Down Expand Up @@ -267,10 +268,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
token = parser.nextToken();
switch (currentFieldName) {
case UPSERTS_FIELD:
builder.indicesUpdated(parseStringList(parser));
builder.indicesUpdated(convertListToString(parser.listOrderedMap()));
break;
case DELETES_FIELD:
builder.indicesDeleted(parseStringList(parser));
builder.indicesDeleted(convertListToString(parser.listOrderedMap()));
break;
default:
throw new XContentParseException("Unexpected field [" + currentFieldName + "]");
Expand All @@ -282,10 +283,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
token = parser.nextToken();
switch (currentFieldName) {
case UPSERTS_FIELD:
builder.customMetadataUpdated(parseStringList(parser));
builder.customMetadataUpdated(convertListToString(parser.listOrderedMap()));
break;
case DELETES_FIELD:
builder.customMetadataDeleted(parseStringList(parser));
builder.customMetadataDeleted(convertListToString(parser.listOrderedMap()));
break;
default:
throw new XContentParseException("Unexpected field [" + currentFieldName + "]");
Expand All @@ -304,10 +305,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
parser.nextToken();
switch (currentFieldName) {
case UPSERTS_FIELD:
builder.indicesRoutingUpdated(parseStringList(parser));
builder.indicesRoutingUpdated(convertListToString(parser.listOrderedMap()));
break;
case DELETES_FIELD:
builder.indicesRoutingDeleted(parseStringList(parser));
builder.indicesRoutingDeleted(convertListToString(parser.listOrderedMap()));
break;
default:
throw new XContentParseException("Unexpected field [" + currentFieldName + "]");
Expand All @@ -319,10 +320,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw
parser.nextToken();
switch (currentFieldName) {
case UPSERTS_FIELD:
builder.clusterStateCustomUpdated(parseStringList(parser));
builder.clusterStateCustomUpdated(convertListToString(parser.listOrderedMap()));
break;
case DELETES_FIELD:
builder.clusterStateCustomDeleted(parseStringList(parser));
builder.clusterStateCustomDeleted(convertListToString(parser.listOrderedMap()));
break;
default:
throw new XContentParseException("Unexpected field [" + currentFieldName + "]");
Expand Down Expand Up @@ -371,6 +372,14 @@ public List<String> findRemovedIndices(Map<String, IndexMetadata> indices, Map<S
return removedIndices;
}

private static List<String> convertListToString(List<Object> list) {
List<String> convertedList = new ArrayList<>();
for (Object o : list) {
convertedList.add(o.toString());
}
return convertedList;
}

public List<String> findUpdatedIndices(Map<String, IndexMetadata> indices, Map<String, IndexMetadata> previousIndices) {
List<String> updatedIndices = new ArrayList<>();
for (String index : indices.keySet()) {
Expand All @@ -385,8 +394,8 @@ public List<String> findUpdatedIndices(Map<String, IndexMetadata> indices, Map<S

public List<String> getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> deletedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) {
if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) {
for (IndexRoutingTable previousIndexRouting : previousRoutingTable.getIndicesRouting().values()) {
if (!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is deleted
deletedIndicesRouting.add(previousIndexRouting.getIndex().getName());
}
Expand All @@ -396,12 +405,14 @@ public List<String> getIndicesRoutingDeleted(RoutingTable previousRoutingTable,

public List<String> getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) {
List<String> updatedIndicesRouting = new ArrayList<>();
for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) {
if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) {
for (IndexRoutingTable currentIndicesRouting : currentRoutingTable.getIndicesRouting().values()) {
if (!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) {
// Latest Routing Table does not have entry for the index which means the index is created
updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName());
} else {
if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) {
if (previousRoutingTable.getIndicesRouting()
.get(currentIndicesRouting.getIndex().getName())
.equals(currentIndicesRouting)) {
// if the latest routing table has the same routing table as the previous routing table, then the index is not updated
continue;
}
Expand Down
Loading

0 comments on commit 9d73439

Please sign in to comment.