Skip to content

Commit

Permalink
[Backport 2.x] Backport #15409 and #15624 to 2.x (#15595)
Browse files Browse the repository at this point in the history
* [SnapshotV2] Snapshot Status API changes (#15409)

---------

Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
(cherry picked from commit 925f41b)

* Remove breaking changes from #15409 (#15624)

Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
(cherry picked from commit a60b668)
  • Loading branch information
ltaragi authored Sep 4, 2024
1 parent 66d18a4 commit 01b38be
Show file tree
Hide file tree
Showing 15 changed files with 670 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,17 @@ public void testSnapshotsStatus() {
Map<String, String> expectedParams = new HashMap<>();
String repository = RequestConvertersTests.randomIndicesNames(1, 1)[0];
String[] snapshots = RequestConvertersTests.randomIndicesNames(1, 5);
String[] indices = RequestConvertersTests.randomIndicesNames(1, 5);
StringBuilder snapshotNames = new StringBuilder(snapshots[0]);
for (int idx = 1; idx < snapshots.length; idx++) {
snapshotNames.append(",").append(snapshots[idx]);
}
boolean ignoreUnavailable = randomBoolean();
String endpoint = "/_snapshot/" + repository + "/" + snapshotNames.toString() + "/_status";

SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repository, snapshots);
SnapshotsStatusRequest snapshotsStatusRequest = (new SnapshotsStatusRequest(repository, snapshots)).indices(indices);
RequestConvertersTests.setRandomMasterTimeout(snapshotsStatusRequest, expectedParams);

snapshotsStatusRequest.ignoreUnavailable(ignoreUnavailable);
expectedParams.put("ignore_unavailable", Boolean.toString(ignoreUnavailable));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.upgrades;

import org.opensearch.OpenSearchStatusException;
import com.sun.jna.StringArray;
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -145,14 +147,14 @@ public void testCreateAndRestoreSnapshot() throws IOException {
case STEP2_NEW_CLUSTER:
case STEP4_NEW_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new), Strings.EMPTY_ARRAY);
break;
case STEP1_OLD_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TEST_STEP);
assertSnapshotStatusSuccessful(client, repoName, new String[] {"snapshot-" + TEST_STEP}, Strings.EMPTY_ARRAY);
break;
case STEP3_OLD_CLUSTER:
assertSnapshotStatusSuccessful(
client, repoName, "snapshot-" + TEST_STEP, "snapshot-" + TestStep.STEP3_OLD_CLUSTER);
client, repoName, new String[] {"snapshot-" + TEST_STEP, "snapshot-" + TestStep.STEP3_OLD_CLUSTER}, Strings.EMPTY_ARRAY);
break;
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
Expand Down Expand Up @@ -190,10 +192,10 @@ public void testReadOnlyRepo() throws IOException {
break;
}
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER || TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
assertSnapshotStatusSuccessful(client, repoName, new String[] {"snapshot-" + TestStep.STEP1_OLD_CLUSTER}, Strings.EMPTY_ARRAY);
} else {
assertSnapshotStatusSuccessful(client, repoName,
"snapshot-" + TestStep.STEP1_OLD_CLUSTER, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
new String[] {"snapshot-" + TestStep.STEP1_OLD_CLUSTER, "snapshot-" + TestStep.STEP2_NEW_CLUSTER}, Strings.EMPTY_ARRAY);
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
Expand All @@ -218,7 +220,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
// Every step creates one snapshot
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new), Strings.EMPTY_ARRAY);
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else {
Expand Down Expand Up @@ -253,9 +255,9 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
}

private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
String... snapshots) throws IOException {
String[] snapshots, String[] indices) throws IOException {
final SnapshotsStatusResponse statusResponse = client.snapshot()
.status(new SnapshotsStatusRequest(repoName, snapshots), RequestOptions.DEFAULT);
.status((new SnapshotsStatusRequest(repoName, snapshots)).indices(indices), RequestOptions.DEFAULT);
for (SnapshotStatus status : statusResponse.getSnapshots()) {
assertThat(status.getShardsStats().getFailedShards(), is(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@
"description":"A comma-separated list of snapshot names"
}
}
},
{
"path":"/_snapshot/{repository}/{snapshot}/{index}/_status",
"methods":[
"GET"
],
"parts":{
"repository":{
"type":"string",
"description":"A repository name"
},
"snapshot":{
"type":"string",
"description":"A snapshot name"
},
"index":{
"type": "list",
"description":"A comma-separated list of index names"
}
}
}
]
},
Expand All @@ -58,7 +78,7 @@
},
"ignore_unavailable":{
"type":"boolean",
"description":"Whether to ignore unavailable snapshots, defaults to false which means a SnapshotMissingException is thrown"
"description":"Whether to ignore unavailable snapshots and indices, defaults to false which means a SnapshotMissingException or IndexNotFoundException is thrown"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,28 @@

package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.Before;

import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.snapshots.SnapshotsService.MAX_SHARDS_ALLOWED_IN_STATUS_API;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -192,6 +200,110 @@ public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
createSnapshotResponseActionFuture.actionGet();
}

public void testStatusAPICallForShallowV2Snapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used for the test");
Settings pinnedTimestampSettings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings);
internalCluster().startDataOnlyNodes(2, pinnedTimestampSettings);

final String index1 = "remote-index-1";
final String index2 = "remote-index-2";
final String index3 = "remote-index-3";
final String snapshotRepoName = "snapshot-repo-name";
final String snapshot = "snapshot";

logger.info("Create repository for shallow V2 snapshots");
Settings.Builder snapshotV2RepoSettings = snapshotRepoSettingsForShallowCopy().put(
BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(),
Boolean.TRUE
);
createRepository(snapshotRepoName, "fs", snapshotV2RepoSettings);

final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(index1, remoteStoreEnabledIndexSettings);
createIndex(index2, remoteStoreEnabledIndexSettings);
createIndex(index3, remoteStoreEnabledIndexSettings);
ensureGreen();

logger.info("Indexing some data");
for (int i = 0; i < 50; i++) {
index(index1, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
index(index3, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();

SnapshotInfo snapshotInfo = createFullSnapshot(snapshotRepoName, snapshot);
assertTrue(snapshotInfo.getPinnedTimestamp() > 0); // to assert creation of a shallow v2 snapshot

logger.info("Set MAX_SHARDS_ALLOWED_IN_STATUS_API to a low value");
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(MAX_SHARDS_ALLOWED_IN_STATUS_API.getKey(), 2));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
// without index filter
// although no. of shards in snapshot (3) is greater than the max value allowed in a status api call, the request does not fail
SnapshotStatus snapshotStatusWithoutIndexFilter = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.execute()
.actionGet()
.getSnapshots()
.get(0);

assertShallowV2SnapshotStatus(snapshotStatusWithoutIndexFilter, false);

// with index filter
SnapshotStatus snapshotStatusWithIndexFilter = client().admin()
.cluster()
.prepareSnapshotStatus(snapshotRepoName)
.setSnapshots(snapshot)
.setIndices(index1, index2)
.execute()
.actionGet()
.getSnapshots()
.get(0);

assertShallowV2SnapshotStatus(snapshotStatusWithIndexFilter, true);

}, 1, TimeUnit.MINUTES);

}

private void assertShallowV2SnapshotStatus(SnapshotStatus snapshotStatus, boolean hasIndexFilter) {
if (hasIndexFilter) {
assertEquals(0, snapshotStatus.getStats().getTotalSize());
} else {
// TODO: after adding primary store size at the snapshot level, total size here should be > 0
}
// assert that total and incremental values of file count and size_in_bytes are 0 at index and shard levels
assertEquals(0, snapshotStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotStatus.getStats().getIncrementalFileCount());

for (Map.Entry<String, SnapshotIndexStatus> entry : snapshotStatus.getIndices().entrySet()) {
// index level
SnapshotIndexStatus snapshotIndexStatus = entry.getValue();
assertEquals(0, snapshotIndexStatus.getStats().getTotalSize());
assertEquals(0, snapshotIndexStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotIndexStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotIndexStatus.getStats().getIncrementalFileCount());

for (SnapshotIndexShardStatus snapshotIndexShardStatus : snapshotStatus.getShards()) {
// shard level
assertEquals(0, snapshotIndexShardStatus.getStats().getTotalSize());
assertEquals(0, snapshotIndexShardStatus.getStats().getTotalFileCount());
assertEquals(0, snapshotIndexShardStatus.getStats().getIncrementalSize());
assertEquals(0, snapshotIndexShardStatus.getStats().getIncrementalFileCount());
assertEquals(SnapshotIndexShardStage.DONE, snapshotIndexShardStatus.getStage());
}
}
}

private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) {
return snapshotStatus.getIndices().get(indexName).getShards().get(0);
}
Expand Down
Loading

0 comments on commit 01b38be

Please sign in to comment.