From c1d4b3d19bd37586e738fe98dd8c44b97b61d621 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 28 Jul 2023 15:27:38 +0530 Subject: [PATCH] [Remote Store] Add support to restore only unassigned shards of an index (#8792) (#8950) Signed-off-by: Sachin Kale --- CHANGELOG.md | 1 + .../RemoteStoreBaseIntegTestCase.java | 10 +- .../remotestore/RemoteStoreForceMergeIT.java | 12 +- .../opensearch/remotestore/RemoteStoreIT.java | 136 ++++++++++++------ .../snapshots/RestoreSnapshotIT.java | 14 +- .../restore/RestoreRemoteStoreRequest.java | 33 ++++- .../cluster/routing/IndexRoutingTable.java | 12 +- .../cluster/routing/RoutingTable.java | 8 +- .../cluster/RestRestoreRemoteStoreAction.java | 1 + .../opensearch/snapshots/RestoreService.java | 44 ++++-- .../RestoreRemoteStoreRequestTests.java | 2 + .../cluster/routing/RoutingTableTests.java | 34 ++++- .../SegmentReplicationIndexShardTests.java | 3 - 13 files changed, 234 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17674a07396e8..44c82cd6974a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Exclude 'benchmarks' from codecov report ([#8805](https://github.com/opensearch-project/OpenSearch/pull/8805)) - Create separate SourceLookup instance per segment slice in SignificantTextAggregatorFactory ([#8807](https://github.com/opensearch-project/OpenSearch/pull/8807)) - Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792)) +- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index c5d023bdd7a64..2887fbc56106c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -37,6 +38,13 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final int REPLICA_COUNT = 1; protected Path absolutePath; protected Path absolutePath2; + private final List documentKeys = List.of( + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5) + ); @Override protected boolean addMockInternalEngine() { @@ -59,7 +67,7 @@ public Settings indexSettings() { IndexResponse indexSingleDoc(String indexName) { return client().prepareIndex(indexName) .setId(UUIDs.randomBase64UUID()) - .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) .get(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java index b4456f887cbaa..4d5648c74ba5c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreForceMergeIT.java @@ -104,9 +104,17 @@ private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlus Map indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); - assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); - client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); ensureGreen(INDEX_NAME); if (deletedDocs == -1) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f3cc144fc65f9..2da9f1bf1a35e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -10,6 +10,7 @@ import org.junit.Before; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.index.IndexResponse; @@ -17,7 +18,6 @@ import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; @@ -33,13 +33,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.comparesEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; @@ -65,13 +66,6 @@ public Settings indexSettings() { return remoteStoreIndexSettings(0); } - private IndexResponse indexSingleDoc() { - return client().prepareIndex(INDEX_NAME) - .setId(UUIDs.randomBase64UUID()) - .setSource(randomAlphaOfLength(5), randomAlphaOfLength(5)) - .get(); - } - private Map indexData(int numberOfIterations, boolean invokeFlush, String index) { long totalOperations = 0; long refreshedOrFlushedOperations = 0; @@ -90,7 +84,7 @@ private Map indexData(int numberOfIterations, boolean invokeFlush, refreshedOrFlushedOperations = totalOperations; int numberOfOperations = randomIntBetween(20, 50); for (int j = 0; j < numberOfOperations; j++) { - IndexResponse response = INDEX_NAME.equals(index) ? indexSingleDoc() : indexSingleDoc(index); + IndexResponse response = indexSingleDoc(index); maxSeqNo = response.getSeqNo(); shardId = response.getShardId().id(); indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo); @@ -106,12 +100,14 @@ private Map indexData(int numberOfIterations, boolean invokeFlush, } private void verifyRestoredData(Map indexStats, boolean checkTotal, String indexName) { + // This is required to get updated number from already active shards which were not restored + refresh(indexName); String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS; String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED; ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity)); - IndexResponse response = INDEX_NAME.equals(indexName) ? indexSingleDoc() : indexSingleDoc(indexName); + IndexResponse response = indexSingleDoc(indexName); assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); refresh(indexName); assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1); @@ -127,6 +123,28 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St } } + private void restore(String... indices) { + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); + } + + private void restoreAndVerify(int shardCount, int replicaCount, Map indexStats) { + restore(INDEX_NAME); + ensureGreen(INDEX_NAME); + // This is required to get updated number from already active shards which were not restored + assertEquals(shardCount * (1 + replicaCount), getNumShards(INDEX_NAME).totalNumShards); + assertEquals(replicaCount, getNumShards(INDEX_NAME).numReplicas); + verifyRestoredData(indexStats, true, INDEX_NAME); + } + /** * Helper function to test restoring an index with no replication from remote store. Only primary node is dropped. * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. @@ -141,23 +159,16 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); ensureRed(INDEX_NAME); - assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); - client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); - - ensureGreen(INDEX_NAME); - assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); - verifyRestoredData(indexStats, true, INDEX_NAME); + restoreAndVerify(shardCount, 0, indexStats); } /** * Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop. - * @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store. * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data. * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. * @throws IOException IO Exception. */ - private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount) - throws IOException { + private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException { prepareCluster(1, 2, INDEX_NAME, 1, shardCount); Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); @@ -167,14 +178,7 @@ private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int ensureRed(INDEX_NAME); internalCluster().startDataOnlyNodes(2); - assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); - client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture()); - - ensureGreen(INDEX_NAME); - - assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); - assertEquals(0, getNumShards(INDEX_NAME).numReplicas); - verifyRestoredData(indexStats, true, INDEX_NAME); + restoreAndVerify(shardCount, 1, indexStats); } /** @@ -209,10 +213,16 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo ensureRed(indices); internalCluster().startDataOnlyNodes(3); - assertAcked(client().admin().indices().prepareClose(indices)); + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } client().admin() .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")), PlainActionFuture.newFuture()); + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); ensureGreen(indices); for (String index : indices) { assertEquals(shardCount, getNumShards(index).totalNumShards); @@ -220,6 +230,37 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo } } + public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException { + int shardCount = randomIntBetween(1, 5); + prepareCluster(0, 3, INDEX_NAME, 0, shardCount); + indexData(randomIntBetween(2, 5), true, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + + PlainActionFuture future = PlainActionFuture.newFuture(); + client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), future); + try { + future.get(); + } catch (ExecutionException e) { + // If the request goes to co-ordinator, e.getCause() can be RemoteTransportException + assertTrue(e.getCause() instanceof IllegalStateException || e.getCause().getCause() instanceof IllegalStateException); + } + } + + public void testRestoreFlowNoRedIndex() { + int shardCount = randomIntBetween(1, 5); + prepareCluster(0, 3, INDEX_NAME, 0, shardCount); + Map indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(false), PlainActionFuture.newFuture()); + + ensureGreen(INDEX_NAME); + assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + verifyRestoredData(indexStats, true, INDEX_NAME); + } + /** * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. @@ -262,7 +303,7 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException { // @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188") @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException { - testRestoreFlowBothPrimaryReplicasDown(true, 1, true, randomIntBetween(1, 5)); + testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5)); } /** @@ -271,7 +312,7 @@ public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOExce */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException { - testRestoreFlowBothPrimaryReplicasDown(true, 1, false, randomIntBetween(1, 5)); + testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5)); } /** @@ -281,7 +322,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException { - testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5)); + testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5)); } /** @@ -291,7 +332,7 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException { - testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5)); + testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5)); } /** @@ -338,10 +379,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio ensureRed(indices); internalCluster().startDataOnlyNodes(3); - assertAcked(client().admin().indices().prepareClose(indices)); - client().admin() - .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(new String[] {}), PlainActionFuture.newFuture()); + restore(indices); ensureGreen(indices); for (String index : indices) { @@ -378,10 +416,16 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc ensureRed(indices); internalCluster().startDataOnlyNodes(3); - assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + } client().admin() .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indices[0], indices[1]), PlainActionFuture.newFuture()); + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices[0], indices[1]).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); ensureGreen(indices[0], indices[1]); assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); @@ -424,10 +468,16 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc ensureRed(indices); internalCluster().startDataOnlyNodes(3); - assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices[0], indices[1])); + } client().admin() .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*"), PlainActionFuture.newFuture()); + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*").restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); ensureGreen(indices[0], indices[1]); assertEquals(shardCount, getNumShards(indices[0]).totalNumShards); verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]); @@ -487,7 +537,7 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw assertEquals(0, recoverySource.get().getIndex().recoveredFileCount()); } - IndexResponse response = indexSingleDoc(); + IndexResponse response = indexSingleDoc(INDEX_NAME); assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo()); refresh(INDEX_NAME); assertBusy( diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java index dbd96a7fd109f..30a836b41e29e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RestoreSnapshotIT.java @@ -276,7 +276,10 @@ public void testRestoreOperationsShallowCopyEnabled() throws IOException, Execut assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); client.admin() .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(restoredIndexName1).restoreAllShards(true), + PlainActionFuture.newFuture() + ); ensureYellowAndNoInitializingShards(restoredIndexName1); ensureGreen(restoredIndexName1); assertDocsPresentInIndex(client(), restoredIndexName1, numDocsInIndex1); @@ -434,7 +437,9 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { // Re-initialize client to make sure we are not using client from stopped node. client = client(clusterManagerNode); assertAcked(client.admin().indices().prepareClose(indexName1)); - client.admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1), PlainActionFuture.newFuture()); + client.admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1).restoreAllShards(true), PlainActionFuture.newFuture()); ensureYellowAndNoInitializingShards(indexName1); ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); @@ -515,7 +520,10 @@ public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException assertAcked(client.admin().indices().prepareClose(restoredIndexName1)); client.admin() .cluster() - .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(restoredIndexName1), PlainActionFuture.newFuture()); + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(restoredIndexName1).restoreAllShards(true), + PlainActionFuture.newFuture() + ); ensureYellowAndNoInitializingShards(restoredIndexName1); ensureGreen(restoredIndexName1); // indexing some new docs and validating diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java index 86941dffe1d3c..eb0c4392f39a8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java @@ -35,7 +35,8 @@ public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest implements ToXContentObject { private String[] indices = Strings.EMPTY_ARRAY; - private Boolean waitForCompletion; + private Boolean waitForCompletion = false; + private Boolean restoreAllShards = false; public RestoreRemoteStoreRequest() {} @@ -43,6 +44,7 @@ public RestoreRemoteStoreRequest(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); waitForCompletion = in.readOptionalBoolean(); + restoreAllShards = in.readOptionalBoolean(); } @Override @@ -50,6 +52,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArray(indices); out.writeOptionalBoolean(waitForCompletion); + out.writeOptionalBoolean(restoreAllShards); } @Override @@ -118,6 +121,27 @@ public boolean waitForCompletion() { return waitForCompletion; } + /** + * Set the value for restoreAllShards, denoting whether to restore all shards or only unassigned shards + * + * @param restoreAllShards If true, the operation will restore all the shards of the given indices. + * If false, the operation will restore only the unassigned shards of the given indices. + * @return this request + */ + public RestoreRemoteStoreRequest restoreAllShards(boolean restoreAllShards) { + this.restoreAllShards = restoreAllShards; + return this; + } + + /** + * Returns restoreAllShards setting + * + * @return true if the operation will restore all the shards of the given indices + */ + public boolean restoreAllShards() { + return restoreAllShards; + } + /** * Parses restore definition * @@ -167,12 +191,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o; - return waitForCompletion == that.waitForCompletion && Arrays.equals(indices, that.indices); + return waitForCompletion == that.waitForCompletion + && restoreAllShards == that.restoreAllShards + && Arrays.equals(indices, that.indices); } @Override public int hashCode() { - int result = Objects.hash(waitForCompletion); + int result = Objects.hash(waitForCompletion, restoreAllShards); result = 31 * result + Arrays.hashCode(indices); return result; } @@ -181,4 +207,5 @@ public int hashCode() { public String toString() { return org.opensearch.common.Strings.toString(XContentType.JSON, this); } + } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index af348c1c98f2d..781ca5bb2255a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -450,7 +450,11 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery /** * Initializes an existing index, to be restored from remote store */ - public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) { + public Builder initializeAsRemoteStoreRestore( + IndexMetadata indexMetadata, + RemoteStoreRecoverySource recoverySource, + Map activeInitializingShards + ) { final UnassignedInfo unassignedInfo = new UnassignedInfo( UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, "restore_source[remote_store]" @@ -462,7 +466,11 @@ public Builder initializeAsRemoteStoreRestore(IndexMetadata indexMetadata, Remot for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) { ShardId shardId = new ShardId(index, shardNumber); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); - indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo)); + if (activeInitializingShards.containsKey(shardId)) { + indexShardRoutingBuilder.addShard(activeInitializingShards.get(shardId)); + } else { + indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo)); + } shards.put(shardNumber, indexShardRoutingBuilder.build()); } return this; diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 1bee5d8176a0f..7934649a6d3eb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -562,9 +562,13 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) { return add(indexRoutingBuilder); } - public Builder addAsRemoteStoreRestore(IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource) { + public Builder addAsRemoteStoreRestore( + IndexMetadata indexMetadata, + RemoteStoreRecoverySource recoverySource, + Map activeInitializingShards + ) { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()) - .initializeAsRemoteStoreRestore(indexMetadata, recoverySource); + .initializeAsRemoteStoreRestore(indexMetadata, recoverySource, activeInitializingShards); add(indexRoutingBuilder); return this; } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java index fca6745167bb4..414c82b4a470f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java @@ -44,6 +44,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.paramAsTime("cluster_manager_timeout", restoreRemoteStoreRequest.masterNodeTimeout()) ); restoreRemoteStoreRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + restoreRemoteStoreRequest.restoreAllShards(request.paramAsBoolean("restore_all_shards", false)); request.applyContentParser(p -> restoreRemoteStoreRequest.source(p.mapOrdered())); return channel -> client.admin().cluster().restoreRemoteStore(restoreRemoteStoreRequest, new RestToXContentListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index fc8a8c1017698..6bf2459c51a1d 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -63,6 +63,7 @@ import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource; @@ -235,21 +236,34 @@ public ClusterState execute(ClusterState currentState) { continue; } if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { - if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { - throw new IllegalStateException( - "cannot restore index [" - + index - + "] because an open index " - + "with same name already exists in the cluster. Close the existing index" - ); + IndexMetadata updatedIndexMetadata = currentIndexMetadata; + Map activeInitializingShards = new HashMap<>(); + if (request.restoreAllShards()) { + if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { + throw new IllegalStateException( + "cannot restore index [" + + index + + "] because an open index " + + "with same name already exists in the cluster. Close the existing index" + ); + } + updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata) + .state(IndexMetadata.State.OPEN) + .version(1 + currentIndexMetadata.getVersion()) + .mappingVersion(1 + currentIndexMetadata.getMappingVersion()) + .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) + .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) + .build(); + } else { + activeInitializingShards = currentState.routingTable() + .index(index) + .shards() + .values() + .stream() + .map(IndexShardRoutingTable::primaryShard) + .filter(shardRouting -> shardRouting.unassigned() == false) + .collect(Collectors.toMap(ShardRouting::shardId, Function.identity())); } - IndexMetadata updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata) - .state(IndexMetadata.State.OPEN) - .version(1 + currentIndexMetadata.getVersion()) - .mappingVersion(1 + currentIndexMetadata.getMappingVersion()) - .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) - .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) - .build(); IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID()); @@ -258,7 +272,7 @@ public ClusterState execute(ClusterState currentState) { updatedIndexMetadata.getCreationVersion(), indexId ); - rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource); + rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards); blocks.updateBlocks(updatedIndexMetadata); mdBuilder.put(updatedIndexMetadata, true); indicesToBeRestored.add(index); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java index 81d7074977253..2edfa23286658 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequestTests.java @@ -38,6 +38,7 @@ private RestoreRemoteStoreRequest randomState(RestoreRemoteStoreRequest instance } instance.waitForCompletion(randomBoolean()); + instance.restoreAllShards(randomBoolean()); if (randomBoolean()) { instance.masterNodeTimeout(randomTimeValue()); @@ -76,6 +77,7 @@ public void testSource() throws IOException { RestoreRemoteStoreRequest processed = new RestoreRemoteStoreRequest(); processed.masterNodeTimeout(original.masterNodeTimeout()); processed.waitForCompletion(original.waitForCompletion()); + processed.restoreAllShards(original.restoreAllShards()); processed.source(map); assertEquals(original, processed); diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 6b869ffed7d23..0ff9d6f07751a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -49,10 +49,13 @@ import org.junit.Before; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.function.Predicate; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -502,13 +505,40 @@ public void testAddAsRemoteStoreRestore() { Version.CURRENT, new IndexId(TEST_INDEX_1, "1") ); - final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore(indexMetadata, remoteStoreRecoverySource) - .build(); + final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore( + indexMetadata, + remoteStoreRecoverySource, + new HashMap<>() + ).build(); assertTrue(routingTable.hasIndex(TEST_INDEX_1)); assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); assertEquals(this.numberOfShards, routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()); } + public void testAddAsRemoteStoreRestoreWithActiveShards() { + final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN).build(); + final RemoteStoreRecoverySource remoteStoreRecoverySource = new RemoteStoreRecoverySource( + "restore_uuid", + Version.CURRENT, + new IndexId(TEST_INDEX_1, "1") + ); + Map activeInitializingShards = new HashMap<>(); + for (int i = 0; i < randomIntBetween(1, this.numberOfShards); i++) { + activeInitializingShards.put(new ShardId(indexMetadata.getIndex(), i), mock(ShardRouting.class)); + } + final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore( + indexMetadata, + remoteStoreRecoverySource, + activeInitializingShards + ).build(); + assertTrue(routingTable.hasIndex(TEST_INDEX_1)); + assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); + assertEquals( + this.numberOfShards - activeInitializingShards.size(), + routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size() + ); + } + /** reverse engineer the in sync aid based on the given indexRoutingTable **/ public static IndexMetadata updateActiveAllocations(IndexRoutingTable indexRoutingTable, IndexMetadata indexMetadata) { IndexMetadata.Builder imdBuilder = IndexMetadata.builder(indexMetadata); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0c68512f93ea6..cc4fa6f28bafc 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -190,7 +190,6 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException * reader close operation on replica shard deletes the segment files copied in current round of segment replication. * It does this by blocking the finalizeReplication on replica shard and performing close operation on acquired * searcher that triggers the reader close operation. - * @throws Exception */ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; @@ -240,7 +239,6 @@ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Except /** * Similar to test above, this test shows the issue where an engine close operation during active segment replication * can result in Lucene CorruptIndexException. - * @throws Exception */ public void testSegmentReplication_With_EngineClosedConcurrently() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; @@ -289,7 +287,6 @@ public void testSegmentReplication_With_EngineClosedConcurrently() throws Except /** * Verifies that commits on replica engine resulting from engine or reader close does not cleanup the temporary * replication files from ongoing round of segment replication - * @throws Exception */ public void testTemporaryFilesNotCleanup() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";