Skip to content

Commit

Permalink
Add more concurrent Snapshot V2 Integ Tests (#16207) (#16230)
Browse files Browse the repository at this point in the history
(cherry picked from commit e8e041b)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 9eaa7ae commit 1fafc04
Showing 1 changed file with 146 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.remotestore.RemoteSnapshotIT;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -479,7 +481,6 @@ public void testCloneSnapshotV2MasterSwitch() throws Exception {
assertThat(snapInfo, containsInAnyOrder(csr.getSnapshotInfo(), csr2.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
public void testDeleteWhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down Expand Up @@ -538,7 +539,7 @@ public void testDeleteWhileV2CreateOngoing() throws Exception {
assertThat(snapInfo, contains(csr.getSnapshotInfo()));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/16191")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205")
public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
Expand Down Expand Up @@ -569,6 +570,7 @@ public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);
ensureGreen();

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();
Expand Down Expand Up @@ -597,6 +599,148 @@ public void testDeleteAndCloneV1WhileV2CreateOngoing() throws Exception {
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/16205")
public void testDeleteAndCloneV1WhileCreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(10, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();
startFullSnapshot(repoName, "snapshot-v1-2").actionGet();

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startDeleteSnapshot = startDeleteSnapshot(repoName, "snapshot-v1");
ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1-2", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
snapshotFuture.actionGet();
assertTrue(startDeleteSnapshot.actionGet().isAcknowledged());
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

public void testCloneV1WhileV2CreateOngoing() throws Exception {
final String clusterManagerName = internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
internalCluster().startDataOnlyNode(pinnedTimestampSettings());
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String repoName = "test-create-snapshot-repo";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Settings.Builder settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false);
createRepository(repoName, "mock", settings);

Client client = client();
Settings indexSettings = getIndexSettings(20, 0).build();
createIndex(indexName1, indexSettings);

Settings indexSettings2 = getIndexSettings(15, 0).build();
createIndex(indexName2, indexSettings2);

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexDocuments(client, indexName1, numDocsInIndex1);
indexDocuments(client, indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

startFullSnapshot(repoName, "snapshot-v1").actionGet();

// Creating a v2 repo
settings = Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
createRepository(repoName, "mock", settings);

blockClusterManagerOnWriteIndexFile(repoName);

final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-v2");
awaitNumberOfSnapshotsInProgress(1);

ActionFuture<AcknowledgedResponse> startCloneSnapshot = startCloneSnapshot(repoName, "snapshot-v1", "snapshot-v1-2-clone");

unblockNode(repoName, clusterManagerName);
CreateSnapshotResponse csr = snapshotFuture.actionGet();
assertTrue(csr.getSnapshotInfo().getPinnedTimestamp() != 0);
assertTrue(startCloneSnapshot.actionGet().isAcknowledged());
List<SnapshotInfo> snapInfo = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertEquals(3, snapInfo.size());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "snapshot-v1-2-clone")
.setWaitForCompletion(true)
.setIndices(indexName1)
.setRenamePattern(indexName1)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.get();
assertEquals(restoreSnapshotResponse.status(), RestStatus.OK);
ensureGreen();
}

protected ActionFuture<AcknowledgedResponse> startCloneSnapshot(String repoName, String sourceSnapshotName, String snapshotName) {
Expand Down

0 comments on commit 1fafc04

Please sign in to comment.