Skip to content

Commit

Permalink
Fixed typos and added tests
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <chngau@amazon.com>
  • Loading branch information
Gaurav614 committed Sep 28, 2023
1 parent b9407cc commit f03e99e
Show file tree
Hide file tree
Showing 4 changed files with 685 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ElectionSchedulerFactory;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -82,6 +86,8 @@
import java.util.stream.IntStream;

import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING;
import static org.opensearch.cluster.health.ClusterHealthStatus.GREEN;
import static org.opensearch.cluster.health.ClusterHealthStatus.RED;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -734,4 +740,172 @@ public void testMessyElectionsStillMakeClusterGoGreen() throws Exception {
internalCluster().fullRestart();
ensureGreen("test");
}

public void testBatchModeEnabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(1);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();
ensureRed("test");
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

GatewayAllocator gatewayAllocator = internalCluster().getInstance(
GatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertEquals(1, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(1, gatewayAllocator.getNumberOfStoreShardBatches());
assertTrue(
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.get(internalCluster().clusterService().getSettings())
);

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

public void testBatchModeDisabled() throws Exception {
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder().put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE_ENABLED.getKey(), false).build()
);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createIndex(
"test",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);

ensureGreen("test");
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();
ensureStableCluster(1);

logger.info("--> Now do a protective reroute");
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

GatewayAllocator gatewayAllocator = internalCluster().getInstance(
GatewayAllocator.class,
internalCluster().getClusterManagerName()
);
ensureRed("test");

// assert no batches created
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());

logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);
ensureGreen("test");
}

public void testNBatchesCreationAndAssignment() throws Exception {
// we will reduce batch size to 5 to make sure we have enough batches to test assignment
// Total number of primary shards = 50 (50 indices*1)
// Total number of replica shards = 50 (50 indices*1)
// Total batches creation for primaries and replicas will be 10 each

internalCluster().startClusterManagerOnlyNodes(1);
List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(2);
createNIndices(50, "test");
ensureStableCluster(3);
IndicesStatsResponse indicesStats = dataNodeClient().admin().indices().prepareStats().get();
assertThat(indicesStats.getSuccessfulShards(), equalTo(100));
ClusterHealthResponse health = client().admin()
.cluster()
.health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("1m"))
.actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());

// Now we will first stop cluster manager node and then stop data nodes. This will ensure to avoid any scenarios
// of more number of batch creation.
String clusterManagerName = internalCluster().getClusterManagerName();
Settings clusterManagerDataPathSettings = internalCluster().dataPathSettings(clusterManagerName);
Settings node0DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(0));
Settings node1DataPathSettings = internalCluster().dataPathSettings(dataOnlyNodes.get(1));

internalCluster().stopCurrentClusterManagerNode();
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();

// Now start cluster manager node and post that verify batches created
internalCluster().startClusterManagerOnlyNodes(
1,
Settings.builder()
.put("node.name", clusterManagerName)
.put(clusterManagerDataPathSettings)
.put(GatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE.getKey(), 5)
.build()
);
ensureStableCluster(1);

logger.info("--> Now do a protective reroute"); // to avoid any race condition in test
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
assertTrue(clusterRerouteResponse.isAcknowledged());

GatewayAllocator gatewayAllocator = internalCluster().getInstance(
GatewayAllocator.class,
internalCluster().getClusterManagerName()
);
assertEquals(10, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(10, gatewayAllocator.getNumberOfStoreShardBatches());
health = client(internalCluster().getClusterManagerName()).admin().cluster().health(Requests.clusterHealthRequest()).actionGet();
assertFalse(health.isTimedOut());
assertEquals(RED, health.getStatus());
assertEquals(100, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(0, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(0, health.getNumberOfDataNodes());

// Now start both data nodes and ensure batch mode is working
logger.info("--> restarting the stopped nodes");
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(0)).put(node0DataPathSettings).build());
internalCluster().startDataOnlyNode(Settings.builder().put("node.name", dataOnlyNodes.get(1)).put(node1DataPathSettings).build());
ensureStableCluster(3);

// wait for cluster to turn green
health = client().admin().cluster().health(Requests.clusterHealthRequest().waitForGreenStatus().timeout("5m")).actionGet();
assertFalse(health.isTimedOut());
assertEquals(GREEN, health.getStatus());
assertEquals(0, health.getUnassignedShards());
assertEquals(0, health.getInitializingShards());
assertEquals(100, health.getActiveShards());
assertEquals(0, health.getRelocatingShards());
assertEquals(2, health.getNumberOfDataNodes());
assertEquals(0, gatewayAllocator.getNumberOfStartedShardBatches());
assertEquals(0, gatewayAllocator.getNumberOfStoreShardBatches());
}

private void createNIndices(int n, String prefix) {

for (int i = 0; i < n; i++) {
createIndex(
prefix + i,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(prefix + i);
}
}
}
Loading

0 comments on commit f03e99e

Please sign in to comment.