Skip to content

Commit

Permalink
Replication IT for #12047
Browse files Browse the repository at this point in the history
This is WIP to drive the discussion further, do not merge it!

Signed-off-by: Lukáš Vlček <lukas.vlcek@aiven.io>
  • Loading branch information
lukas-vlcek committed Mar 20, 2024
1 parent 69fc7dd commit 71f7922
Showing 1 changed file with 158 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
Expand Down Expand Up @@ -823,7 +825,14 @@ private List<RecoveryState> findRecoveriesForTargetNode(String nodeName, List<Re

private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, int shardCount, int replicaCount)
throws ExecutionException, InterruptedException {
final int numDocs = numDocs();
return createAndPopulateIndex(name, nodeCount, shardCount, replicaCount, numDocs);
}

private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, int shardCount, int replicaCount, int numDocs)
throws ExecutionException, InterruptedException {

assert numDocs >= 0;
logger.info("--> creating test index: {}", name);
assertAcked(
prepareCreate(
Expand All @@ -838,7 +847,6 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = numDocs();
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
Expand All @@ -864,6 +872,155 @@ protected int numDocs() {
return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
}

/**
* **Temporary:**
* Let's try to recreate issue described in GitHub ticket <a href="https://github.com/opensearch-project/OpenSearch/issues/12047">...</a>.
*
* How we do it:
* 1) Start a single node cluster
* 2) Create several (four) indices with some data, having no replicas and single primary shard
* 3) We check that both counters(*) match (only active shards and no ongoing recoveries)
* 4) Slowdown recovery speed (throttle recovery)
* 5) Disable shard rebalancing
* 6) Start second node
* 7) Verify cluster rebalancing does not kick in
* 8) Force all shards to start relocating from node A to node B
* 9) While shards are relocating we compare both counters(*) again, we check restore stage type is INDEX
* 10) We lift the throttling limit and wait for rebalancing to finish, [END]
*
* (*) both counters = counters obtained via two different calls:
* a) client().admin().cluster().health(...)
* b) client().admin().indices().recoveries(...)
* -------------------------------------------------------------------
* The recreation requires two types of calls:
* 1) An admin cluster health request – a transport level request
* 2) A "_cat" API call to get recoveries – a REST level request
*
* It is a challenge to combine these two types of calls in a single IT test because the REST call
* requires proper configuration of a transport layer. Although there are some examples of tests
* combining these two request types in DanglingIndicesRestIT class it turns out that controlling
* cluster lifecycle remains a challenge in such environment.
* Instead, we are going to "rip-out" transport level requests from the RestCatRecoveryAction class
* source. BTW, test {@link #testRerouteRecovery()} seems to be doing similar thing.
*/
public void testRecoveryCountConsistency() throws Exception {

// Define more indices name constants as there is only a single constant defined for tests
// in this class, and we will need more indices.
final String INDEX_NAME_1 = "text-idx-1";
final String INDEX_NAME_2 = "text-idx-2";
final String INDEX_NAME_3 = "text-idx-3";
final String INDEX_NAME_4 = "text-idx-4";
final String[] INDICES = new String[]{
INDEX_NAME_1, INDEX_NAME_2, INDEX_NAME_3, INDEX_NAME_4
};
int numDocs = 1000;

logger.info("--> start node A");
final String nodeA = internalCluster().startNode();

ByteSizeValue shardSize = null;
for (String index: INDICES) {
logger.info("--> create index on node: {}", nodeA);
shardSize = createAndPopulateIndex(index, 1, SHARD_COUNT, REPLICA_COUNT, numDocs).getShards()[0].getStats()
.getStore()
.size();
}

ensureGreen();
refreshAndWaitForReplication();

ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet();
assertEquals(clusterHealth.getInitializingShards(), 0);
assertEquals(clusterHealth.getRelocatingShards(), 0);
assertEquals(clusterHealth.getActivePrimaryShards(), INDICES.length);

// Not possible here:
// Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/recovery?active_only"));
// assertThat(catResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

RecoveryRequest recoveryRequest = new RecoveryRequest();
// We are going to throttle the recovery process, hence we do not want the request to timeout
recoveryRequest.timeout(TimeValue.MAX_VALUE);
recoveryRequest.detailed(false);
recoveryRequest.activeOnly(true);
//recoveryRequest.indicesOptions(); // not used for now

RecoveryResponse recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet();
assertTrue(recoveryResponse.hasRecoveries());
assertEquals(recoveryResponse.shardRecoveryStates().size(), INDICES.length);
recoveryResponse.shardRecoveryStates().forEach((s, recoveryStates) -> {

assertTrue(Arrays.asList(INDICES).contains(s));
// Initially, the (only) index shard has been recovered so there is no active one going on.
assertEquals(recoveryStates.size(), 0);
});

logger.info("--> slowing down recoveries");
slowDownRecovery(shardSize);

logger.info("--> disable cluster rebalancing");
ClusterUpdateSettingsResponse settingsUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
)
.get();
assertAcked(settingsUpdateResponse);

logger.info("--> start node B");
final String nodeB = internalCluster().startNode();

// No shards want to rebalance, all stay at node A.
ensureGreen();

for (String index: INDICES) {
logger.info("--> move shard from: {} to: {}", nodeA, nodeB);
ClusterState clusterState = client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(index, 0, nodeA, nodeB))
.execute()
.actionGet()
.getState();
clusterState.coordinationMetadata();
}

logger.info("--> waiting for recovery to start both on source and target");
for (String index_name: INDICES) {
final Index index = resolveIndex(index_name);
assertBusyWithFixedSleepTime(() -> {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1));
indicesService = internalCluster().getInstance(IndicesService.class, nodeB);
assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1));
}, TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(500));
}

// We expect all shards are relocating now (all indices have a single shard that
// is now relocating from node A to node B)
clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet();
assertEquals(clusterHealth.getRelocatingShards(), INDICES.length);

recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet();
assertTrue(recoveryResponse.hasRecoveries());
assertEquals(recoveryResponse.shardRecoveryStates().size(), INDICES.length);
recoveryResponse.shardRecoveryStates().forEach((s, recoveryStates) -> {

assertTrue(Arrays.asList(INDICES).contains(s));
assertEquals(recoveryStates.size(), 1);
recoveryStates.forEach(recoveryState -> {
assertEquals(recoveryState.getStage(), Stage.INDEX);
});
});

restoreRecoverySpeed();
refreshAndWaitForReplication();
ensureGreen();
}

public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
Expand Down

0 comments on commit 71f7922

Please sign in to comment.