From 71f79221658ec8d86565d3f936725e7e80fadd79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Vl=C4=8Dek?= Date: Wed, 20 Mar 2024 17:21:31 +0100 Subject: [PATCH] Replication IT for #12047 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is WIP to drive the discussion further, do not merge it! Signed-off-by: Lukáš Vlček --- .../indices/recovery/IndexRecoveryIT.java | 159 +++++++++++++++++- 1 file changed, 158 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 72e680e22ed75..aa68ba2588998 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -39,6 +39,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; @@ -69,6 +70,7 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.SetOnce; @@ -823,7 +825,14 @@ private List 
findRecoveriesForTargetNode(String nodeName, List= 0; logger.info("--> creating test index: {}", name); assertAcked( prepareCreate( @@ -838,7 +847,6 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, ensureGreen(); logger.info("--> indexing sample data"); - final int numDocs = numDocs(); final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { @@ -864,6 +872,155 @@ protected int numDocs() { return between(MIN_DOC_COUNT, MAX_DOC_COUNT); } + /** + * **Temporary:** + * Let's try to recreate the issue described in GitHub ticket #12047. + * + * How we do it: + * 1) Start a single node cluster + * 2) Create several (four) indices with some data, each having no replicas and a single primary shard + * 3) We check that both counters(*) match (only active shards and no ongoing recoveries) + * 4) Slow down recovery speed (throttle recovery) + * 5) Disable shard rebalancing + * 6) Start a second node + * 7) Verify cluster rebalancing does not kick in + * 8) Force all shards to start relocating from node A to node B + * 9) While shards are relocating we compare both counters(*) again and check that the recovery stage is INDEX + * 10) We lift the throttling limit and wait for rebalancing to finish, [END] + * + * (*) both counters = counters obtained via two different calls: + * a) client().admin().cluster().health(...) + * b) client().admin().indices().recoveries(...) + * ------------------------------------------------------------------- + * The recreation requires two types of calls: + * 1) An admin cluster health request – a transport level request + * 2) A "_cat" API call to get recoveries – a REST level request + * + * It is a challenge to combine these two types of calls in a single IT test because the REST call + * requires proper configuration of a transport layer. 
Although there are some examples of tests + * combining these two request types in the DanglingIndicesRestIT class, it turns out that controlling + * the cluster lifecycle remains a challenge in such an environment. + * Instead, we are going to "rip-out" transport level requests from the RestCatRecoveryAction class + * source. BTW, test {@link #testRerouteRecovery()} seems to be doing a similar thing. + */ + public void testRecoveryCountConsistency() throws Exception { + + // Define more index name constants as there is only a single constant defined for tests + // in this class, and we will need more indices. + final String INDEX_NAME_1 = "text-idx-1"; + final String INDEX_NAME_2 = "text-idx-2"; + final String INDEX_NAME_3 = "text-idx-3"; + final String INDEX_NAME_4 = "text-idx-4"; + final String[] INDICES = new String[]{ + INDEX_NAME_1, INDEX_NAME_2, INDEX_NAME_3, INDEX_NAME_4 + }; + int numDocs = 1000; + + logger.info("--> start node A"); + final String nodeA = internalCluster().startNode(); + + ByteSizeValue shardSize = null; + for (String index: INDICES) { + logger.info("--> create index on node: {}", nodeA); + shardSize = createAndPopulateIndex(index, 1, SHARD_COUNT, REPLICA_COUNT, numDocs).getShards()[0].getStats() + .getStore() + .size(); + } + + ensureGreen(); + refreshAndWaitForReplication(); + + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet(); + assertEquals(clusterHealth.getInitializingShards(), 0); + assertEquals(clusterHealth.getRelocatingShards(), 0); + assertEquals(clusterHealth.getActivePrimaryShards(), INDICES.length); + + // Not possible here: + // Response catResponse = getRestClient().performRequest(new Request("GET", "/_cat/recovery?active_only")); + // assertThat(catResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + RecoveryRequest recoveryRequest = new RecoveryRequest(); + // We are going to throttle the recovery process, hence we do not want the request to time out + 
recoveryRequest.timeout(TimeValue.MAX_VALUE); + recoveryRequest.detailed(false); + recoveryRequest.activeOnly(true); + //recoveryRequest.indicesOptions(); // not used for now + + RecoveryResponse recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet(); + assertTrue(recoveryResponse.hasRecoveries()); + assertEquals(recoveryResponse.shardRecoveryStates().size(), INDICES.length); + recoveryResponse.shardRecoveryStates().forEach((s, recoveryStates) -> { + + assertTrue(Arrays.asList(INDICES).contains(s)); + // Initially, the (only) index shard has been recovered so there is no active one going on. + assertEquals(recoveryStates.size(), 0); + }); + + logger.info("--> slowing down recoveries"); + slowDownRecovery(shardSize); + + logger.info("--> disable cluster rebalancing"); + ClusterUpdateSettingsResponse settingsUpdateResponse = client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE) + ) + .get(); + assertAcked(settingsUpdateResponse); + + logger.info("--> start node B"); + final String nodeB = internalCluster().startNode(); + + // No shards want to rebalance, all stay at node A. 
+ ensureGreen(); + + for (String index: INDICES) { + logger.info("--> move shard from: {} to: {}", nodeA, nodeB); + ClusterState clusterState = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(index, 0, nodeA, nodeB)) + .execute() + .actionGet() + .getState(); + clusterState.coordinationMetadata(); + } + + logger.info("--> waiting for recovery to start both on source and target"); + for (String index_name: INDICES) { + final Index index = resolveIndex(index_name); + assertBusyWithFixedSleepTime(() -> { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeA); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsSource(), equalTo(1)); + indicesService = internalCluster().getInstance(IndicesService.class, nodeB); + assertThat(indicesService.indexServiceSafe(index).getShard(0).recoveryStats().currentAsTarget(), equalTo(1)); + }, TimeValue.timeValueSeconds(10), TimeValue.timeValueMillis(500)); + } + + // We expect all shards are relocating now (all indices have a single shard that + // is now relocating from node A to node B) + clusterHealth = client().admin().cluster().prepareHealth().execute().actionGet(); + assertEquals(clusterHealth.getRelocatingShards(), INDICES.length); + + recoveryResponse = client().admin().indices().recoveries(recoveryRequest).actionGet(); + assertTrue(recoveryResponse.hasRecoveries()); + assertEquals(recoveryResponse.shardRecoveryStates().size(), INDICES.length); + recoveryResponse.shardRecoveryStates().forEach((s, recoveryStates) -> { + + assertTrue(Arrays.asList(INDICES).contains(s)); + assertEquals(recoveryStates.size(), 1); + recoveryStates.forEach(recoveryState -> { + assertEquals(recoveryState.getStage(), Stage.INDEX); + }); + }); + + restoreRecoverySpeed(); + refreshAndWaitForReplication(); + ensureGreen(); + } + public void testTransientErrorsDuringRecoveryAreRetried() throws Exception { final String indexName = "test"; final 
Settings nodeSettings = Settings.builder()