Skip to content

Commit

Permalink
Add unit tests for read flow of RemoteClusterStateService and bug fix…
Browse files Browse the repository at this point in the history
… for transient settings (#14476)

Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 authored Jul 10, 2024
1 parent c4d960f commit b068355
Show file tree
Hide file tree
Showing 8 changed files with 1,531 additions and 325 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
request.getClusterName(),
manifest,
lastSeen,
transportService.getLocalNode().getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -152,17 +153,17 @@ public ClusterStateDiffManifest(
this.settingsMetadataUpdated = settingsMetadataUpdated;
this.transientSettingsMetadataUpdated = transientSettingsMetadataUpdate;
this.templatesMetadataUpdated = templatesMetadataUpdated;
this.customMetadataUpdated = customMetadataUpdated;
this.customMetadataDeleted = customMetadataDeleted;
this.indicesUpdated = indicesUpdated;
this.indicesDeleted = indicesDeleted;
this.customMetadataUpdated = Collections.unmodifiableList(customMetadataUpdated);
this.customMetadataDeleted = Collections.unmodifiableList(customMetadataDeleted);
this.indicesUpdated = Collections.unmodifiableList(indicesUpdated);
this.indicesDeleted = Collections.unmodifiableList(indicesDeleted);
this.clusterBlocksUpdated = clusterBlocksUpdated;
this.discoveryNodesUpdated = discoveryNodesUpdated;
this.indicesRoutingUpdated = indicesRoutingUpdated;
this.indicesRoutingDeleted = indicesRoutingDeleted;
this.indicesRoutingUpdated = Collections.unmodifiableList(indicesRoutingUpdated);
this.indicesRoutingDeleted = Collections.unmodifiableList(indicesRoutingDeleted);
this.hashesOfConsistentSettingsUpdated = hashesOfConsistentSettingsUpdated;
this.clusterStateCustomUpdated = clusterStateCustomUpdated;
this.clusterStateCustomDeleted = clusterStateCustomDeleted;
this.clusterStateCustomUpdated = Collections.unmodifiableList(clusterStateCustomUpdated);
this.clusterStateCustomDeleted = Collections.unmodifiableList(clusterStateCustomDeleted);
}

public ClusterStateDiffManifest(StreamInput in) throws IOException {
Expand Down Expand Up @@ -563,7 +564,16 @@ public static class Builder {
private List<String> clusterStateCustomUpdated;
private List<String> clusterStateCustomDeleted;

public Builder() {}
public Builder() {
customMetadataUpdated = Collections.emptyList();
customMetadataDeleted = Collections.emptyList();
indicesUpdated = Collections.emptyList();
indicesDeleted = Collections.emptyList();
indicesRoutingUpdated = Collections.emptyList();
indicesRoutingDeleted = Collections.emptyList();
clusterStateCustomUpdated = Collections.emptyList();
clusterStateCustomDeleted = Collections.emptyList();
}

public Builder fromStateUUID(String fromStateUUID) {
this.fromStateUUID = fromStateUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,8 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral);
}

private ClusterState readClusterStateInParallel(
// package private for testing
ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
String clusterUUID,
Expand Down Expand Up @@ -1285,7 +1286,7 @@ public ClusterState getClusterStateForManifest(
manifest.getCustomMetadataMap(),
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
manifest.getTransientSettingsMetadata() != null,
includeEphemeral && manifest.getTransientSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
Expand Down Expand Up @@ -1321,13 +1322,9 @@ public ClusterState getClusterStateForManifest(

}

public ClusterState getClusterStateUsingDiff(
String clusterName,
ClusterMetadataManifest manifest,
ClusterState previousState,
String localNodeId
) throws IOException {
assert manifest.getDiffManifest() != null;
public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
throws IOException {
assert manifest.getDiffManifest() != null : "Diff manifest null which is required for downloading cluster state";
ClusterStateDiffManifest diff = manifest.getDiffManifest();
List<UploadedIndexMetadata> updatedIndices = diff.getIndicesUpdated().stream().map(idx -> {
Optional<UploadedIndexMetadata> uploadedIndexMetadataOptional = manifest.getIndices()
Expand Down Expand Up @@ -1586,6 +1583,19 @@ private boolean isValidClusterUUID(ClusterMetadataManifest manifest) {
return manifest.isClusterUUIDCommitted();
}

// package private setter which are required for injecting mock managers, these setters are not supposed to be used elsewhere
void setRemoteIndexMetadataManager(RemoteIndexMetadataManager remoteIndexMetadataManager) {
this.remoteIndexMetadataManager = remoteIndexMetadataManager;
}

void setRemoteGlobalMetadataManager(RemoteGlobalMetadataManager remoteGlobalMetadataManager) {
this.remoteGlobalMetadataManager = remoteGlobalMetadataManager;
}

void setRemoteClusterStateAttributesManager(RemoteClusterStateAttributesManager remoteClusterStateAttributeManager) {
this.remoteClusterStateAttributesManager = remoteClusterStateAttributeManager;
}

public void writeMetadataFailed() {
getStats().stateFailed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

package org.opensearch.gateway.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.AbstractNamedDiffable;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
Expand All @@ -23,11 +21,8 @@
import org.opensearch.common.util.TestCapturingListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.NoneCompressor;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
Expand All @@ -51,6 +46,10 @@
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom1;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom2;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom3;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.TestClusterStateCustom4;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER;
Expand Down Expand Up @@ -338,22 +337,22 @@ public void testGetAsyncMetadataReadAction_Exception() throws IOException, Inter

public void testGetUpdatedCustoms() {
Map<String, ClusterState.Custom> previousCustoms = Map.of(
TestCustom1.TYPE,
new TestCustom1("data1"),
TestCustom2.TYPE,
new TestCustom2("data2"),
TestCustom3.TYPE,
new TestCustom3("data3")
TestClusterStateCustom1.TYPE,
new TestClusterStateCustom1("data1"),
TestClusterStateCustom2.TYPE,
new TestClusterStateCustom2("data2"),
TestClusterStateCustom3.TYPE,
new TestClusterStateCustom3("data3")
);
ClusterState previousState = ClusterState.builder(new ClusterName("test-cluster")).customs(previousCustoms).build();

Map<String, Custom> currentCustoms = Map.of(
TestCustom2.TYPE,
new TestCustom2("data2"),
TestCustom3.TYPE,
new TestCustom3("data3-changed"),
TestCustom4.TYPE,
new TestCustom4("data4")
TestClusterStateCustom2.TYPE,
new TestClusterStateCustom2("data2"),
TestClusterStateCustom3.TYPE,
new TestClusterStateCustom3("data3-changed"),
TestClusterStateCustom4.TYPE,
new TestClusterStateCustom4("data4")
);

ClusterState currentState = ClusterState.builder(new ClusterName("test-cluster")).customs(currentCustoms).build();
Expand All @@ -368,136 +367,14 @@ public void testGetUpdatedCustoms() {
assertThat(customsDiff.getDeletes(), is(Collections.emptyList()));

Map<String, ClusterState.Custom> expectedCustoms = Map.of(
TestCustom3.TYPE,
new TestCustom3("data3-changed"),
TestCustom4.TYPE,
new TestCustom4("data4")
TestClusterStateCustom3.TYPE,
new TestClusterStateCustom3("data3-changed"),
TestClusterStateCustom4.TYPE,
new TestClusterStateCustom4("data4")
);

customsDiff = remoteClusterStateAttributesManager.getUpdatedCustoms(currentState, previousState, true, false);
assertThat(customsDiff.getUpserts(), is(expectedCustoms));
assertThat(customsDiff.getDeletes(), is(List.of(TestCustom1.TYPE)));
}

private static abstract class AbstractTestCustom extends AbstractNamedDiffable<Custom> implements ClusterState.Custom {

private final String value;

AbstractTestCustom(String value) {
this.value = value;
}

AbstractTestCustom(StreamInput in) throws IOException {
this.value = in.readString();
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(value);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}

@Override
public boolean isPrivate() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

AbstractTestCustom that = (AbstractTestCustom) o;

if (!value.equals(that.value)) return false;

return true;
}

@Override
public int hashCode() {
return value.hashCode();
}
}

private static class TestCustom1 extends AbstractTestCustom {

private static final String TYPE = "custom_1";

TestCustom1(String value) {
super(value);
}

TestCustom1(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return TYPE;
}
}

private static class TestCustom2 extends AbstractTestCustom {

private static final String TYPE = "custom_2";

TestCustom2(String value) {
super(value);
}

TestCustom2(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return TYPE;
}
}

private static class TestCustom3 extends AbstractTestCustom {

private static final String TYPE = "custom_3";

TestCustom3(String value) {
super(value);
}

TestCustom3(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return TYPE;
}
}

private static class TestCustom4 extends AbstractTestCustom {

private static final String TYPE = "custom_4";

TestCustom4(String value) {
super(value);
}

TestCustom4(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return TYPE;
}
assertThat(customsDiff.getDeletes(), is(List.of(TestClusterStateCustom1.TYPE)));
}
}
Loading

0 comments on commit b068355

Please sign in to comment.