diff --git a/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc b/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc
new file mode 100644
index 0000000000000..416466910aa60
--- /dev/null
+++ b/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc
@@ -0,0 +1,52 @@
+[[clone-snapshot-api]]
+=== Clone snapshot API
+++++
+Clone snapshot
+++++
+
+Clones part or all of a snapshot into a new snapshot.
+
+[source,console]
+----
+PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot
+{
+ "indices": "index_a,index_b"
+}
+----
+// TEST[skip:TODO]
+
+[[clone-snapshot-api-request]]
+==== {api-request-title}
+
+`PUT /_snapshot/<repository>/<source_snapshot>/_clone/<target_snapshot>`
+
+[[clone-snapshot-api-desc]]
+==== {api-description-title}
+
+The clone snapshot API allows creating a copy of all or part of an existing snapshot
+within the same repository.
+
+[[clone-snapshot-api-params]]
+==== {api-path-parms-title}
+
+`<repository>`::
+(Required, string)
+Name of the snapshot repository that both source and target snapshot belong to.
+
+[[clone-snapshot-api-query-params]]
+==== {api-query-parms-title}
+
+`master_timeout`::
+(Optional, <<time-units, time units>>) Specifies the period of time to wait for
+a connection to the master node. If no response is received before the timeout
+expires, the request fails and returns an error. Defaults to `30s`.
+
+`timeout`::
+(Optional, <<time-units, time units>>) Specifies the period of time to wait for
+a response. If no response is received before the timeout expires, the request
+fails and returns an error. Defaults to `30s`.
+
+`indices`::
+(Required, string)
+A comma-separated list of indices to include in the snapshot.
+<<multi-index,Multi-index syntax>> is supported.
\ No newline at end of file
diff --git a/docs/reference/snapshot-restore/index.asciidoc b/docs/reference/snapshot-restore/index.asciidoc
index 8286c73276864..805b923c6d56d 100644
--- a/docs/reference/snapshot-restore/index.asciidoc
+++ b/docs/reference/snapshot-restore/index.asciidoc
@@ -107,6 +107,7 @@ understand the time requirements before proceeding.
--
include::register-repository.asciidoc[]
+include::apis/clone-snapshot-api.asciidoc[]
include::take-snapshot.asciidoc[]
include::restore-snapshot.asciidoc[]
include::monitor-snapshot-restore.asciidoc[]
diff --git a/docs/reference/snapshot-restore/take-snapshot.asciidoc b/docs/reference/snapshot-restore/take-snapshot.asciidoc
index 4adbfce304d67..ddc2812dbe280 100644
--- a/docs/reference/snapshot-restore/take-snapshot.asciidoc
+++ b/docs/reference/snapshot-restore/take-snapshot.asciidoc
@@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/
PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E
-----------------------------------
// TEST[continued]
+
+NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <<clone-snapshot-api,clone snapshot API>>.
\ No newline at end of file
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
index 82bec5a1f0dc3..c8377c8e76b0b 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java
@@ -18,6 +18,26 @@
*/
package org.elasticsearch.snapshots;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.repositories.RepositoryData;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
@@ -26,16 +46,11 @@
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.RepositoriesService;
-import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
-import org.elasticsearch.test.ESIntegTestCase;
import java.nio.file.Path;
-import java.util.List;
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@@ -53,7 +68,7 @@ public void testShardClone() throws Exception {
if (useBwCFormat) {
initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
// Re-create repo to clear repository data cache
- assertAcked(client().admin().cluster().prepareDeleteRepository(repoName).get());
+ assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get());
createRepository(repoName, "fs", repoPath);
}
@@ -107,6 +122,389 @@ public void testShardClone() throws Exception {
assertEquals(newShardGeneration, newShardGeneration2);
}
+ public void testCloneSnapshotIndex() throws Exception {
+ internalCluster().startMasterOnlyNode();
+ internalCluster().startDataOnlyNode();
+ final String repoName = "repo-name";
+ createRepository(repoName, "fs");
+
+ final String indexName = "index-1";
+ createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ indexRandomDocs(indexName, randomIntBetween(20, 100));
+ if (randomBoolean()) {
+ assertAcked(admin().indices().prepareDelete(indexName));
+ }
+ final String targetSnapshot = "target-snapshot";
+ assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexName).get());
+
+ final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
+ .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
+ assertThat(status, hasSize(2));
+ final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
+ final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
+ assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
+ assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
+ }
+
+ public void testClonePreventsSnapshotDelete() throws Exception {
+ final String masterName = internalCluster().startMasterOnlyNode();
+ internalCluster().startDataOnlyNode();
+ final String repoName = "repo-name";
+ createRepository(repoName, "mock");
+
+ final String indexName = "index-1";
+ createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+ final String targetSnapshot = "target-snapshot";
+ blockNodeOnAnyFiles(repoName, masterName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
+ waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
+ assertFalse(cloneFuture.isDone());
+
+ ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class,
+ () -> startDeleteSnapshot(repoName, sourceSnapshot).actionGet());
+ assertThat(ex.getMessage(), containsString("cannot delete snapshot while it is being cloned"));
+
+ unblockNode(repoName, masterName);
+ assertAcked(cloneFuture.get());
+ final List<SnapshotStatus> status = clusterAdmin().prepareSnapshotStatus(repoName)
+ .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots();
+ assertThat(status, hasSize(2));
+ final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName);
+ final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName);
+ assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount());
+ assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize());
+ }
+
+ public void testConcurrentCloneAndSnapshot() throws Exception {
+ internalCluster().startMasterOnlyNode();
+ final String dataNode = internalCluster().startDataOnlyNode();
+ final String repoName = "repo-name";
+ createRepository(repoName, "mock");
+
+ final String indexName = "index-1";
+ createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+ final String targetSnapshot = "target-snapshot";
+ final ActionFuture<CreateSnapshotResponse> snapshot2Future =
+ startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode);
+ waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+ final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName);
+ awaitNumberOfSnapshotsInProgress(2);
+ unblockNode(repoName, dataNode);
+ assertAcked(cloneFuture.get());
+ assertSuccessful(snapshot2Future);
+ }
+
+ public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception {
+ // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+ final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+ internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String indexSlow = "index-slow";
+ createIndexWithContent(indexSlow);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ final String targetSnapshot = "target-snapshot";
+ blockMasterOnShardClone(repoName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow);
+ waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+
+ final String indexFast = "index-fast";
+ createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
+
+ assertSuccessful(clusterAdmin().prepareCreateSnapshot(repoName, "fast-snapshot")
+ .setIndices(indexFast).setWaitForCompletion(true).execute());
+
+ assertThat(cloneFuture.isDone(), is(false));
+ unblockNode(repoName, masterNode);
+
+ assertAcked(cloneFuture.get());
+ }
+
+ public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception {
+ internalCluster().startMasterOnlyNode();
+ final String dataNode = internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String indexSlow = "index-slow";
+ createIndexWithContent(indexSlow);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ final String indexFast = "index-fast";
+ createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100));
+
+ blockDataNode(repoName, dataNode);
+ final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin()
+ .prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute();
+ waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+
+ final String targetSnapshot = "target-snapshot";
+ assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get());
+
+ assertThat(snapshotFuture.isDone(), is(false));
+ unblockNode(repoName, dataNode);
+
+ assertSuccessful(snapshotFuture);
+ }
+
+ public void testDeletePreventsClone() throws Exception {
+ final String masterName = internalCluster().startMasterOnlyNode();
+ internalCluster().startDataOnlyNode();
+ final String repoName = "repo-name";
+ createRepository(repoName, "mock");
+
+ final String indexName = "index-1";
+ createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ indexRandomDocs(indexName, randomIntBetween(20, 100));
+
+ final String targetSnapshot = "target-snapshot";
+ blockNodeOnAnyFiles(repoName, masterName);
+ final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot);
+ waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
+ assertFalse(deleteFuture.isDone());
+
+ ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
+ startClone(repoName, sourceSnapshot, targetSnapshot, indexName).actionGet());
+ assertThat(ex.getMessage(), containsString("cannot clone from snapshot that is being deleted"));
+
+ unblockNode(repoName, masterName);
+ assertAcked(deleteFuture.get());
+ }
+
+ public void testBackToBackClonesForIndexNotInCluster() throws Exception {
+ // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+ final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
+ internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String indexBlocked = "index-blocked";
+ createIndexWithContent(indexBlocked);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ assertAcked(admin().indices().prepareDelete(indexBlocked).get());
+
+ final String targetSnapshot1 = "target-snapshot";
+ blockMasterOnShardClone(repoName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked);
+ waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+ assertThat(cloneFuture1.isDone(), is(false));
+
+ final int extraClones = randomIntBetween(1, 5);
+ final List<ActionFuture<AcknowledgedResponse>> extraCloneFutures = new ArrayList<>(extraClones);
+ for (int i = 0; i < extraClones; i++) {
+ extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked));
+ }
+ awaitNumberOfSnapshotsInProgress(1 + extraClones);
+ for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
+ assertFalse(extraCloneFuture.isDone());
+ }
+
+ final int extraSnapshots = randomIntBetween(0, 5);
+ if (extraSnapshots > 0) {
+ createIndexWithContent(indexBlocked);
+ }
+
+ final List<ActionFuture<CreateSnapshotResponse>> extraSnapshotFutures = new ArrayList<>(extraSnapshots);
+ for (int i = 0; i < extraSnapshots; i++) {
+ extraSnapshotFutures.add(startFullSnapshot(repoName, "extra-snap-" + i));
+ }
+
+ awaitNumberOfSnapshotsInProgress(1 + extraClones + extraSnapshots);
+ for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
+ assertFalse(extraSnapshotFuture.isDone());
+ }
+
+ unblockNode(repoName, masterNode);
+ assertAcked(cloneFuture1.get());
+
+ for (ActionFuture<AcknowledgedResponse> extraCloneFuture : extraCloneFutures) {
+ assertAcked(extraCloneFuture.get());
+ }
+ for (ActionFuture<CreateSnapshotResponse> extraSnapshotFuture : extraSnapshotFutures) {
+ assertSuccessful(extraSnapshotFuture);
+ }
+ }
+
+ public void testMasterFailoverDuringCloneStep1() throws Exception {
+ internalCluster().startMasterOnlyNodes(3);
+ internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String testIndex = "index-test";
+ createIndexWithContent(testIndex);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ blockMasterOnReadIndexMeta(repoName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture =
+ startCloneFromDataNode(repoName, sourceSnapshot, "target-snapshot", testIndex);
+ awaitNumberOfSnapshotsInProgress(1);
+ final String masterNode = internalCluster().getMasterName();
+ waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+ internalCluster().restartNode(masterNode);
+ boolean cloneSucceeded = false;
+ try {
+ cloneFuture.actionGet(TimeValue.timeValueSeconds(30L));
+ cloneSucceeded = true;
+ } catch (SnapshotException sne) {
+ // ignored, most of the time we will throw here but we could randomly run into a situation where the data node retries the
+ // snapshot on disconnect slowly enough for it to work out
+ }
+
+ awaitNoMoreRunningOperations(internalCluster().getMasterName());
+
+ assertAllSnapshotsSuccessful(getRepositoryData(repoName), cloneSucceeded ? 2 : 1);
+ }
+
+ public void testFailsOnCloneMissingIndices() {
+ internalCluster().startMasterOnlyNode();
+ internalCluster().startDataOnlyNode();
+ final String repoName = "repo-name";
+ final Path repoPath = randomRepoPath();
+ if (randomBoolean()) {
+ createIndexWithContent("test-idx");
+ }
+ createRepository(repoName, "fs", repoPath);
+
+ final String snapshotName = "snapshot";
+ createFullSnapshot(repoName, snapshotName);
+ expectThrows(IndexNotFoundException.class,
+ () -> startClone(repoName, snapshotName, "target-snapshot", "does-not-exist").actionGet());
+ }
+
+ public void testMasterFailoverDuringCloneStep2() throws Exception {
+ // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+ internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
+ internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String testIndex = "index-test";
+ createIndexWithContent(testIndex);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ final String targetSnapshot = "target-snapshot";
+ blockMasterOnShardClone(repoName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
+ awaitNumberOfSnapshotsInProgress(1);
+ final String masterNode = internalCluster().getMasterName();
+ waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+ internalCluster().restartNode(masterNode);
+ expectThrows(SnapshotException.class, cloneFuture::actionGet);
+ awaitNoMoreRunningOperations(internalCluster().getMasterName());
+
+ assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2);
+ }
+
+ public void testExceptionDuringShardClone() throws Exception {
+ // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations
+ internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS);
+ internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String testIndex = "index-test";
+ createIndexWithContent(testIndex);
+
+ final String sourceSnapshot = "source-snapshot";
+ createFullSnapshot(repoName, sourceSnapshot);
+
+ final String targetSnapshot = "target-snapshot";
+ blockMasterFromFinalizingSnapshotOnSnapFile(repoName);
+ final ActionFuture<AcknowledgedResponse> cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex);
+ awaitNumberOfSnapshotsInProgress(1);
+ final String masterNode = internalCluster().getMasterName();
+ waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
+ unblockNode(repoName, masterNode);
+ expectThrows(SnapshotException.class, cloneFuture::actionGet);
+ awaitNoMoreRunningOperations(internalCluster().getMasterName());
+ assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
+ assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
+ }
+
+ public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception {
+ internalCluster().startMasterOnlyNode();
+ final String dataNode = internalCluster().startDataOnlyNode();
+ final String repoName = "test-repo";
+ createRepository(repoName, "mock");
+ final String testIndex = "index-test";
+ createIndexWithContent(testIndex);
+
+ final String sourceSnapshot = "source-snapshot";
+ blockDataNode(repoName, dataNode);
+ final Client masterClient = internalCluster().masterClient();
+ final ActionFuture<CreateSnapshotResponse> sourceSnapshotFuture = masterClient.admin().cluster()
+ .prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute();
+ awaitNumberOfSnapshotsInProgress(1);
+ waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+ internalCluster().restartNode(dataNode);
+ assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
+
+ final SnapshotException sne = expectThrows(SnapshotException.class, () -> startClone(masterClient, repoName, sourceSnapshot,
+ "target-snapshot", testIndex).actionGet(TimeValue.timeValueSeconds(30L)));
+ assertThat(sne.getMessage(), containsString("Can't clone index [" + getRepositoryData(repoName).resolveIndexId(testIndex) +
+ "] because its snapshot was not successful."));
+ }
+
+ private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
+ String... indices) {
+ return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);
+ }
+
+ private ActionFuture<AcknowledgedResponse> startClone(String repoName, String sourceSnapshot, String targetSnapshot,
+ String... indices) {
+ return startClone(client(), repoName, sourceSnapshot, targetSnapshot, indices);
+ }
+
+ private static ActionFuture<AcknowledgedResponse> startClone(Client client, String repoName, String sourceSnapshot,
+ String targetSnapshot, String... indices) {
+ return client.admin().cluster().prepareCloneSnapshot(repoName, sourceSnapshot, targetSnapshot).setIndices(indices).execute();
+ }
+
+ private void blockMasterOnReadIndexMeta(String repoName) {
+ ((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
+ .setBlockOnReadIndexMeta();
+ }
+
+ private void blockMasterOnShardClone(String repoName) {
+ ((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName))
+ .setBlockOnWriteShardLevelMeta();
+ }
+
+ /**
+ * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
+ */
+ private static void assertAllSnapshotsSuccessful(RepositoryData repositoryData, int successfulSnapshotCount) {
+ final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
+ assertThat(snapshotIds, hasSize(successfulSnapshotCount));
+ for (SnapshotId snapshotId : snapshotIds) {
+ assertThat(repositoryData.getSnapshotState(snapshotId), is(SnapshotState.SUCCESS));
+ }
+ }
+
private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
String generation) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
index 8d5deea11a254..7f1951ddb7494 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
@@ -1293,11 +1293,6 @@ private ActionFuture startFullSnapshotFromMasterClient(S
.setWaitForCompletion(true).execute();
}
- // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
- // threads so that blocking some threads on one repository doesn't block other repositories from doing work
- private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
- .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
-
private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) {
createIndexWithContent(indexName, indexSettingsNoReplicas(1)
.put("index.routing.allocation.include._name", nodeInclude)
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java
index a8a7f18d0ecb2..55c14860a2bef 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java
@@ -72,6 +72,6 @@ protected ClusterBlockException checkBlock(CloneSnapshotRequest request, Cluster
@Override
protected void masterOperation(Task task, final CloneSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
- throw new UnsupportedOperationException("not implemented yet");
+ snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
}
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index 1efba60fe6570..8547c5f5e9a34 100644
--- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -35,8 +35,11 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.Collections;
@@ -96,11 +99,32 @@ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState,
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}
+ /**
+ * Creates the initial snapshot clone entry
+ *
+ * @param snapshot snapshot to clone into
+ * @param source snapshot to clone from
+ * @param indices indices to clone
+ * @param startTime start time
+ * @param repositoryStateId repository state id that this clone is based on
+ * @param version repository metadata version to write
+ * @return snapshot clone entry
+ */
+ public static Entry startClone(Snapshot snapshot, SnapshotId source, List<IndexId> indices, long startTime,
+ long repositoryStateId, Version version) {
+ return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
+ startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
+ ImmutableOpenMap.of());
+ }
+
public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean partial;
+ /**
+ * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
+ */
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<IndexId> indices;
private final List<String> dataStreams;
@@ -108,6 +132,19 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final long repositoryStateId;
// see #useShardGenerations
private final Version version;
+
+ /**
+ * Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
+ */
+ @Nullable
+ private final SnapshotId source;
+
+ /**
+ * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
+ * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
+ */
+ private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones;
+
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;
@@ -116,6 +153,15 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version) {
+ this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
+ userMetadata, version, null, ImmutableOpenMap.of());
+ }
+
+ private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
+ List<String> dataStreams, long startTime, long repositoryStateId,
+ ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
+ Version version, @Nullable SnapshotId source,
+ @Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@@ -124,11 +170,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.dataStreams = dataStreams;
this.startTime = startTime;
this.shards = shards;
- assert assertShardsConsistent(state, indices, shards);
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.version = version;
+ this.source = source;
+ if (source == null) {
+ assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
+ this.clones = ImmutableOpenMap.of();
+ } else {
+ this.clones = clones;
+ }
+ assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}
private Entry(StreamInput in) throws IOException {
@@ -144,21 +197,41 @@ private Entry(StreamInput in) throws IOException {
userMetadata = in.readMap();
version = Version.readVersion(in);
dataStreams = in.readStringList();
+ if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+ source = in.readOptionalWriteable(SnapshotId::new);
+ clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
+ } else {
+ source = null;
+ clones = ImmutableOpenMap.of();
+ }
}
- private static boolean assertShardsConsistent(State state, List<IndexId> indices,
- ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
+ private static boolean assertShardsConsistent(SnapshotId source, State state, List<IndexId> indices,
+ ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
+ ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set<String> indexNamesInShards = new HashSet<>();
- shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
- assert indexNames.equals(indexNamesInShards)
+ shards.iterator().forEachRemaining(s -> {
+ indexNamesInShards.add(s.key.getIndexName());
+ assert source == null || s.value.nodeId == null :
+ "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
+ });
+ assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
+ assert source != null || indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
- final boolean shardsCompleted = completed(shards.values());
- assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
- : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+ final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
+ // Check state consistency for normal snapshots and started clone operations
+ if (source == null || clones.isEmpty() == false) {
+ assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
+ : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+ }
+ if (source != null && state.completed()) {
+ assert hasFailures(clones) == false || state == State.FAILED
+ : "Failed shard clones in [" + clones + "] but state was [" + state + "]";
+ }
return true;
}
@@ -166,7 +239,17 @@ public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
- userMetadata, version);
+ userMetadata, version, source, clones);
+ }
+
+ public Entry withClones(ImmutableOpenMap updatedClones) {
+ if (updatedClones.equals(clones)) {
+ return this;
+ }
+ return new Entry(snapshot, includeGlobalState, partial,
+ completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
+ state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
+ updatedClones);
}
/**
@@ -203,7 +286,7 @@ public Entry abort() {
public Entry fail(ImmutableOpenMap shards, State state, String failure) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
- failure, userMetadata, version);
+ failure, userMetadata, version, source, clones);
}
/**
@@ -291,6 +374,19 @@ public Version version() {
return version;
}
+ @Nullable
+ public SnapshotId source() {
+ return source;
+ }
+
+ public boolean isClone() {
+ return source != null;
+ }
+
+ public ImmutableOpenMap clones() {
+ return clones;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -307,6 +403,8 @@ public boolean equals(Object o) {
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (version.equals(entry.version) == false) return false;
+ if (Objects.equals(source, ((Entry) o).source) == false) return false;
+ if (clones.equals(((Entry) o).clones) == false) return false;
return true;
}
@@ -322,6 +420,8 @@ public int hashCode() {
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + version.hashCode();
+ result = 31 * result + (source == null ? 0 : source.hashCode());
+ result = 31 * result + clones.hashCode();
return result;
}
@@ -383,6 +483,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(userMetadata);
Version.writeVersion(version, out);
out.writeStringCollection(dataStreams);
+ if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+ out.writeOptionalWriteable(source);
+ out.writeMap(clones);
+ }
}
@Override
@@ -406,6 +510,15 @@ public static boolean completed(ObjectContainer shards) {
return true;
}
+ private static boolean hasFailures(ImmutableOpenMap clones) {
+ for (ObjectCursor value : clones.values()) {
+ if (value.value.state().failed()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static class ShardSnapshotStatus implements Writeable {
/**
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 393e65794c762..e68873131baeb 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -915,7 +915,7 @@ private static void validateSnapshotRestorable(final String repository, final Sn
}
}
- private static boolean failed(SnapshotInfo snapshot, String index) {
+ public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index d0401d8299eed..4a958ecda070c 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -197,6 +197,10 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
final String localNodeId = clusterService.localNode().getId();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final State entryState = entry.state();
+ if (entry.isClone()) {
+ // This is a snapshot clone, it will be executed on the current master
+ continue;
+ }
if (entryState == State.STARTED) {
Map startedShards = null;
final Snapshot snapshot = entry.snapshot();
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index b7b1058dbf69e..c13ae47cdb45e 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -27,9 +27,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -43,6 +47,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
@@ -104,6 +109,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -120,6 +127,8 @@
*/
public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
+ public static final Version CLONE_SNAPSHOT_VERSION = Version.V_8_0_0;
+
public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
@@ -152,6 +161,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>());
+ // Set of currently initializing clone operations
+ private final Set initializingClones = Collections.synchronizedSet(new HashSet<>());
+
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
private final TransportService transportService;
@@ -228,29 +240,14 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
@Override
public ClusterState execute(ClusterState currentState) {
- // check if the snapshot name already exists in the repository
- if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
- throw new InvalidSnapshotNameException(
- repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
- }
+ ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List runningSnapshots = snapshots.entries();
- if (runningSnapshots.stream().anyMatch(s -> {
- final Snapshot running = s.snapshot();
- return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
- })) {
- throw new InvalidSnapshotNameException(
- repository.getMetadata().name(), snapshotName, "snapshot with the same name is already in-progress");
- }
+ ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
validate(repositoryName, snapshotName, currentState);
final SnapshotDeletionsInProgress deletionsInProgress =
currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
- final RepositoryCleanupInProgress repositoryCleanupInProgress =
- currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
- if (repositoryCleanupInProgress.hasCleanupInProgress()) {
- throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
- "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
- }
+ ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
@@ -261,9 +258,7 @@ public ClusterState execute(ClusterState currentState) {
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
final List indexIds = repositoryData.resolveNewIndices(
- indices, runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
- .flatMap(entry -> entry.indices().stream()).distinct()
- .collect(Collectors.toMap(IndexId::getName, Function.identity())));
+ indices, getInFlightIndexIds(runningSnapshots, repositoryName));
final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
ImmutableOpenMap shards = shards(snapshots, deletionsInProgress, currentState.metadata(),
currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName);
@@ -313,6 +308,276 @@ public TimeValue timeout() {
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}
+ private static void ensureSnapshotNameNotRunning(List runningSnapshots, String repositoryName,
+ String snapshotName) {
+ if (runningSnapshots.stream().anyMatch(s -> {
+ final Snapshot running = s.snapshot();
+ return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
+ })) {
+ throw new InvalidSnapshotNameException(repositoryName, snapshotName, "snapshot with the same name is already in-progress");
+ }
+ }
+
+ private static Map getInFlightIndexIds(List runningSnapshots, String repositoryName) {
+ return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
+ .flatMap(entry -> entry.indices().stream()).distinct()
+ .collect(Collectors.toMap(IndexId::getName, Function.identity()));
+ }
+
+ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) {
+ final String repositoryName = request.repository();
+ Repository repository = repositoriesService.repository(repositoryName);
+ if (repository.isReadOnly()) {
+ listener.onFailure(new RepositoryException(repositoryName, "cannot create snapshot in a readonly repository"));
+ return;
+ }
+ final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target());
+ validate(repositoryName, snapshotName);
+ final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
+ final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
+ initializingClones.add(snapshot);
+ repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {
+
+ private SnapshotsInProgress.Entry newEntry;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
+ ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
+ final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+ final List runningSnapshots = snapshots.entries();
+ ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
+ validate(repositoryName, snapshotName, currentState);
+
+ final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds()
+ .stream()
+ .filter(src -> src.getName().equals(request.source()))
+ .findAny()
+ .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source()));
+ final SnapshotDeletionsInProgress deletionsInProgress =
+ currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
+ if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) {
+ throw new ConcurrentSnapshotExecutionException(repositoryName, sourceSnapshotId.getName(),
+ "cannot clone from snapshot that is being deleted");
+ }
+ ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
+ final List indicesForSnapshot = new ArrayList<>();
+ for (IndexId indexId : repositoryData.getIndices().values()) {
+ if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) {
+ indicesForSnapshot.add(indexId.getName());
+ }
+ }
+ final List matchingIndices =
+ SnapshotUtils.filterIndices(indicesForSnapshot, request.indices(), request.indicesOptions());
+ if (matchingIndices.isEmpty()) {
+ throw new SnapshotException(new Snapshot(repositoryName, sourceSnapshotId),
+ "No indices in the source snapshot [" + sourceSnapshotId + "] matched requested pattern ["
+ + Strings.arrayToCommaDelimitedString(request.indices()) + "]");
+ }
+ newEntry = SnapshotsInProgress.startClone(
+ snapshot, sourceSnapshotId,
+ repositoryData.resolveIndices(matchingIndices),
+ threadPool.absoluteTimeInMillis(), repositoryData.getGenId(),
+ minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null));
+ final List newEntries = new ArrayList<>(runningSnapshots);
+ newEntries.add(newEntry);
+ return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+ SnapshotsInProgress.of(List.copyOf(newEntries))).build();
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ initializingClones.remove(snapshot);
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot", repositoryName, snapshotName), e);
+ listener.onFailure(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
+ logger.info("snapshot clone [{}] started", snapshot);
+ addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
+ startCloning(repository, newEntry);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ initializingClones.remove(snapshot);
+ return request.masterNodeTimeout();
+ }
+ }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
+ }
+
+ private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
+ final RepositoryCleanupInProgress repositoryCleanupInProgress =
+ currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
+ if (repositoryCleanupInProgress.hasCleanupInProgress()) {
+ throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
+ "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
+ }
+ }
+
+ private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) {
+ // check if the snapshot name already exists in the repository
+ if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+ throw new InvalidSnapshotNameException(
+ repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+ }
+ }
+
+ /**
+ * Determine the number of shards in each index of a clone operation and update the cluster state accordingly.
+ *
+ * @param repository repository to run operation on
+ * @param cloneEntry clone operation in the cluster state
+ */
+ private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
+ final List indices = cloneEntry.indices();
+ final SnapshotId sourceSnapshot = cloneEntry.source();
+ final Snapshot targetSnapshot = cloneEntry.snapshot();
+
+ final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+ // Exception handler for IO exceptions with loading index and repo metadata
+ final Consumer onFailure = e -> {
+ initializingClones.remove(targetSnapshot);
+ logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+ removeFailedSnapshotFromClusterState(targetSnapshot, e, null);
+ };
+
+ // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
+ // TODO: we could skip this step for snapshots with state SUCCESS
+ final StepListener snapshotInfoListener = new StepListener<>();
+ executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot)));
+
+ final StepListener>> allShardCountsListener = new StepListener<>();
+ final GroupedActionListener> shardCountListener =
+ new GroupedActionListener<>(allShardCountsListener, indices.size());
+ snapshotInfoListener.whenComplete(snapshotInfo -> {
+ for (IndexId indexId : indices) {
+ if (RestoreService.failed(snapshotInfo, indexId.getName())) {
+ throw new SnapshotException(targetSnapshot, "Can't clone index [" + indexId +
+ "] because its snapshot was not successful.");
+ }
+ }
+ // 2. step, load the number of shards we have in each index to be cloned from the index metadata.
+ repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
+ for (IndexId index : indices) {
+ executor.execute(ActionRunnable.supply(shardCountListener, () -> {
+ final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repositoryData, sourceSnapshot, index);
+ return Tuple.tuple(index, metadata.getNumberOfShards());
+ }));
+ }
+ }, onFailure));
+ }, onFailure);
+
+ // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry
+ allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() {
+
+ private SnapshotsInProgress.Entry updatedEntry;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final SnapshotsInProgress snapshotsInProgress =
+ currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+ final List updatedEntries = new ArrayList<>(snapshotsInProgress.entries());
+ boolean changed = false;
+ final String localNodeId = currentState.nodes().getLocalNodeId();
+ final String repoName = cloneEntry.repository();
+ final Map indexIds = getInFlightIndexIds(updatedEntries, repoName);
+ final ShardGenerations shardGenerations = repoData.shardGenerations();
+ for (int i = 0; i < updatedEntries.size(); i++) {
+ if (cloneEntry.equals(updatedEntries.get(i))) {
+ final ImmutableOpenMap.Builder clonesBuilder =
+ ImmutableOpenMap.builder();
+ // TODO: could be optimized by just dealing with repo shard id directly
+ final Set busyShardsInRepo =
+ busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata())
+ .stream()
+ .map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId()))
+ .collect(Collectors.toSet());
+ for (Tuple count : counts) {
+ for (int shardId = 0; shardId < count.v2(); shardId++) {
+ final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
+ if (busyShardsInRepo.contains(repoShardId)) {
+ clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
+ } else {
+ clonesBuilder.put(repoShardId,
+ new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId)));
+ }
+ }
+ }
+ updatedEntry = cloneEntry.withClones(clonesBuilder.build());
+ updatedEntries.set(i, updatedEntry);
+ changed = true;
+ break;
+ }
+ }
+ return updateWithSnapshots(currentState, changed ? SnapshotsInProgress.of(updatedEntries) : null, null);
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ initializingClones.remove(targetSnapshot);
+ logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+ failAllListenersOnMasterFailOver(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ initializingClones.remove(targetSnapshot);
+ if (updatedEntry != null) {
+ final Snapshot target = updatedEntry.snapshot();
+ final SnapshotId sourceSnapshot = updatedEntry.source();
+ for (ObjectObjectCursor indexClone : updatedEntry.clones()) {
+ final ShardSnapshotStatus shardStatusBefore = indexClone.value;
+ if (shardStatusBefore.state() != ShardState.INIT) {
+ continue;
+ }
+ final RepositoryShardId repoShardId = indexClone.key;
+ runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
+ }
+ } else {
+ // Extremely unlikely corner case of master failing over between starting the clone and
+ // starting shard clones.
+ logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
+ }
+ }
+ }, "start snapshot clone", onFailure), onFailure);
+ }
+
+ private final Set currentlyCloning = Collections.synchronizedSet(new HashSet<>());
+
+ private void runReadyClone(Snapshot target, SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore,
+ RepositoryShardId repoShardId, Repository repository) {
+ final SnapshotId targetSnapshot = target.getSnapshotId();
+ final String localNodeId = clusterService.localNode().getId();
+ if (currentlyCloning.add(repoShardId)) {
+ repository.cloneShardSnapshot(sourceSnapshot, targetSnapshot, repoShardId, shardStatusBefore.generation(), ActionListener.wrap(
+ generation -> innerUpdateSnapshotState(
+ new ShardSnapshotUpdate(target, repoShardId,
+ new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
+ ActionListener.runBefore(
+ ActionListener.wrap(
+ v -> logger.trace("Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId,
+ sourceSnapshot, targetSnapshot),
+ e -> {
+ logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
+ failAllListenersOnMasterFailOver(e);
+ }
+ ), () -> currentlyCloning.remove(repoShardId))
+ ), e -> innerUpdateSnapshotState(
+ new ShardSnapshotUpdate(target, repoShardId,
+ new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)),
+ ActionListener.runBefore(ActionListener.wrap(
+ v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId,
+ sourceSnapshot, targetSnapshot),
+ ex -> {
+ logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId);
+ failAllListenersOnMasterFailOver(ex);
+ }
+ ), () -> currentlyCloning.remove(repoShardId)))));
+ }
+ }
+
private void ensureBelowConcurrencyLimit(String repository, String name, SnapshotsInProgress snapshotsInProgress,
SnapshotDeletionsInProgress deletionsInProgress) {
final int inProgressOperations = snapshotsInProgress.entries().size() + deletionsInProgress.getEntries().size();
@@ -369,17 +634,24 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
- snapshot.shards().forEach(c -> {
- if (metadata.index(c.key.getIndex()) == null) {
- assert snapshot.partial() :
- "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
- return;
- }
- final IndexId indexId = indexLookup.get(c.key.getIndexName());
- if (indexId != null) {
- builder.put(indexId, c.key.id(), c.value.generation());
- }
- });
+ if (snapshot.isClone()) {
+ snapshot.clones().forEach(c -> {
+ final IndexId indexId = indexLookup.get(c.key.indexName());
+ builder.put(indexId, c.key.shardId(), c.value.generation());
+ });
+ } else {
+ snapshot.shards().forEach(c -> {
+ if (metadata.index(c.key.getIndex()) == null) {
+ assert snapshot.partial() :
+ "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+ return;
+ }
+ final IndexId indexId = indexLookup.get(c.key.getIndexName());
+ if (indexId != null) {
+ builder.put(indexId, c.key.id(), c.value.generation());
+ }
+ });
+ }
return builder.build();
}
@@ -594,17 +866,27 @@ public ClusterState execute(ClusterState currentState) {
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (statesToUpdate.contains(snapshot.state())) {
- ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
- routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
- if (shards != null) {
- final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
- changed = true;
- if (updatedSnapshot.state().completed()) {
- finishedSnapshots.add(updatedSnapshot);
+ // Currently initializing clone
+ if (snapshot.isClone() && snapshot.clones().isEmpty()) {
+ if (initializingClones.contains(snapshot.snapshot())) {
+ updatedSnapshotEntries.add(snapshot);
+ } else {
+ logger.debug("removing not yet started clone operation [{}]", snapshot);
+ changed = true;
}
- updatedSnapshotEntries.add(updatedSnapshot);
} else {
- updatedSnapshotEntries.add(snapshot);
+ ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
+ routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
+ if (shards != null) {
+ final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
+ changed = true;
+ if (updatedSnapshot.state().completed()) {
+ finishedSnapshots.add(updatedSnapshot);
+ }
+ updatedSnapshotEntries.add(updatedSnapshot);
+ } else {
+ updatedSnapshotEntries.add(snapshot);
+ }
}
} else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
// BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet
@@ -656,6 +938,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}
}
+ startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
// run newly ready deletes
for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
if (tryEnterRepoLoop(entry.repository())) {
@@ -791,6 +1074,11 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn
* @param entry snapshot
*/
private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
+ if (entry.isClone() && entry.state() == State.FAILED) {
+ logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
+ removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null);
+ return;
+ }
final boolean newFinalization = endingSnapshots.add(entry.snapshot());
final String repoName = entry.repository();
if (tryEnterRepoLoop(repoName)) {
@@ -865,10 +1153,24 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
entry.includeGlobalState(), entry.userMetadata());
- repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
+ final StepListener metadataListener = new StepListener<>();
+ final Repository repo = repositoriesService.repository(snapshot.getRepository());
+ if (entry.isClone()) {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(
+ ActionRunnable.supply(metadataListener, () -> {
+ final Metadata.Builder metaBuilder = Metadata.builder(repo.getSnapshotGlobalMetadata(entry.source()));
+ for (IndexId index : entry.indices()) {
+ metaBuilder.put(repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index), false);
+ }
+ return metaBuilder.build();
+ }));
+ } else {
+ metadataListener.onResponse(metadata);
+ }
+ metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
shardGenerations,
- repositoryData.getGenId(),
- metadataForSnapshot(entry, metadata),
+ repositoryData.getGenId(),
+ metadataForSnapshot(entry, meta),
snapshotInfo,
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
@@ -878,7 +1180,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
snapshotCompletionListeners.remove(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
runNextQueuedOperation(newRepoData, repository, true);
- }, e -> handleFinalizationFailure(e, entry, repositoryData)));
+ }, e -> handleFinalizationFailure(e, entry, repositoryData))),
+ e -> handleFinalizationFailure(e, entry, repositoryData));
} catch (Exception e) {
assert false : new AssertionError(e);
handleFinalizationFailure(e, entry, repositoryData);
@@ -1046,10 +1349,12 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn
* used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
* {@link SnapshotsInProgress.Entry} from the cluster state once it's done finalizing the snapshot.
*
- * @param snapshot snapshot that failed
- * @param failure exception that failed the snapshot
+ * @param snapshot snapshot that failed
+ * @param failure exception that failed the snapshot
+ * @param repositoryData repository data if the next finalization operation on the repository should be attempted or {@code null} if
+ * no further actions should be executed
*/
- private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, RepositoryData repositoryData) {
+ private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData) {
assert failure != null : "Failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@@ -1080,7 +1385,9 @@ public void onNoLongerMaster(String source) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
failSnapshotCompletionListeners(snapshot, failure);
- runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+ if (repositoryData != null) {
+ runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+ }
}
});
}
@@ -1159,6 +1466,17 @@ public ClusterState execute(ClusterState currentState) {
if (snapshotIds.isEmpty()) {
return currentState;
}
+ final Set activeCloneSources = snapshots.entries()
+ .stream()
+ .filter(SnapshotsInProgress.Entry::isClone)
+ .map(SnapshotsInProgress.Entry::source)
+ .collect(Collectors.toSet());
+ for (SnapshotId snapshotId : snapshotIds) {
+ if (activeCloneSources.contains(snapshotId)) {
+ throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotId),
+ "cannot delete snapshot while it is being cloned");
+ }
+ }
final SnapshotDeletionsInProgress deletionsInProgress =
currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
final RepositoryCleanupInProgress repositoryCleanupInProgress =
@@ -1749,7 +2067,7 @@ private static ImmutableOpenMap builder = ImmutableOpenMap.builder();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
- final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress);
+ final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata);
final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
.noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
for (IndexId index : indices) {
@@ -1809,16 +2127,32 @@ private static ImmutableOpenMap busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots) {
+ private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) {
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots == null ? List.of() : snapshots.entries();
final Set<ShardId> inProgressShards = new HashSet<>();
for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
if (runningSnapshot.repository().equals(repoName) == false) {
continue;
}
- for (ObjectObjectCursor shard : runningSnapshot.shards()) {
- if (shard.value.isActive()) {
- inProgressShards.add(shard.key);
+ if (runningSnapshot.isClone()) {
+ for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : runningSnapshot.clones()) {
+ final ShardSnapshotStatus shardState = clone.value;
+ if (shardState.isActive()) {
+ IndexMetadata indexMeta = metadata.index(clone.key.indexName());
+ final Index index;
+ if (indexMeta == null) {
+ index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE);
+ } else {
+ index = indexMeta.getIndex();
+ }
+ inProgressShards.add(new ShardId(index, clone.key.shardId()));
+ }
+ }
+ } else {
+ for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
+ if (shard.value.isActive()) {
+ inProgressShards.add(shard.key);
+ }
}
}
}
@@ -1911,97 +2245,282 @@ public boolean assertAllListenersResolved() {
return true;
}
- private static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
+ /**
+ * Executor that applies {@link ShardSnapshotUpdate}s to the current cluster state. The algorithm implemented below works as described
+ * below:
+ * Every shard snapshot or clone state update can result in multiple snapshots being updated. In order to determine whether or not a
+ * shard update has an effect we use an outer loop over all current executing snapshot operations that iterates over them in the order
+ * they were started in and an inner loop over the list of shard update tasks.
+ *
+ * If the inner loop finds that a shard update task applies to a given snapshot and either a shard-snapshot or shard-clone operation in
+ * it then it will update the state of the snapshot entry accordingly. If that update was a noop, then the task is removed from the
+ * iteration as it was already applied before and likely just arrived on the master node again due to retries upstream.
+ * If the update was not a noop, then it means that the shard it applied to is now available for another snapshot or clone operation
+ * to be re-assigned if there is another snapshot operation that is waiting for the shard to become available. We therefore record the
+ * fact that a task was executed by adding it to a collection of executed tasks. If a subsequent execution of the outer loop finds that
+ * a task in the executed tasks collection applied to a shard that the entry was waiting on to become available, then the shard snapshot operation
+ * will be started for that snapshot entry and the task removed from the collection of tasks that need to be applied to snapshot
+ * entries since it can not have any further effects.
+ *
+ * Package private to allow for tests.
+ */
+ static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
+ final String localNodeId = currentState.nodes().getLocalNodeId();
// Tasks to check for updates for running snapshots.
final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
+ // Outer loop over all snapshot entries in the order they were created in
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
+ // completed snapshots do not require any updates so we just add them to the new list and keep going
entries.add(entry);
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
+ ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clones = null;
+ Map<String, IndexId> indicesLookup = null;
+ // inner loop over all the shard updates that are potentially applicable to the current snapshot entry
for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
+ // the update applies to a different repository so it is irrelevant here
continue;
}
- final ShardId finishedShardId = updateSnapshotState.shardId;
- if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
- final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
- if (existing == null) {
- logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
- updateSnapshotState, entry);
- assert false : "This should never happen, data nodes should only send updates for expected shards";
- continue;
- }
- if (existing.state().completed()) {
- // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
- iterator.remove();
- continue;
- }
- logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
- finishedShardId, updateSnapshotState.updatedState.state());
- if (shards == null) {
- shards = ImmutableOpenMap.builder(entry.shards());
- }
- shards.put(finishedShardId, updateSnapshotState.updatedState);
- executedTasks.add(updateSnapshotState);
- changedCount++;
- } else if (executedTasks.contains(updateSnapshotState)) {
- // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
- final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
- if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
- continue;
+ if (updateSnapshotState.isClone()) {
+ // The update applied to a shard clone operation
+ final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId;
+ if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+ assert entry.isClone() : "Non-clone snapshot [" + entry + "] received update for clone ["
+ + updateSnapshotState + "]";
+ final ShardSnapshotStatus existing = entry.clones().get(finishedShardId);
+ if (existing == null) {
+ logger.warn("Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]",
+ updateSnapshotState, entry);
+ assert false : "This should never happen, master will not submit a state update for a non-existing clone";
+ continue;
+ }
+ if (existing.state().completed()) {
+ // No point in doing noop updates that might happen if a data node resends shard status after a disconnect.
+ iterator.remove();
+ continue;
+ }
+ logger.trace("[{}] Updating shard clone [{}] with status [{}]", updatedSnapshot,
+ finishedShardId, updateSnapshotState.updatedState.state());
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ changedCount++;
+ clones.put(finishedShardId, updateSnapshotState.updatedState);
+ executedTasks.add(updateSnapshotState);
+ } else if (executedTasks.contains(updateSnapshotState)) {
+ // the update was already executed on the clone operation it applied to, now we check if it may be possible to
+ // start a shard snapshot or clone operation on the current entry
+ if (entry.isClone()) {
+ // current entry is a clone operation
+ final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + finishedStatus.nodeId() +
+ "] but local node id is [" + localNodeId + "]";
+ clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+ iterator.remove();
+ } else {
+ // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id
+ final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName());
+ if (indexMeta == null) {
+ // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a
+ // normal snapshot
+ continue;
+ }
+ final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId());
+ final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ // A clone was updated, so we must use the correct data node id for the reassignment as an actual shard
+ // snapshot
+ final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState,
+ updateSnapshotState.updatedState.generation(), finishedRoutingShardId);
+ shards.put(finishedRoutingShardId, shardSnapshotStatus);
+ if (shardSnapshotStatus.isActive()) {
+ // only remove the update from the list of tasks that might hold a reusable shard if we actually
+ // started a snapshot and didn't just fail
+ iterator.remove();
+ }
+ }
}
- if (shards == null) {
- shards = ImmutableOpenMap.builder(entry.shards());
+ } else {
+ // a (non-clone) shard snapshot operation was updated
+ final ShardId finishedShardId = updateSnapshotState.shardId;
+ if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+ final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
+ if (existing == null) {
+ logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+ updateSnapshotState, entry);
+ assert false : "This should never happen, data nodes should only send updates for expected shards";
+ continue;
+ }
+ if (existing.state().completed()) {
+ // No point in doing noop updates that might happen if a data node resends shard status after a disconnect.
+ iterator.remove();
+ continue;
+ }
+ logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
+ finishedShardId, updateSnapshotState.updatedState.state());
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ shards.put(finishedShardId, updateSnapshotState.updatedState);
+ executedTasks.add(updateSnapshotState);
+ changedCount++;
+ } else if (executedTasks.contains(updateSnapshotState)) {
+ // We applied the update for a shard snapshot state to its snapshot entry, now check if we can update
+ // either a clone or a snapshot
+ if (entry.isClone()) {
+ // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires
+ // a lookup for the index ids
+ if (indicesLookup == null) {
+ indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
+ }
+ // shard snapshot was completed, we check if we can start a clone operation for the same repo shard
+ final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName());
+ // If the lookup finds the index id then at least the entry is concerned with the index id just updated
+ // so we check on a shard level
+ if (indexId != null) {
+ final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId());
+ final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation()));
+ iterator.remove();
+ startedCount++;
+ }
+ } else {
+ // shard snapshot was completed, we check if we can start another snapshot
+ final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+ iterator.remove();
+ }
}
- final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
- logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
- finishedStatus.nodeId(), finishedStatus.generation());
- shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
- iterator.remove();
- startedCount++;
}
}
- if (shards == null) {
- entries.add(entry);
+ final SnapshotsInProgress.Entry updatedEntry;
+ if (shards != null) {
+ assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + clones +
+ " as well as " + shards;
+ updatedEntry = entry.withShardStates(shards.build());
+ } else if (clones != null) {
+ updatedEntry = entry.withClones(clones.build());
} else {
- entries.add(entry.withShardStates(shards.build()));
+ updatedEntry = entry;
}
+ entries.add(updatedEntry);
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
- return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(
- ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
+ return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks)
+ .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+ SnapshotsInProgress.of(entries)).build());
}
return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState);
};
+ /**
+ * Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result
+ * of a snapshot clone completing.
+ *
+ * @param currentState current cluster state
+ * @param shardGeneration shard generation of the shard in the repository
+ * @param shardId shard id of the shard that just finished cloning
+ * @return shard snapshot status
+ */
+ private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) {
+ final ShardRouting primary = currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard();
+ final ShardSnapshotStatus shardSnapshotStatus;
+ if (primary == null || !primary.assignedToNode()) {
+ shardSnapshotStatus = new ShardSnapshotStatus(
+ null, ShardState.MISSING, "primary shard is not allocated", shardGeneration);
+ } else if (primary.relocating() || primary.initializing()) {
+ shardSnapshotStatus =
+ new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardGeneration);
+ } else if (primary.started() == false) {
+ shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
+ "primary shard hasn't been started yet", shardGeneration);
+ } else {
+ shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardGeneration);
+ }
+ return shardSnapshotStatus;
+ }
+
/**
* An update to the snapshot state of a shard.
+ *
+ * Package private for testing
*/
- private static final class ShardSnapshotUpdate {
+ static final class ShardSnapshotUpdate {
private final Snapshot snapshot;
private final ShardId shardId;
+ private final RepositoryShardId repoShardId;
+
private final ShardSnapshotStatus updatedState;
- private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
+ ShardSnapshotUpdate(Snapshot snapshot, RepositoryShardId repositoryShardId, ShardSnapshotStatus updatedState) {
+ this.snapshot = snapshot;
+ this.shardId = null;
+ this.updatedState = updatedState;
+ this.repoShardId = repositoryShardId;
+ }
+
+ ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = shardId;
this.updatedState = updatedState;
+ repoShardId = null;
+ }
+
+
+ public boolean isClone() {
+ return repoShardId != null;
}
@Override
@@ -2013,13 +2532,14 @@ public boolean equals(Object other) {
return false;
}
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
- return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
+ return this.snapshot.equals(that.snapshot) && Objects.equals(this.shardId, that.shardId)
+ && Objects.equals(this.repoShardId, that.repoShardId) && this.updatedState == that.updatedState;
}
@Override
public int hashCode() {
- return Objects.hash(snapshot, shardId, updatedState);
+ return Objects.hash(snapshot, shardId, updatedState, repoShardId);
}
}
@@ -2048,19 +2568,35 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
} finally {
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
// state update we check if its state is completed and end it if it is.
+ final SnapshotsInProgress snapshotsInProgress =
+ newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (endingSnapshots.contains(update.snapshot) == false) {
- final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata(), null);
}
}
+ startExecutableClones(snapshotsInProgress, update.snapshot.getRepository());
}
}
});
}
+ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
+ if (entry.isClone() && entry.state() == State.STARTED && (repoName == null || entry.repository().equals(repoName))) {
+ // this is a clone, see if new work is ready
+ for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : entry.clones()) {
+ if (clone.value.state() == ShardState.INIT) {
+ runReadyClone(entry.snapshot(), entry.source(), clone.value, clone.key,
+ repositoriesService.repository(entry.repository()));
+ }
+ }
+ }
+ }
+ }
+
private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java
index 5c08aadece0b7..e9fea864ff7c7 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java
@@ -99,6 +99,9 @@
* update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot.
*
*
+ * Cloning a Snapshot
+ * TODO: write up the steps in a snapshot clone properly
+ *
* Concurrent Snapshot Operations
*
* Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
new file mode 100644
index 0000000000000..8e15d6d5a1e48
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
+import static org.hamcrest.Matchers.is;
+
+public class SnapshotsServiceTests extends ESTestCase {
+
+ public void testNoopShardStateUpdates() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot snapshot = snapshot(repoName, "snapshot-1");
+ final SnapshotsInProgress.Entry snapshotNoShards = snapshotEntry(snapshot, Collections.emptyList(), ImmutableOpenMap.of());
+
+ final String indexName1 = "index-1";
+ final ShardId shardId1 = new ShardId(index(indexName1), 0);
+ {
+ final ClusterState state = stateWithSnapshots(snapshotNoShards);
+ final SnapshotsService.ShardSnapshotUpdate shardCompletion =
+ new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId1, successfulShardStatus(uuid()));
+ assertIsNoop(state, shardCompletion);
+ }
+ {
+ final ClusterState state = stateWithSnapshots(
+ snapshotEntry(
+ snapshot, Collections.singletonList(indexId(indexName1)), shardsMap(shardId1, initShardStatus(uuid()))));
+ final SnapshotsService.ShardSnapshotUpdate shardCompletion = new SnapshotsService.ShardSnapshotUpdate(
+ snapshot("other-repo", snapshot.getSnapshotId().getName()), shardId1, successfulShardStatus(uuid()));
+ assertIsNoop(state, shardCompletion);
+ }
+ }
+
+ public void testUpdateSnapshotToSuccess() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final ShardId shardId1 = new ShardId(index(indexName1), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard =
+ snapshotEntry(sn1, Collections.singletonList(indexId1), shardsMap(shardId1, initShardStatus(dataNodeId)));
+
+ assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateSnapshotMultipleShards() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final Index routingIndex1 = index(indexName1);
+ final ShardId shardId1 = new ShardId(routingIndex1, 0);
+ final ShardId shardId2 = new ShardId(routingIndex1, 1);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(sn1, Collections.singletonList(indexId1),
+ ImmutableOpenMap.builder(shardsMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+ assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateCloneToSuccess() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final SnapshotsInProgress.Entry cloneSingleShard =
+ cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), clonesMap(shardId1, initShardStatus(dataNodeId)));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateCloneMultipleShards() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final RepositoryShardId shardId2 = new RepositoryShardId(indexId1, 1);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry cloneMultipleShards = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ ImmutableOpenMap.builder(clonesMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+ assertThat(cloneMultipleShards.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneMultipleShards), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testCompletedCloneStartsSnapshot() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, shardInitStatus));
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName1);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+ final ShardId routingShardId1 = new ShardId(stateWithIndex.metadata().index(indexName1).getIndex(), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ // Case 1: shard that just finished cloning is unassigned -> shard snapshot should go to MISSING state
+ final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(stateWithIndex, cloneSingleShard, snapshotSingleShard);
+ final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, uuid());
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.MISSING));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ // Case 2: shard that just finished cloning is assigned correctly -> shard snapshot should go to INIT state
+ final ClusterState stateWithAssignedRoutingShard =
+ ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+ RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+ IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+ new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+ TestShardRouting.newShardRouting(
+ routingShardId1, dataNodeId, true, ShardRoutingState.STARTED)
+ ).build())).build()).build();
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithAssignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId1);
+ assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ // 3. case: shard that just finished cloning is currently initializing -> shard snapshot should go to WAITING state
+ final ClusterState stateWithInitializingRoutingShard =
+ ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+ RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+ IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+ new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+ TestShardRouting.newShardRouting(
+ routingShardId1, dataNodeId, true, ShardRoutingState.INITIALIZING)
+ ).build())).build()).build();
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithInitializingRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.WAITING));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+ }
+
+ // Completing the last outstanding shard of a plain snapshot must mark that snapshot SUCCESS and
+ // start the clone that was queued behind it in the same repository: the queued clone shard moves
+ // from UNASSIGNED_QUEUED to INIT, assigned to the cluster state's local node id.
+ public void testCompletedSnapshotStartsClone() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName);
+ final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId1, 0);
+ // Single-shard clone, queued behind the running snapshot created below.
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(repositoryShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+ final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+ // Plain snapshot whose only shard is already running on the data node.
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard, cloneSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ // Entry 0 is the plain snapshot whose last shard just completed (local was previously named
+ // "completedClone", which was misleading — it is not the clone entry).
+ final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0);
+ assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+ // Entry 1 is the previously queued clone, which must now have started.
+ final SnapshotsInProgress.Entry startedClone = snapshotsInProgress.entries().get(1);
+ assertThat(startedClone.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedClone.clones().get(repositoryShardId);
+ assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ // The clone shard is assigned to the cluster state's local node id.
+ assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId()));
+ // Re-applying the same shard completion must leave the state untouched.
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ // Completing the only shard of snapshot-1 must mark it SUCCESS and start the same shard in the
+ // queued snapshot-2 (UNASSIGNED_QUEUED -> INIT on the same data node); re-applying the same
+ // update afterwards must be a no-op.
+ public void testCompletedSnapshotStartsNextSnapshot() throws Exception {
+ final String repoName = "test-repo";
+ final String indexName = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName);
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot-1");
+ final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+ // First snapshot: its single shard is already running on the data node.
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+ // Second snapshot of the same shard, queued behind the first.
+ final Snapshot queuedSnapshot = snapshot(repoName, "test-snapshot-2");
+ final SnapshotsInProgress.Entry queuedSnapshotSingleShard = snapshotEntry(queuedSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+ final ClusterState updatedClusterState =
+ applyUpdates(stateWithSnapshots(snapshotSingleShard, queuedSnapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ // Entry 0: the first snapshot, now fully successful.
+ final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0);
+ assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+ // Entry 1: the queued snapshot, whose shard must now be running on the same node.
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId);
+ assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ // Completing a shard clone must mark its clone entry SUCCESS and start the clone that was queued
+ // behind it for the same repository shard (UNASSIGNED_QUEUED -> INIT); re-applying the same
+ // update afterwards must be a no-op.
+ public void testCompletedCloneStartsNextClone() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final String masterNodeId = uuid();
+ // First clone: its single repository shard is already running on the master node.
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, initShardStatus(masterNodeId)));
+
+ // Second clone of the same source, queued behind the first on the same repository shard.
+ final Snapshot queuedTargetSnapshot = snapshot(repoName, "test-snapshot");
+ final SnapshotsInProgress.Entry queuedClone = cloneEntry(queuedTargetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(
+ ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(masterNodeId)).build(), cloneSingleShard, queuedClone);
+ final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, masterNodeId);
+
+ final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ // Entry 1 is the second clone entry, not a snapshot (local was previously named
+ // "startedSnapshot", which was misleading — it is read via clones() below).
+ final SnapshotsInProgress.Entry startedClone = snapshotsInProgress.entries().get(1);
+ assertThat(startedClone.state(), is(SnapshotsInProgress.State.STARTED));
+ assertThat(startedClone.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ // Builds a DiscoveryNodes instance with only its local node id set (no node entries are added).
+ private static DiscoveryNodes discoveryNodes(String localNodeId) {
+ return DiscoveryNodes.builder().localNodeId(localNodeId).build();
+ }
+
+ // Single-entry map from a routing ShardId to its shard snapshot status, as held by snapshot
+ // entries. Type parameters restored here: the original line carried raw types, apparently from
+ // stripped angle brackets (cf. the raw generics elsewhere in this patch).
+ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsMap(
+ ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+ return ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().fPut(shardId, shardStatus).build();
+ }
+
+ // Single-entry map from a RepositoryShardId to its shard snapshot status, as held by clone
+ // entries. Type parameters restored here: the original line carried raw types, apparently from
+ // stripped angle brackets (cf. the raw generics elsewhere in this patch).
+ private static ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clonesMap(
+ RepositoryShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+ return ImmutableOpenMap.<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().fPut(shardId, shardStatus).build();
+ }
+
+ // Update that reports the given routing shard of the snapshot as successfully completed on nodeId.
+ private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, ShardId shardId, String nodeId) {
+ return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+ }
+
+ // Update that reports the given repository shard (clone) of the snapshot as successfully completed on nodeId.
+ private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, RepositoryShardId shardId, String nodeId) {
+ return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+ }
+
+ // Cluster state containing one single-shard, zero-replica index per given name; each index gets
+ // an empty IndexShardRoutingTable for shard 0, i.e. the shard has no shard routings assigned.
+ private static ClusterState stateWithUnassignedIndices(String... indexNames) {
+ final Metadata.Builder metaBuilder = Metadata.builder(Metadata.EMPTY_METADATA);
+ for (String index : indexNames) {
+ metaBuilder.put(IndexMetadata.builder(index)
+ .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id))
+ .numberOfShards(1).numberOfReplicas(0)
+ .build(), false);
+ }
+ // Build routing-table entries in a second pass so index UUIDs from the metadata are reused.
+ final RoutingTable.Builder routingTable = RoutingTable.builder();
+ for (String index : indexNames) {
+ final Index idx = metaBuilder.get(index).getIndex();
+ routingTable.add(IndexRoutingTable.builder(idx).addIndexShard(
+ new IndexShardRoutingTable.Builder(new ShardId(idx, 0)).build()));
+ }
+ return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metaBuilder).routingTable(routingTable.build()).build();
+ }
+
+ // Puts the given in-progress entries into the state as the SnapshotsInProgress custom and bumps
+ // the cluster-state version by one.
+ private static ClusterState stateWithSnapshots(ClusterState state, SnapshotsInProgress.Entry... entries) {
+ return ClusterState.builder(state).version(state.version() + 1L)
+ .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Arrays.asList(entries))).build();
+ }
+
+ // Wraps the given entries into a fresh cluster state whose local node id is a random uuid.
+ private static ClusterState stateWithSnapshots(SnapshotsInProgress.Entry... entries) {
+ return stateWithSnapshots(ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(uuid())).build(), entries);
+ }
+
+ // Asserts that applying the given shard completion is a no-op: the executor must return the very
+ // same ClusterState instance (assertSame, not just equals).
+ private static void assertIsNoop(ClusterState state, SnapshotsService.ShardSnapshotUpdate shardCompletion) throws Exception {
+ assertSame(applyUpdates(state, shardCompletion), state);
+ }
+
+ // Runs the given shard-state updates through the production SnapshotsService shard-state executor
+ // and returns the resulting cluster state.
+ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.ShardSnapshotUpdate... updates) throws Exception {
+ return SnapshotsService.SHARD_STATE_EXECUTOR.execute(state, Arrays.asList(updates)).resultingState;
+ }
+
+ // Started snapshot entry over the given indices and shard statuses; randomizes the
+ // includeGlobalState/partial flags. Type parameters restored here: the original signature carried
+ // raw List/ImmutableOpenMap, apparently from stripped angle brackets.
+ private static SnapshotsInProgress.Entry snapshotEntry(Snapshot snapshot, List<IndexId> indexIds,
+ ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards) {
+ return SnapshotsInProgress.startedEntry(snapshot, randomBoolean(), randomBoolean(), indexIds, Collections.emptyList(),
+ 1L, randomNonNegativeLong(), shards, Collections.emptyMap(), Version.CURRENT);
+ }
+
+ // Clone entry of the given source snapshot whose clone shards are the given map; the index ids
+ // are derived from the distinct indices of the map's repository shard ids. Type parameters
+ // restored here: the original carried raw ImmutableOpenMap/List, apparently from stripped
+ // angle brackets.
+ private static SnapshotsInProgress.Entry cloneEntry(
+ Snapshot snapshot, SnapshotId source, ImmutableOpenMap<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clones) {
+ final List<IndexId> indexIds = StreamSupport.stream(clones.keys().spliterator(), false)
+ .map(k -> k.value.index()).distinct().collect(Collectors.toList());
+ return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones);
+ }
+
+ // Shard status assigned to the given node with a fresh generation uuid; built via the two-arg
+ // constructor (name implies the default state is INIT — confirm in SnapshotsInProgress).
+ private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
+ return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, uuid());
+ }
+
+ // SUCCESS-state shard status on the given node with a fresh generation uuid.
+ private static SnapshotsInProgress.ShardSnapshotStatus successfulShardStatus(String nodeId) {
+ return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, SnapshotsInProgress.ShardState.SUCCESS, uuid());
+ }
+
+ // Snapshot in the given repository with the given name and a random snapshot uuid.
+ private static Snapshot snapshot(String repoName, String name) {
+ return new Snapshot(repoName, new SnapshotId(name, uuid()));
+ }
+
+ // Index handle with the given name and a random index uuid.
+ private static Index index(String name) {
+ return new Index(name, uuid());
+ }
+
+ // Repository-level index id with the given name and a random uuid.
+ private static IndexId indexId(String name) {
+ return new IndexId(name, uuid());
+ }
+
+ // Random base64 uuid drawn from the test framework's random() source, so values are reproducible
+ // under the test seed.
+ private static String uuid() {
+ return UUIDs.randomBase64UUID(random());
+ }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index cf09ca5cebfd6..cd24a0f279fe1 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -90,6 +90,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
+ // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
+ // threads so that blocking some threads on one repository doesn't block other repositories from doing work
+ protected static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
+ .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
+
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index b7b380b653625..5cb470fffe8fa 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -118,6 +118,10 @@ public long getFailureCount() {
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
private volatile boolean blockAndFailOnWriteSnapFile;
+ private volatile boolean blockOnWriteShardLevelMeta;
+
+ private volatile boolean blockOnReadIndexMeta;
+
/**
* Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
*/
@@ -189,6 +193,8 @@ public synchronized void unblock() {
blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false;
blockOnDeleteIndexN = false;
+ blockOnWriteShardLevelMeta = false;
+ blockOnReadIndexMeta = false;
this.notifyAll();
}
@@ -212,6 +218,14 @@ public void setBlockOnDeleteIndexFile() {
blockOnDeleteIndexN = true;
}
+ // Makes the repository block the next write of a shard-level snapshot metadata blob, i.e. a blob
+ // whose name starts with BlobStoreRepository.SNAPSHOT_PREFIX outside the repository base path
+ // (see writeBlob).
+ public void setBlockOnWriteShardLevelMeta() {
+ blockOnWriteShardLevelMeta = true;
+ }
+
+ // Makes the repository block the next read of an index metadata blob, i.e. a blob whose name
+ // starts with BlobStoreRepository.METADATA_PREFIX outside the repository base path (see readBlob).
+ public void setBlockOnReadIndexMeta() {
+ blockOnReadIndexMeta = true;
+ }
+
public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}
@@ -229,7 +243,7 @@ private synchronized boolean blockExecution() {
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
- blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) {
+ blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
blocked = true;
this.wait();
wasBlocked = true;
@@ -365,8 +379,12 @@ protected BlobContainer wrapChild(BlobContainer child) {
@Override
public InputStream readBlob(String name) throws IOException {
- maybeReadErrorAfterBlock(name);
- maybeIOExceptionOrBlock(name);
+ if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) {
+ blockExecutionAndMaybeWait(name);
+ } else {
+ maybeReadErrorAfterBlock(name);
+ maybeIOExceptionOrBlock(name);
+ }
return super.readBlob(name);
}
@@ -430,6 +448,10 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
maybeIOExceptionOrBlock(blobName);
+ if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+ && path().equals(basePath()) == false) {
+ blockExecutionAndMaybeWait(blobName);
+ }
super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
if (RandomizedContext.current().getRandom().nextBoolean()) {
// for network based repositories, the blob may have been written but we may still