From 5c3a4c13ddd0ce0eab4302a87fbdb172a6b57628 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 6 Oct 2020 01:52:25 +0200 Subject: [PATCH] Clone Snapshot API (#61839) (#63291) Snapshot clone API. Complete except for some TODOs around documentation (and adding HLRC support). backport of #61839, #63217, #63037 --- .../apis/clone-snapshot-api.asciidoc | 52 ++ .../reference/snapshot-restore/index.asciidoc | 1 + .../snapshot-restore/take-snapshot.asciidoc | 2 + .../snapshots/CloneSnapshotIT.java | 410 +++++++++- .../elasticsearch/action/ActionModule.java | 5 + .../snapshots/clone/CloneSnapshotAction.java | 33 + .../snapshots/clone/CloneSnapshotRequest.java | 142 ++++ .../clone/CloneSnapshotRequestBuilder.java | 65 ++ .../clone/TransportCloneSnapshotAction.java | 75 ++ .../client/ClusterAdminClient.java | 19 +- .../client/support/AbstractClient.java | 18 + .../cluster/SnapshotsInProgress.java | 147 +++- .../cluster/RestCloneSnapshotAction.java | 62 ++ .../snapshots/RestoreService.java | 2 +- .../snapshots/SnapshotShardsService.java | 4 + .../snapshots/SnapshotsService.java | 741 +++++++++++++++--- ...UpdateIndexShardSnapshotStatusRequest.java | 2 +- .../elasticsearch/snapshots/package-info.java | 35 + .../snapshots/SnapshotsServiceTests.java | 427 ++++++++++ .../snapshots/mockstore/MockRepository.java | 28 +- 20 files changed, 2142 insertions(+), 128 deletions(-) create mode 100644 docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequestBuilder.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java create mode 100644 server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java create mode 100644 server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java diff --git a/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc b/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc new file mode 100644 index 0000000000000..416466910aa60 --- /dev/null +++ b/docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc @@ -0,0 +1,52 @@ +[[clone-snapshot-api]] +=== Clone snapshot API +++++ +Clone snapshot +++++ + +Clones part or all of a snapshot into a new snapshot. + +[source,console] +---- +PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot +{ + "indices": "index_a,index_b" +} +---- +// TEST[skip:TODO] + +[[clone-snapshot-api-request]] +==== {api-request-title} + +`PUT /_snapshot///_clone/` + +[[clone-snapshot-api-desc]] +==== {api-description-title} + +The clone snapshot API allows creating a copy of all or part of an existing snapshot +within the same repository. + +[[clone-snapshot-api-params]] +==== {api-path-parms-title} + +``:: +(Required, string) +Name of the snapshot repository that both source and target snapshot belong to. + +[[clone-snapshot-api-query-params]] +==== {api-query-parms-title} + +`master_timeout`:: +(Optional, <>) Specifies the period of time to wait for +a connection to the master node. If no response is received before the timeout +expires, the request fails and returns an error. Defaults to `30s`. + +`timeout`:: +(Optional, <>) Specifies the period of time to wait for +a response. If no response is received before the timeout expires, the request +fails and returns an error. Defaults to `30s`. + +`indices`:: +(Required, string) +A comma-separated list of indices to include in the snapshot. +<> is supported. \ No newline at end of file diff --git a/docs/reference/snapshot-restore/index.asciidoc b/docs/reference/snapshot-restore/index.asciidoc index 8286c73276864..805b923c6d56d 100644 --- a/docs/reference/snapshot-restore/index.asciidoc +++ b/docs/reference/snapshot-restore/index.asciidoc @@ -107,6 +107,7 @@ understand the time requirements before proceeding. -- include::register-repository.asciidoc[] +include::apis/clone-snapshot-api.asciidoc[] include::take-snapshot.asciidoc[] include::restore-snapshot.asciidoc[] include::monitor-snapshot-restore.asciidoc[] diff --git a/docs/reference/snapshot-restore/take-snapshot.asciidoc b/docs/reference/snapshot-restore/take-snapshot.asciidoc index 4adbfce304d67..ddc2812dbe280 100644 --- a/docs/reference/snapshot-restore/take-snapshot.asciidoc +++ b/docs/reference/snapshot-restore/take-snapshot.asciidoc @@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/ PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E ----------------------------------- // TEST[continued] + +NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <>. \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index 82bec5a1f0dc3..c8377c8e76b0b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -18,6 +18,26 @@ */ package org.elasticsearch.snapshots; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; + import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.UUIDs; @@ -26,16 +46,11 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; -import org.elasticsearch.test.ESIntegTestCase; import java.nio.file.Path; -import java.util.List; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -53,7 +68,7 @@ public void testShardClone() throws Exception { if (useBwCFormat) { initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); // Re-create repo to clear repository data cache - assertAcked(client().admin().cluster().prepareDeleteRepository(repoName).get()); + assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get()); createRepository(repoName, "fs", repoPath); } @@ -107,6 +122,389 @@ public void testShardClone() throws Exception { assertEquals(newShardGeneration, newShardGeneration2); } + public void testCloneSnapshotIndex() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "repo-name"; + createRepository(repoName, "fs"); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + if (randomBoolean()) { + assertAcked(admin().indices().prepareDelete(indexName)); + } + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexName).get()); + + final List status = clusterAdmin().prepareSnapshotStatus(repoName) + .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots(); + assertThat(status, hasSize(2)); + final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName); + final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName); + assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount()); + assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize()); + } + + public void testClonePreventsSnapshotDelete() throws Exception { + final String masterName = internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "repo-name"; + createRepository(repoName, "mock"); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + + final String targetSnapshot = "target-snapshot"; + blockNodeOnAnyFiles(repoName, masterName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName); + waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + assertFalse(cloneFuture.isDone()); + + ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, + () -> startDeleteSnapshot(repoName, sourceSnapshot).actionGet()); + assertThat(ex.getMessage(), containsString("cannot delete snapshot while it is being cloned")); + + unblockNode(repoName, masterName); + assertAcked(cloneFuture.get()); + final List status = clusterAdmin().prepareSnapshotStatus(repoName) + .setSnapshots(sourceSnapshot, targetSnapshot).get().getSnapshots(); + assertThat(status, hasSize(2)); + final SnapshotIndexStatus status1 = status.get(0).getIndices().get(indexName); + final SnapshotIndexStatus status2 = status.get(1).getIndices().get(indexName); + assertEquals(status1.getStats().getTotalFileCount(), status2.getStats().getTotalFileCount()); + assertEquals(status1.getStats().getTotalSize(), status2.getStats().getTotalSize()); + } + + public void testConcurrentCloneAndSnapshot() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "repo-name"; + createRepository(repoName, "mock"); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + + final String targetSnapshot = "target-snapshot"; + final ActionFuture snapshot2Future = + startFullSnapshotBlockedOnDataNode("snapshot-2", repoName, dataNode); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexName); + awaitNumberOfSnapshotsInProgress(2); + unblockNode(repoName, dataNode); + assertAcked(cloneFuture.get()); + assertSuccessful(snapshot2Future); + } + + public void testLongRunningCloneAllowsConcurrentSnapshot() throws Exception { + // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations + final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String indexSlow = "index-slow"; + createIndexWithContent(indexSlow); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockMasterOnShardClone(repoName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + + final String indexFast = "index-fast"; + createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100)); + + assertSuccessful(clusterAdmin().prepareCreateSnapshot(repoName, "fast-snapshot") + .setIndices(indexFast).setWaitForCompletion(true).execute()); + + assertThat(cloneFuture.isDone(), is(false)); + unblockNode(repoName, masterNode); + + assertAcked(cloneFuture.get()); + } + + public void testLongRunningSnapshotAllowsConcurrentClone() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String indexSlow = "index-slow"; + createIndexWithContent(indexSlow); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String indexFast = "index-fast"; + createIndexWithRandomDocs(indexFast, randomIntBetween(20, 100)); + + blockDataNode(repoName, dataNode); + final ActionFuture snapshotFuture = clusterAdmin() + .prepareCreateSnapshot(repoName, "fast-snapshot").setIndices(indexFast).setWaitForCompletion(true).execute(); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + + final String targetSnapshot = "target-snapshot"; + assertAcked(startClone(repoName, sourceSnapshot, targetSnapshot, indexSlow).get()); + + assertThat(snapshotFuture.isDone(), is(false)); + unblockNode(repoName, dataNode); + + assertSuccessful(snapshotFuture); + } + + public void testDeletePreventsClone() throws Exception { + final String masterName = internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "repo-name"; + createRepository(repoName, "mock"); + + final String indexName = "index-1"; + createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + indexRandomDocs(indexName, randomIntBetween(20, 100)); + + final String targetSnapshot = "target-snapshot"; + blockNodeOnAnyFiles(repoName, masterName); + final ActionFuture deleteFuture = startDeleteSnapshot(repoName, sourceSnapshot); + waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); + assertFalse(deleteFuture.isDone()); + + ConcurrentSnapshotExecutionException ex = expectThrows(ConcurrentSnapshotExecutionException.class, () -> + startClone(repoName, sourceSnapshot, targetSnapshot, indexName).actionGet()); + assertThat(ex.getMessage(), containsString("cannot clone from snapshot that is being deleted")); + + unblockNode(repoName, masterName); + assertAcked(deleteFuture.get()); + } + + public void testBackToBackClonesForIndexNotInCluster() throws Exception { + // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations + final String masterNode = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String indexBlocked = "index-blocked"; + createIndexWithContent(indexBlocked); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + assertAcked(admin().indices().prepareDelete(indexBlocked).get()); + + final String targetSnapshot1 = "target-snapshot"; + blockMasterOnShardClone(repoName); + final ActionFuture cloneFuture1 = startClone(repoName, sourceSnapshot, targetSnapshot1, indexBlocked); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + assertThat(cloneFuture1.isDone(), is(false)); + + final int extraClones = randomIntBetween(1, 5); + final List> extraCloneFutures = new ArrayList<>(extraClones); + for (int i = 0; i < extraClones; i++) { + extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked)); + } + awaitNumberOfSnapshotsInProgress(1 + extraClones); + for (ActionFuture extraCloneFuture : extraCloneFutures) { + assertFalse(extraCloneFuture.isDone()); + } + + final int extraSnapshots = randomIntBetween(0, 5); + if (extraSnapshots > 0) { + createIndexWithContent(indexBlocked); + } + + final List> extraSnapshotFutures = new ArrayList<>(extraSnapshots); + for (int i = 0; i < extraSnapshots; i++) { + extraSnapshotFutures.add(startFullSnapshot(repoName, "extra-snap-" + i)); + } + + awaitNumberOfSnapshotsInProgress(1 + extraClones + extraSnapshots); + for (ActionFuture extraSnapshotFuture : extraSnapshotFutures) { + assertFalse(extraSnapshotFuture.isDone()); + } + + unblockNode(repoName, masterNode); + assertAcked(cloneFuture1.get()); + + for (ActionFuture extraCloneFuture : extraCloneFutures) { + assertAcked(extraCloneFuture.get()); + } + for (ActionFuture extraSnapshotFuture : extraSnapshotFutures) { + assertSuccessful(extraSnapshotFuture); + } + } + + public void testMasterFailoverDuringCloneStep1() throws Exception { + internalCluster().startMasterOnlyNodes(3); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + blockMasterOnReadIndexMeta(repoName); + final ActionFuture cloneFuture = + startCloneFromDataNode(repoName, sourceSnapshot, "target-snapshot", testIndex); + awaitNumberOfSnapshotsInProgress(1); + final String masterNode = internalCluster().getMasterName(); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + internalCluster().restartNode(masterNode); + boolean cloneSucceeded = false; + try { + cloneFuture.actionGet(TimeValue.timeValueSeconds(30L)); + cloneSucceeded = true; + } catch (SnapshotException sne) { + // ignored, most of the time we will throw here but we could randomly run into a situation where the data node retries the + // snapshot on disconnect slowly enough for it to work out + } + + awaitNoMoreRunningOperations(internalCluster().getMasterName()); + + assertAllSnapshotsSuccessful(getRepositoryData(repoName), cloneSucceeded ? 2 : 1); + } + + public void testFailsOnCloneMissingIndices() { + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "repo-name"; + final Path repoPath = randomRepoPath(); + if (randomBoolean()) { + createIndexWithContent("test-idx"); + } + createRepository(repoName, "fs", repoPath); + + final String snapshotName = "snapshot"; + createFullSnapshot(repoName, snapshotName); + expectThrows(IndexNotFoundException.class, + () -> startClone(repoName, snapshotName, "target-snapshot", "does-not-exist").actionGet()); + } + + public void testMasterFailoverDuringCloneStep2() throws Exception { + // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations + internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockMasterOnShardClone(repoName); + final ActionFuture cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex); + awaitNumberOfSnapshotsInProgress(1); + final String masterNode = internalCluster().getMasterName(); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + internalCluster().restartNode(masterNode); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + awaitNoMoreRunningOperations(internalCluster().getMasterName()); + + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 2); + } + + public void testExceptionDuringShardClone() throws Exception { + // large snapshot pool so blocked snapshot threads from cloning don't prevent concurrent snapshot finalizations + internalCluster().startMasterOnlyNodes(3, LARGE_SNAPSHOT_POOL_SETTINGS); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockMasterFromFinalizingSnapshotOnSnapFile(repoName); + final ActionFuture cloneFuture = startCloneFromDataNode(repoName, sourceSnapshot, targetSnapshot, testIndex); + awaitNumberOfSnapshotsInProgress(1); + final String masterNode = internalCluster().getMasterName(); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); + unblockNode(repoName, masterNode); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + awaitNoMoreRunningOperations(internalCluster().getMasterName()); + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1); + assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); + } + + public void testDoesNotStartOnBrokenSourceSnapshot() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + blockDataNode(repoName, dataNode); + final Client masterClient = internalCluster().masterClient(); + final ActionFuture sourceSnapshotFuture = masterClient.admin().cluster() + .prepareCreateSnapshot(repoName, sourceSnapshot).setWaitForCompletion(true).execute(); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + internalCluster().restartNode(dataNode); + assertThat(sourceSnapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL)); + + final SnapshotException sne = expectThrows(SnapshotException.class, () -> startClone(masterClient, repoName, sourceSnapshot, + "target-snapshot", testIndex).actionGet(TimeValue.timeValueSeconds(30L))); + assertThat(sne.getMessage(), containsString("Can't clone index [" + getRepositoryData(repoName).resolveIndexId(testIndex) + + "] because its snapshot was not successful.")); + } + + private ActionFuture startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot, + String... indices) { + return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices); + } + + private ActionFuture startClone(String repoName, String sourceSnapshot, String targetSnapshot, + String... indices) { + return startClone(client(), repoName, sourceSnapshot, targetSnapshot, indices); + } + + private static ActionFuture startClone(Client client, String repoName, String sourceSnapshot, + String targetSnapshot, String... indices) { + return client.admin().cluster().prepareCloneSnapshot(repoName, sourceSnapshot, targetSnapshot).setIndices(indices).execute(); + } + + private void blockMasterOnReadIndexMeta(String repoName) { + ((MockRepository)internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)) + .setBlockOnReadIndexMeta(); + } + + private void blockMasterOnShardClone(String repoName) { + ((MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)) + .setBlockOnWriteShardLevelMeta(); + } + + /** + * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful. + */ + private static void assertAllSnapshotsSuccessful(RepositoryData repositoryData, int successfulSnapshotCount) { + final Collection snapshotIds = repositoryData.getSnapshotIds(); + assertThat(snapshotIds, hasSize(successfulSnapshotCount)); + for (SnapshotId snapshotId : snapshotIds) { + assertThat(repositoryData.getSnapshotState(snapshotId), is(SnapshotState.SUCCESS)); + } + } + private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreRepository repository, RepositoryShardId repositoryShardId, String generation) { return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index e2f7a65d6143f..96ff879c68682 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -64,6 +64,8 @@ import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; @@ -267,6 +269,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction; import org.elasticsearch.rest.action.admin.cluster.RestCleanupRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction; +import org.elasticsearch.rest.action.admin.cluster.RestCloneSnapshotAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterHealthAction; @@ -522,6 +525,7 @@ public void reg actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class); actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class); actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); + actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); @@ -665,6 +669,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestCleanupRepositoryAction()); registerHandler.accept(new RestGetSnapshotsAction()); registerHandler.accept(new RestCreateSnapshotAction()); + registerHandler.accept(new RestCloneSnapshotAction()); registerHandler.accept(new RestRestoreSnapshotAction()); registerHandler.accept(new RestDeleteSnapshotAction()); registerHandler.accept(new RestSnapshotsStatusAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotAction.java new file mode 100644 index 0000000000000..e995469759bf0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotAction.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.clone; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +public final class CloneSnapshotAction extends ActionType { + + public static final CloneSnapshotAction INSTANCE = new CloneSnapshotAction(); + public static final String NAME = "cluster:admin/snapshot/clone"; + + private CloneSnapshotAction() { + super(NAME, AcknowledgedResponse::new); + } +} \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequest.java new file mode 100644 index 0000000000000..4d1eb0952e384 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.clone; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class CloneSnapshotRequest extends MasterNodeRequest implements IndicesRequest.Replaceable{ + + private final String repository; + + private final String source; + + private final String target; + + private String[] indices; + + private IndicesOptions indicesOptions = IndicesOptions.strictExpandHidden(); + + public CloneSnapshotRequest(StreamInput in) throws IOException { + super(in); + repository = in.readString(); + source = in.readString(); + target = in.readString(); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + } + + /** + * Creates a clone snapshot request for cloning the given source snapshot's indices into the given target snapshot on the given + * repository. + * + * @param repository repository that source snapshot belongs to and that the target snapshot will be created in + * @param source source snapshot name + * @param target target snapshot name + * @param indices indices to clone from source to target + */ + public CloneSnapshotRequest(String repository, String source, String target, String[] indices) { + this.repository = repository; + this.source = source; + this.target = target; + this.indices = indices; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(repository); + out.writeString(source); + out.writeString(target); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (source == null) { + validationException = addValidationError("source snapshot name is missing", null); + } + if (target == null) { + validationException = addValidationError("target snapshot name is missing", null); + } + if (repository == null) { + validationException = addValidationError("repository is missing", validationException); + } + if (indices == null) { + validationException = addValidationError("indices is null", validationException); + } else if (indices.length == 0) { + validationException = addValidationError("indices patterns are empty", validationException); + } else { + for (String index : indices) { + if (index == null) { + validationException = addValidationError("index is null", validationException); + break; + } + } + } + return validationException; + } + + @Override + public String[] indices() { + return this.indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + @Override + public CloneSnapshotRequest indices(String... indices) { + this.indices = indices; + return this; + } + + /** + * @see CloneSnapshotRequestBuilder#setIndicesOptions + */ + public CloneSnapshotRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public String repository() { + return this.repository; + } + + public String target() { + return this.target; + } + + public String source() { + return this.source; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequestBuilder.java new file mode 100644 index 0000000000000..5139633fc2d26 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/CloneSnapshotRequestBuilder.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.clone; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Strings; + +public class CloneSnapshotRequestBuilder extends MasterNodeOperationRequestBuilder { + + protected CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType action, + CloneSnapshotRequest request) { + super(client, action, request); + } + + public CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType action, + String repository, String source, String target) { + this(client, action, new CloneSnapshotRequest(repository, source, target, Strings.EMPTY_ARRAY)); + } + + /** + * Sets a list of indices that should be cloned from the source to the target snapshot + *

+ * The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will clone all indices with + * prefix "test" except index "test42". + * + * @return this builder + */ + public CloneSnapshotRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Specifies the indices options. Like what type of requested indices to ignore. For example indices that don't exist. + * + * @param indicesOptions the desired behaviour regarding indices options + * @return this request + */ + public CloneSnapshotRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { + request.indicesOptions(indicesOptions); + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java new file mode 100644 index 0000000000000..a696dcba607a0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.cluster.snapshots.clone; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for the clone snapshot operation. + */ +public final class TransportCloneSnapshotAction extends TransportMasterNodeAction { + + private final SnapshotsService snapshotsService; + + @Inject + public TransportCloneSnapshotAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(CloneSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CloneSnapshotRequest::new, + indexNameExpressionResolver); + this.snapshotsService = snapshotsService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener listener) { + snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true))); + } + + @Override + protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index cd9adabaf2759..4701c67987f9b 100644 --- a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -71,6 +71,8 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -506,7 +508,22 @@ public interface ClusterAdminClient extends ElasticsearchClient { CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, String name); /** - * Get snapshot. + * Clones a snapshot. + */ + CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target); + + /** + * Clones a snapshot. + */ + ActionFuture cloneSnapshot(CloneSnapshotRequest request); + + /** + * Clones a snapshot. + */ + void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener); + + /** + * Get snapshots. */ ActionFuture getSnapshots(GetSnapshotsRequest request); diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 879c60d065166..12fae90ab950a 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -94,6 +94,9 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder; @@ -959,6 +962,21 @@ public CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, Str return new CreateSnapshotRequestBuilder(this, CreateSnapshotAction.INSTANCE, repository, name); } + @Override + public CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target) { + return new CloneSnapshotRequestBuilder(this, CloneSnapshotAction.INSTANCE, repository, source, target); + } + + @Override + public ActionFuture cloneSnapshot(CloneSnapshotRequest request) { + return execute(CloneSnapshotAction.INSTANCE, request); + } + + @Override + public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) { + execute(CloneSnapshotAction.INSTANCE, request, listener); + } + @Override public ActionFuture getSnapshots(GetSnapshotsRequest request) { return execute(GetSnapshotsAction.INSTANCE, request); diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 535630196934e..b0e6604b2e4df 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -35,8 +35,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import java.io.IOException; @@ -101,11 +103,32 @@ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState, indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version); } + /** + * Creates the initial snapshot clone entry + * + * @param snapshot snapshot to clone into + * @param source snapshot to clone from + * @param indices indices to clone + * @param startTime start time + * @param repositoryStateId repository state id that this clone is based on + * @param version repository metadata version to write + * @return snapshot clone entry + */ + public static Entry startClone(Snapshot snapshot, SnapshotId source, List indices, long startTime, + long repositoryStateId, Version version) { + return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(), + startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source, + ImmutableOpenMap.of()); + } + public static class Entry implements Writeable, ToXContent, RepositoryOperation { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; private final boolean partial; + /** + * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. + */ private final ImmutableOpenMap shards; private final List indices; private final List dataStreams; @@ -113,6 +136,19 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation private final long repositoryStateId; // see #useShardGenerations private final Version version; + + /** + * Source snapshot if this is a clone operation or {@code null} if this is a snapshot. + */ + @Nullable + private final SnapshotId source; + + /** + * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry + * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries. + */ + private final ImmutableOpenMap clones; + @Nullable private final Map userMetadata; @Nullable private final String failure; @@ -121,6 +157,15 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta List dataStreams, long startTime, long repositoryStateId, ImmutableOpenMap shards, String failure, Map userMetadata, Version version) { + this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure, + userMetadata, version, null, ImmutableOpenMap.of()); + } + + private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + List dataStreams, long startTime, long repositoryStateId, + ImmutableOpenMap shards, String failure, Map userMetadata, + Version version, @Nullable SnapshotId source, + @Nullable ImmutableOpenMap clones) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -129,11 +174,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta this.dataStreams = dataStreams; this.startTime = startTime; this.shards = shards; - assert assertShardsConsistent(state, indices, shards); this.repositoryStateId = repositoryStateId; this.failure = failure; this.userMetadata = userMetadata; this.version = version; + this.source = source; + if (source == null) { + assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source"; + this.clones = ImmutableOpenMap.of(); + } else { + this.clones = clones; + } + assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } private Entry(StreamInput in) throws IOException { @@ -143,7 +195,7 @@ private Entry(StreamInput in) throws IOException { state = State.fromValue(in.readByte()); indices = in.readList(IndexId::new); startTime = in.readLong(); - shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new); + shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom); repositoryStateId = in.readLong(); failure = in.readOptionalString(); if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { @@ -166,21 +218,41 @@ private Entry(StreamInput in) throws IOException { } else { dataStreams = Collections.emptyList(); } + if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { + source = in.readOptionalWriteable(SnapshotId::new); + clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); + } else { + source = null; + clones = ImmutableOpenMap.of(); + } } - private static boolean assertShardsConsistent(State state, List indices, - ImmutableOpenMap shards) { + private static boolean assertShardsConsistent(SnapshotId source, State state, List indices, + ImmutableOpenMap shards, + ImmutableOpenMap clones) { if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { return true; } final Set indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet()); final Set indexNamesInShards = new HashSet<>(); - shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName())); - assert indexNames.equals(indexNamesInShards) + shards.iterator().forEachRemaining(s -> { + indexNamesInShards.add(s.key.getIndexName()); + assert source == null || s.value.nodeId == null : + "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]"; + }); + assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed"; + assert source != null || indexNames.equals(indexNamesInShards) : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]"; - final boolean shardsCompleted = completed(shards.values()); - assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false) - : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards; + final boolean shardsCompleted = completed(shards.values()) && completed(clones.values()); + // Check state consistency for normal snapshots and started clone operations + if (source == null || clones.isEmpty() == false) { + assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false) + : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards; + } + if (source != null && state.completed()) { + assert hasFailures(clones) == false || state == State.FAILED + : "Failed shard clones in [" + clones + "] but state was [" + state + "]"; + } return true; } @@ -201,7 +273,17 @@ public Entry withRepoGen(long newRepoGen) { assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen + "] must be higher than current generation [" + repositoryStateId + "]"; return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure, - userMetadata, version); + userMetadata, version, source, clones); + } + + public Entry withClones(ImmutableOpenMap updatedClones) { + if (updatedClones.equals(clones)) { + return this; + } + return new Entry(snapshot, includeGlobalState, partial, + completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) : + state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source, + updatedClones); } /** @@ -230,7 +312,7 @@ public Entry abort() { public Entry fail(ImmutableOpenMap shards, State state, String failure) { return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, - failure, userMetadata, version); + failure, userMetadata, version, source, clones); } /** @@ -318,6 +400,19 @@ public Version version() { return version; } + @Nullable + public SnapshotId source() { + return source; + } + + public boolean isClone() { + return source != null; + } + + public ImmutableOpenMap clones() { + return clones; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -334,6 +429,8 @@ public boolean equals(Object o) { if (state != entry.state) return false; if (repositoryStateId != entry.repositoryStateId) return false; if (version.equals(entry.version) == false) return false; + if (Objects.equals(source, ((Entry) o).source) == false) return false; + if (clones.equals(((Entry) o).clones) == false) return false; return true; } @@ -349,6 +446,8 @@ public int hashCode() { result = 31 * result + Long.hashCode(startTime); result = 31 * result + Long.hashCode(repositoryStateId); result = 31 * result + version.hashCode(); + result = 31 * result + (source == null ? 0 : source.hashCode()); + result = 31 * result + clones.hashCode(); return result; } @@ -418,6 +517,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { out.writeStringCollection(dataStreams); } + if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { + out.writeOptionalWriteable(source); + out.writeMap(clones); + } } @Override @@ -441,6 +544,15 @@ public static boolean completed(ObjectContainer shards) { return true; } + private static boolean hasFailures(ImmutableOpenMap clones) { + for (ObjectCursor value : clones.values()) { + if (value.value.state().failed()) { + return true; + } + } + return false; + } + public static class ShardSnapshotStatus implements Writeable { /** @@ -490,15 +602,20 @@ private boolean assertConsistent() { return true; } - public ShardSnapshotStatus(StreamInput in) throws IOException { - nodeId = in.readOptionalString(); - state = ShardState.fromValue(in.readByte()); + public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { + String nodeId = in.readOptionalString(); + final ShardState state = ShardState.fromValue(in.readByte()); + final String generation; if (SnapshotsService.useShardGenerations(in.getVersion())) { generation = in.readOptionalString(); } else { generation = null; } - reason = in.readOptionalString(); + final String reason = in.readOptionalString(); + if (state == ShardState.QUEUED) { + return UNASSIGNED_QUEUED; + } + return new ShardSnapshotStatus(nodeId, state, reason, generation); } public ShardState state() { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java new file mode 100644 index 0000000000000..f22f1d95aefbd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.cluster; + +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; + +/** + * Clones indices from one snapshot into another snapshot in the same repository + */ +public class RestCloneSnapshotAction extends BaseRestHandler { + + @Override + public List routes() { + return Collections.singletonList(new Route(PUT, "/_snapshot/{repository}/{snapshot}/_clone/{target_snapshot}")); + } + + @Override + public String getName() { + return "clone_snapshot_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final Map source = request.contentParser().map(); + final CloneSnapshotRequest cloneSnapshotRequest = new CloneSnapshotRequest( + request.param("repository"), request.param("snapshot"), request.param("target_snapshot"), + XContentMapValues.nodeStringArrayValue(source.getOrDefault("indices", Collections.emptyList()))); + cloneSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", cloneSnapshotRequest.masterNodeTimeout())); + cloneSnapshotRequest.indicesOptions(IndicesOptions.fromMap(source, cloneSnapshotRequest.indicesOptions())); + return channel -> client.admin().cluster().cloneSnapshot(cloneSnapshotRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index baf8e1fc457a8..5d8d41b1ad1fc 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -924,7 +924,7 @@ private static void validateSnapshotRestorable(final String repository, final Sn } } - private static boolean failed(SnapshotInfo snapshot, String index) { + public static boolean failed(SnapshotInfo snapshot, String index) { for (SnapshotShardFailure failure : snapshot.shardFailures()) { if (index.equals(failure.index())) { return true; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 2ed1de343885d..810527de15230 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -197,6 +197,10 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { final String localNodeId = clusterService.localNode().getId(); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { final State entryState = entry.state(); + if (entry.isClone()) { + // This is a snapshot clone, it will be executed on the current master + continue; + } if (entryState == State.STARTED) { Map startedShards = null; final Snapshot snapshot = entry.snapshot(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index eb02d069fbde5..6e667f0dab39c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -28,9 +28,12 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -44,6 +47,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; @@ -105,6 +109,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -128,6 +134,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public static final Version FULL_CONCURRENCY_VERSION = Version.V_7_9_0; + public static final Version CLONE_SNAPSHOT_VERSION = Version.V_7_10_0; + public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0; public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0; @@ -166,6 +174,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // Set of snapshots that are currently being ended by this node private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); + // Set of currently initializing clone operations + private final Set initializingClones = Collections.synchronizedSet(new HashSet<>()); + private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; private final TransportService transportService; @@ -363,20 +374,10 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList @Override public ClusterState execute(ClusterState currentState) { - // check if the snapshot name already exists in the repository - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException( - repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); - } + ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); - if (runningSnapshots.stream().anyMatch(s -> { - final Snapshot running = s.snapshot(); - return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName); - })) { - throw new InvalidSnapshotNameException( - repository.getMetadata().name(), snapshotName, "snapshot with the same name is already in-progress"); - } + ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); validate(repositoryName, snapshotName, currentState); final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION); final SnapshotDeletionsInProgress deletionsInProgress = @@ -397,6 +398,7 @@ public ClusterState execute(ClusterState currentState) { if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); } + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); @@ -407,9 +409,7 @@ public ClusterState execute(ClusterState currentState) { logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); final List indexIds = repositoryData.resolveNewIndices( - indices, runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName)) - .flatMap(entry -> entry.indices().stream()).distinct() - .collect(Collectors.toMap(IndexId::getName, Function.identity()))); + indices, getInFlightIndexIds(runningSnapshots, repositoryName)); final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); ImmutableOpenMap shards = shards(snapshots, deletionsInProgress, currentState.metadata(), currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName); @@ -459,6 +459,278 @@ public TimeValue timeout() { }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } + private static void ensureSnapshotNameNotRunning(List runningSnapshots, String repositoryName, + String snapshotName) { + if (runningSnapshots.stream().anyMatch(s -> { + final Snapshot running = s.snapshot(); + return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName); + })) { + throw new InvalidSnapshotNameException(repositoryName, snapshotName, "snapshot with the same name is already in-progress"); + } + } + + private static Map getInFlightIndexIds(List runningSnapshots, String repositoryName) { + return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName)) + .flatMap(entry -> entry.indices().stream()).distinct() + .collect(Collectors.toMap(IndexId::getName, Function.identity())); + } + + // TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache + // for repository metadata and loading it has predictable performance + public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) { + final String repositoryName = request.repository(); + Repository repository = repositoriesService.repository(repositoryName); + if (repository.isReadOnly()) { + listener.onFailure(new RepositoryException(repositoryName, "cannot create snapshot in a readonly repository")); + return; + } + final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target()); + validate(repositoryName, snapshotName); + final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); + final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + initializingClones.add(snapshot); + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newEntry; + + @Override + public ClusterState execute(ClusterState currentState) { + ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); + validate(repositoryName, snapshotName, currentState); + + final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds() + .stream() + .filter(src -> src.getName().equals(request.source())) + .findAny() + .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source())); + final SnapshotDeletionsInProgress deletionsInProgress = + currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY); + if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) { + throw new ConcurrentSnapshotExecutionException(repositoryName, sourceSnapshotId.getName(), + "cannot clone from snapshot that is being deleted"); + } + ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); + final List indicesForSnapshot = new ArrayList<>(); + for (IndexId indexId : repositoryData.getIndices().values()) { + if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) { + indicesForSnapshot.add(indexId.getName()); + } + } + final List matchingIndices = + SnapshotUtils.filterIndices(indicesForSnapshot, request.indices(), request.indicesOptions()); + if (matchingIndices.isEmpty()) { + throw new SnapshotException(new Snapshot(repositoryName, sourceSnapshotId), + "No indices in the source snapshot [" + sourceSnapshotId + "] matched requested pattern [" + + Strings.arrayToCommaDelimitedString(request.indices()) + "]"); + } + newEntry = SnapshotsInProgress.startClone( + snapshot, sourceSnapshotId, + repositoryData.resolveIndices(matchingIndices), + threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), + minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null)); + final List newEntries = new ArrayList<>(runningSnapshots); + newEntries.add(newEntry); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + SnapshotsInProgress.of(newEntries)).build(); + } + + @Override + public void onFailure(String source, Exception e) { + initializingClones.remove(snapshot); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot", repositoryName, snapshotName), e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + logger.info("snapshot clone [{}] started", snapshot); + addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)); + startCloning(repository, newEntry); + } + + @Override + public TimeValue timeout() { + initializingClones.remove(snapshot); + return request.masterNodeTimeout(); + } + }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); + } + + private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = + currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY); + if (repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + } + + private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) { + // check if the snapshot name already exists in the repository + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + } + + /** + * Determine the number of shards in each index of a clone operation and update the cluster state accordingly. + * + * @param repository repository to run operation on + * @param cloneEntry clone operation in the cluster state + */ + private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) { + final List indices = cloneEntry.indices(); + final SnapshotId sourceSnapshot = cloneEntry.source(); + final Snapshot targetSnapshot = cloneEntry.snapshot(); + + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + // Exception handler for IO exceptions with loading index and repo metadata + final Consumer onFailure = e -> { + initializingClones.remove(targetSnapshot); + logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e); + removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null); + }; + + // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone + // TODO: we could skip this step for snapshots with state SUCCESS + final StepListener snapshotInfoListener = new StepListener<>(); + executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot))); + + final StepListener>> allShardCountsListener = new StepListener<>(); + final GroupedActionListener> shardCountListener = + new GroupedActionListener<>(allShardCountsListener, indices.size()); + snapshotInfoListener.whenComplete(snapshotInfo -> { + for (IndexId indexId : indices) { + if (RestoreService.failed(snapshotInfo, indexId.getName())) { + throw new SnapshotException(targetSnapshot, "Can't clone index [" + indexId + + "] because its snapshot was not successful."); + } + } + // 2. step, load the number of shards we have in each index to be cloned from the index metadata. + repository.getRepositoryData(ActionListener.wrap(repositoryData -> { + for (IndexId index : indices) { + executor.execute(ActionRunnable.supply(shardCountListener, () -> { + final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repositoryData, sourceSnapshot, index); + return Tuple.tuple(index, metadata.getNumberOfShards()); + })); + } + }, onFailure)); + }, onFailure); + + // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry + allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry updatedEntry; + + @Override + public ClusterState execute(ClusterState currentState) { + final SnapshotsInProgress snapshotsInProgress = + currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List updatedEntries = new ArrayList<>(snapshotsInProgress.entries()); + boolean changed = false; + final String localNodeId = currentState.nodes().getLocalNodeId(); + final String repoName = cloneEntry.repository(); + final Map indexIds = getInFlightIndexIds(updatedEntries, repoName); + final ShardGenerations shardGenerations = repoData.shardGenerations(); + for (int i = 0; i < updatedEntries.size(); i++) { + if (cloneEntry.equals(updatedEntries.get(i))) { + final ImmutableOpenMap.Builder clonesBuilder = + ImmutableOpenMap.builder(); + // TODO: could be optimized by just dealing with repo shard id directly + final Set busyShardsInRepo = + busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata()) + .stream() + .map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId())) + .collect(Collectors.toSet()); + for (Tuple count : counts) { + for (int shardId = 0; shardId < count.v2(); shardId++) { + final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId); + if (busyShardsInRepo.contains(repoShardId)) { + clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED); + } else { + clonesBuilder.put(repoShardId, + new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId))); + } + } + } + updatedEntry = cloneEntry.withClones(clonesBuilder.build()); + updatedEntries.set(i, updatedEntry); + changed = true; + break; + } + } + return updateWithSnapshots(currentState, changed ? SnapshotsInProgress.of(updatedEntries) : null, null); + } + + @Override + public void onFailure(String source, Exception e) { + initializingClones.remove(targetSnapshot); + logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e); + failAllListenersOnMasterFailOver(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + initializingClones.remove(targetSnapshot); + if (updatedEntry != null) { + final Snapshot target = updatedEntry.snapshot(); + final SnapshotId sourceSnapshot = updatedEntry.source(); + for (ObjectObjectCursor indexClone : updatedEntry.clones()) { + final ShardSnapshotStatus shardStatusBefore = indexClone.value; + if (shardStatusBefore.state() != ShardState.INIT) { + continue; + } + final RepositoryShardId repoShardId = indexClone.key; + runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository); + } + } else { + // Extremely unlikely corner case of master failing over between between starting the clone and + // starting shard clones. + logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry); + } + } + }, "start snapshot clone", onFailure), onFailure); + } + + private final Set currentlyCloning = Collections.synchronizedSet(new HashSet<>()); + + private void runReadyClone(Snapshot target, SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore, + RepositoryShardId repoShardId, Repository repository) { + final SnapshotId targetSnapshot = target.getSnapshotId(); + final String localNodeId = clusterService.localNode().getId(); + if (currentlyCloning.add(repoShardId)) { + repository.cloneShardSnapshot(sourceSnapshot, targetSnapshot, repoShardId, shardStatusBefore.generation(), ActionListener.wrap( + generation -> innerUpdateSnapshotState( + new ShardSnapshotUpdate(target, repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)), + ActionListener.runBefore( + ActionListener.wrap( + v -> logger.trace("Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId, + sourceSnapshot, targetSnapshot), + e -> { + logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(e); + } + ), () -> currentlyCloning.remove(repoShardId)) + ), e -> innerUpdateSnapshotState( + new ShardSnapshotUpdate(target, repoShardId, + new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)), + ActionListener.runBefore(ActionListener.wrap( + v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId, + sourceSnapshot, targetSnapshot), + ex -> { + logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId); + failAllListenersOnMasterFailOver(ex); + } + ), () -> currentlyCloning.remove(repoShardId))))); + } + } + private void ensureBelowConcurrencyLimit(String repository, String name, SnapshotsInProgress snapshotsInProgress, SnapshotDeletionsInProgress deletionsInProgress) { final int inProgressOperations = snapshotsInProgress.entries().size() + deletionsInProgress.getEntries().size(); @@ -695,17 +967,24 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps ShardGenerations.Builder builder = ShardGenerations.builder(); final Map indexLookup = new HashMap<>(); snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx)); - snapshot.shards().forEach(c -> { - if (metadata.index(c.key.getIndex()) == null) { - assert snapshot.partial() : - "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial."; - return; - } - final IndexId indexId = indexLookup.get(c.key.getIndexName()); - if (indexId != null) { - builder.put(indexId, c.key.id(), c.value.generation()); - } - }); + if (snapshot.isClone()) { + snapshot.clones().forEach(c -> { + final IndexId indexId = indexLookup.get(c.key.indexName()); + builder.put(indexId, c.key.shardId(), c.value.generation()); + }); + } else { + snapshot.shards().forEach(c -> { + if (metadata.index(c.key.getIndex()) == null) { + assert snapshot.partial() : + "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial."; + return; + } + final IndexId indexId = indexLookup.get(c.key.getIndexName()); + if (indexId != null) { + builder.put(indexId, c.key.id(), c.value.generation()); + } + }); + } return builder.build(); } @@ -934,17 +1213,27 @@ public ClusterState execute(ClusterState currentState) { for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { if (statesToUpdate.contains(snapshot.state())) { - ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(), - routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>())); - if (shards != null) { - final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards); - changed = true; - if (updatedSnapshot.state().completed()) { - finishedSnapshots.add(updatedSnapshot); + // Currently initializing clone + if (snapshot.isClone() && snapshot.clones().isEmpty()) { + if (initializingClones.contains(snapshot.snapshot())) { + updatedSnapshotEntries.add(snapshot); + } else { + logger.debug("removing not yet start clone operation [{}]", snapshot); + changed = true; } - updatedSnapshotEntries.add(updatedSnapshot); } else { - updatedSnapshotEntries.add(snapshot); + ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(), + routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>())); + if (shards != null) { + final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards); + changed = true; + if (updatedSnapshot.state().completed()) { + finishedSnapshots.add(updatedSnapshot); + } + updatedSnapshotEntries.add(updatedSnapshot); + } else { + updatedSnapshotEntries.add(snapshot); + } } } else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) { // BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet @@ -996,6 +1285,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } } + startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null); // run newly ready deletes for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) { if (tryEnterRepoLoop(entry.repository())) { @@ -1165,6 +1455,11 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), repositoryData, null); return; } + if (entry.isClone() && entry.state() == State.FAILED) { + logger.debug("Removing failed snapshot clone [{}] from cluster state", entry); + removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null); + return; + } final String repoName = entry.repository(); if (tryEnterRepoLoop(repoName)) { if (repositoryData == null) { @@ -1238,10 +1533,24 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met entry.startTime(), failure, threadPool.absoluteTimeInMillis(), entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures, entry.includeGlobalState(), entry.userMetadata()); - repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot( + final StepListener metadataListener = new StepListener<>(); + final Repository repo = repositoriesService.repository(snapshot.getRepository()); + if (entry.isClone()) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute( + ActionRunnable.supply(metadataListener, () -> { + final Metadata.Builder metaBuilder = Metadata.builder(repo.getSnapshotGlobalMetadata(entry.source())); + for (IndexId index : entry.indices()) { + metaBuilder.put(repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index), false); + } + return metaBuilder.build(); + })); + } else { + metadataListener.onResponse(metadata); + } + metadataListener.whenComplete(meta -> repo.finalizeSnapshot( shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot(entry, metadata), + repositoryData.getGenId(), + metadataForSnapshot(entry, meta), snapshotInfo, entry.version(), state -> stateWithoutSnapshot(state, snapshot), @@ -1251,7 +1560,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met snapshotCompletionListeners.remove(snapshot), Tuple.tuple(newRepoData, snapshotInfo)); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); runNextQueuedOperation(newRepoData, repository, true); - }, e -> handleFinalizationFailure(e, entry, repositoryData))); + }, e -> handleFinalizationFailure(e, entry, repositoryData))), + e -> handleFinalizationFailure(e, entry, repositoryData)); } catch (Exception e) { assert false : new AssertionError(e); handleFinalizationFailure(e, entry, repositoryData); @@ -1428,8 +1738,6 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData, @Nullable CleanupAfterErrorListener listener) { assert failure != null : "Failure must be supplied"; - assert (listener == null || repositoryData == null) && (listener == null && repositoryData == null) == false : - "Either repository data or a listener but not both must be null but saw [" + listener + "] and [" + repositoryData + "]"; clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @Override @@ -1466,7 +1774,9 @@ public void onNoLongerMaster(String source) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { failSnapshotCompletionListeners(snapshot, failure); if (listener == null) { - runNextQueuedOperation(repositoryData, snapshot.getRepository(), true); + if (repositoryData != null) { + runNextQueuedOperation(repositoryData, snapshot.getRepository(), true); + } } else { listener.onFailure(null); } @@ -1519,11 +1829,11 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) { final String[] snapshotNames = request.snapshots(); - final String repositoryName = request.repository(); + final String repoName = request.repository(); logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]", - Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName)); + Strings.arrayToCommaDelimitedString(snapshotNames), repoName)); - final Repository repository = repositoriesService.repository(repositoryName); + final Repository repository = repositoriesService.repository(repoName); repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) { private Snapshot runningSnapshot; @@ -1543,12 +1853,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { + "]"); } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repositoryName); + final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName); final List snapshotIds = matchingSnapshotIds( snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()), repositoryData, - snapshotNames, repositoryName); + snapshotNames, repoName); if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData, Priority.NORMAL, listener); + deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener); return deleteFromRepoTask.execute(currentState); } assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries; @@ -1629,7 +1939,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS listener.onResponse(null); } else { clusterService.submitStateUpdateTask("delete snapshot", - createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData, Priority.IMMEDIATE, listener)); + createDeleteStateUpdate(outstandingDeletes, repoName, repositoryData, Priority.IMMEDIATE, listener)); } return; } @@ -1638,7 +1948,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS result -> { logger.debug("deleted snapshot completed - deleting files"); clusterService.submitStateUpdateTask("delete snapshot", - createDeleteStateUpdate(outstandingDeletes, repositoryName, result.v1(), Priority.IMMEDIATE, listener)); + createDeleteStateUpdate(outstandingDeletes, repoName, result.v1(), Priority.IMMEDIATE, listener)); }, e -> { if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) { @@ -1758,6 +2068,17 @@ public ClusterState execute(ClusterState currentState) { } } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final Set activeCloneSources = snapshots.entries() + .stream() + .filter(SnapshotsInProgress.Entry::isClone) + .map(SnapshotsInProgress.Entry::source) + .collect(Collectors.toSet()); + for (SnapshotId snapshotId : snapshotIds) { + if (activeCloneSources.contains(snapshotId)) { + throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotId), + "cannot delete snapshot while it is being cloned"); + } + } final SnapshotsInProgress updatedSnapshots; if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) { updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream() @@ -2266,7 +2587,7 @@ private static ImmutableOpenMap builder = ImmutableOpenMap.builder(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress); + final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata); final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream() .noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED); for (IndexId index : indices) { @@ -2330,16 +2651,32 @@ private static ImmutableOpenMap busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots) { + private static Set busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) { final List runningSnapshots = snapshots == null ? Collections.emptyList() : snapshots.entries(); final Set inProgressShards = new HashSet<>(); for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) { if (runningSnapshot.repository().equals(repoName) == false) { continue; } - for (ObjectObjectCursor shard : runningSnapshot.shards()) { - if (shard.value.isActive()) { - inProgressShards.add(shard.key); + if (runningSnapshot.isClone()) { + for (ObjectObjectCursor clone : runningSnapshot.clones()) { + final ShardSnapshotStatus shardState = clone.value; + if (shardState.isActive()) { + IndexMetadata indexMeta = metadata.index(clone.key.indexName()); + final Index index; + if (indexMeta == null) { + index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE); + } else { + index = indexMeta.getIndex(); + } + inProgressShards.add(new ShardId(index, clone.key.shardId())); + } + } + } else { + for (ObjectObjectCursor shard : runningSnapshot.shards()) { + if (shard.value.isActive()) { + inProgressShards.add(shard.key); + } } } } @@ -2431,97 +2768,282 @@ public boolean assertAllListenersResolved() { return true; } - private static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { + /** + * Executor that applies {@link ShardSnapshotUpdate}s to the current cluster state. The algorithm implemented below works as described + * below: + * Every shard snapshot or clone state update can result in multiple snapshots being updated. In order to determine whether or not a + * shard update has an effect we use an outer loop over all current executing snapshot operations that iterates over them in the order + * they were started in and an inner loop over the list of shard update tasks. + * + * If the inner loop finds that a shard update task applies to a given snapshot and either a shard-snapshot or shard-clone operation in + * it then it will update the state of the snapshot entry accordingly. If that update was a noop, then the task is removed from the + * iteration as it was already applied before and likely just arrived on the master node again due to retries upstream. + * If the update was not a noop, then it means that the shard it applied to is now available for another snapshot or clone operation + * to be re-assigned if there is another snapshot operation that is waiting for the shard to become available. We therefore record the + * fact that a task was executed by adding it to a collection of executed tasks. If a subsequent execution of the outer loop finds that + * a task in the executed tasks collection applied to a shard it was waiting for to become available, then the shard snapshot operation + * will be started for that snapshot entry and the task removed from the collection of tasks that need to be applied to snapshot + * entries since it can not have any further effects. + * + * Package private to allow for tests. + */ + static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { int changedCount = 0; int startedCount = 0; final List entries = new ArrayList<>(); + final String localNodeId = currentState.nodes().getLocalNodeId(); // Tasks to check for updates for running snapshots. final List unconsumedTasks = new ArrayList<>(tasks); // Tasks that were used to complete an existing in-progress shard snapshot final Set executedTasks = new HashSet<>(); + // Outer loop over all snapshot entries in the order they were created in for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { if (entry.state().completed()) { + // completed snapshots do not require any updates so we just add them to the new list and keep going entries.add(entry); continue; } ImmutableOpenMap.Builder shards = null; + ImmutableOpenMap.Builder clones = null; + Map indicesLookup = null; + // inner loop over all the shard updates that are potentially applicable to the current snapshot entry for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) { final ShardSnapshotUpdate updateSnapshotState = iterator.next(); final Snapshot updatedSnapshot = updateSnapshotState.snapshot; final String updatedRepository = updatedSnapshot.getRepository(); if (entry.repository().equals(updatedRepository) == false) { + // the update applies to a different repository so it is irrelevant here continue; } - final ShardId finishedShardId = updateSnapshotState.shardId; - if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { - final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); - if (existing == null) { - logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", - updateSnapshotState, entry); - assert false : "This should never happen, data nodes should only send updates for expected shards"; - continue; - } - if (existing.state().completed()) { - // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. - iterator.remove(); - continue; - } - logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot, - finishedShardId, updateSnapshotState.updatedState.state()); - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); - } - shards.put(finishedShardId, updateSnapshotState.updatedState); - executedTasks.add(updateSnapshotState); - changedCount++; - } else if (executedTasks.contains(updateSnapshotState)) { - // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot - final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { - continue; + if (updateSnapshotState.isClone()) { + // The update applied to a shard clone operation + final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId; + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + assert entry.isClone() : "Non-clone snapshot [" + entry + "] received update for clone [" + + updateSnapshotState + "]"; + final ShardSnapshotStatus existing = entry.clones().get(finishedShardId); + if (existing == null) { + logger.warn("Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, entry); + assert false : "This should never happen, master will not submit a state update for a non-existing clone"; + continue; + } + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. + iterator.remove(); + continue; + } + logger.trace("[{}] Updating shard clone [{}] with status [{}]", updatedSnapshot, + finishedShardId, updateSnapshotState.updatedState.state()); + if (clones == null) { + clones = ImmutableOpenMap.builder(entry.clones()); + } + changedCount++; + clones.put(finishedShardId, updateSnapshotState.updatedState); + executedTasks.add(updateSnapshotState); + } else if (executedTasks.contains(updateSnapshotState)) { + // the update was already executed on the clone operation it applied to, now we check if it may be possible to + // start a shard snapshot or clone operation on the current entry + if (entry.isClone()) { + // current entry is a clone operation + final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (clones == null) { + clones = ImmutableOpenMap.builder(entry.clones()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + finishedStatus.nodeId() + + "] but local node id is [" + localNodeId + "]"; + clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + iterator.remove(); + } else { + // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id + final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName()); + if (indexMeta == null) { + // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a + // normal snapshot + continue; + } + final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId()); + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + // A clone was updated, so we must use the correct data node id for the reassignment as actual shard + // snapshot + final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState, + updateSnapshotState.updatedState.generation(), finishedRoutingShardId); + shards.put(finishedRoutingShardId, shardSnapshotStatus); + if (shardSnapshotStatus.isActive()) { + // only remove the update from the list of tasks that might hold a reusable shard if we actually + // started a snapshot and didn't just fail + iterator.remove(); + } + } } - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); + } else { + // a (non-clone) shard snapshot operation was updated + final ShardId finishedShardId = updateSnapshotState.shardId; + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); + if (existing == null) { + logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, entry); + assert false : "This should never happen, data nodes should only send updates for expected shards"; + continue; + } + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. + iterator.remove(); + continue; + } + logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot, + finishedShardId, updateSnapshotState.updatedState.state()); + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + shards.put(finishedShardId, updateSnapshotState.updatedState); + executedTasks.add(updateSnapshotState); + changedCount++; + } else if (executedTasks.contains(updateSnapshotState)) { + // We applied the update for a shard snapshot state to its snapshot entry, now check if we can update + // either a clone or a snapshot + if (entry.isClone()) { + // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires + // a lookup for the index ids + if (indicesLookup == null) { + indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); + } + // shard snapshot was completed, we check if we can start a clone operation for the same repo shard + final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName()); + // If the lookup finds the index id then at least the entry is concerned with the index id just updated + // so we check on a shard level + if (indexId != null) { + final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId()); + final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (clones == null) { + clones = ImmutableOpenMap.builder(entry.clones()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation())); + iterator.remove(); + startedCount++; + } + } else { + // shard snapshot was completed, we check if we can start another snapshot + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + iterator.remove(); + } } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; - logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, - finishedStatus.nodeId(), finishedStatus.generation()); - shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); - iterator.remove(); - startedCount++; } } - if (shards == null) { - entries.add(entry); + final SnapshotsInProgress.Entry updatedEntry; + if (shards != null) { + assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + clones + + " as well as " + shards; + updatedEntry = entry.withShardStates(shards.build()); + } else if (clones != null) { + updatedEntry = entry.withClones(clones.build()); } else { - entries.add(entry.withShardStates(shards.build())); + updatedEntry = entry; } + entries.add(updatedEntry); } if (changedCount > 0) { logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + "[{}] shard snapshots", changedCount, startedCount); - return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build( - ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build()); + return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + SnapshotsInProgress.of(entries)).build()); } return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState); }; + /** + * Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result + * of a snapshot clone completing. + * + * @param currentState current cluster state + * @param shardGeneration shard generation of the shard in the repository + * @param shardId shard id of the shard that just finished cloning + * @return shard snapshot status + */ + private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) { + final ShardRouting primary = currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard(); + final ShardSnapshotStatus shardSnapshotStatus; + if (primary == null || !primary.assignedToNode()) { + shardSnapshotStatus = new ShardSnapshotStatus( + null, ShardState.MISSING, "primary shard is not allocated", shardGeneration); + } else if (primary.relocating() || primary.initializing()) { + shardSnapshotStatus = + new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardGeneration); + } else if (primary.started() == false) { + shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING, + "primary shard hasn't been started yet", shardGeneration); + } else { + shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardGeneration); + } + return shardSnapshotStatus; + } + /** * An update to the snapshot state of a shard. + * + * Package private for testing */ - private static final class ShardSnapshotUpdate { + static final class ShardSnapshotUpdate { private final Snapshot snapshot; private final ShardId shardId; + private final RepositoryShardId repoShardId; + private final ShardSnapshotStatus updatedState; - private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) { + ShardSnapshotUpdate(Snapshot snapshot, RepositoryShardId repositoryShardId, ShardSnapshotStatus updatedState) { + this.snapshot = snapshot; + this.shardId = null; + this.updatedState = updatedState; + this.repoShardId = repositoryShardId; + } + + ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) { this.snapshot = snapshot; this.shardId = shardId; this.updatedState = updatedState; + repoShardId = null; + } + + + public boolean isClone() { + return repoShardId != null; } @Override @@ -2533,13 +3055,14 @@ public boolean equals(Object other) { return false; } final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other; - return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState; + return this.snapshot.equals(that.snapshot) && Objects.equals(this.shardId, that.shardId) + && Objects.equals(this.repoShardId, that.repoShardId) && this.updatedState == that.updatedState; } @Override public int hashCode() { - return Objects.hash(snapshot, shardId, updatedState); + return Objects.hash(snapshot, shardId, updatedState, repoShardId); } } @@ -2568,19 +3091,35 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } finally { // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent // state update we check if its state is completed and end it if it is. + final SnapshotsInProgress snapshotsInProgress = + newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); if (endingSnapshots.contains(update.snapshot) == false) { - final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot); // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo if (updatedEntry != null && updatedEntry.state().completed()) { endSnapshot(updatedEntry, newState.metadata(), null); } } + startExecutableClones(snapshotsInProgress, update.snapshot.getRepository()); } } }); } + private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if (entry.isClone() && entry.state() == State.STARTED && (repoName == null || entry.repository().equals(repoName))) { + // this is a clone, see if new work is ready + for (ObjectObjectCursor clone : entry.clones()) { + if (clone.value.state() == ShardState.INIT) { + runReadyClone(entry.snapshot(), entry.source(), clone.value, clone.key, + repositoriesService.repository(entry.repository())); + } + } + } + } + } + private class UpdateSnapshotStatusAction extends TransportMasterNodeAction { UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java index 1d20d2d109b7d..cd90a2866bbc1 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java @@ -41,7 +41,7 @@ public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException super(in); snapshot = new Snapshot(in); shardId = new ShardId(in); - status = new SnapshotsInProgress.ShardSnapshotStatus(in); + status = SnapshotsInProgress.ShardSnapshotStatus.readFrom(in); } public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java index 5c08aadece0b7..eb50b9821da93 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java +++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java @@ -99,6 +99,41 @@ * update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot. * * + *

Cloning a Snapshot

+ * + *

Cloning part of a snapshot is a process executed entirely on the master node. On a high level, the process of cloning a snapshot is + * analogous to that of creating a snapshot from data in the cluster except that the source of data files is the snapshot repository + * instead of the data nodes. It begins with cloning all shards and then finalizes the cloned snapshot the same way a normal snapshot would + * be finalized. Concretely, it is executed as follows:

+ * + *
    + *
  1. First, {@link org.elasticsearch.snapshots.SnapshotsService#cloneSnapshot} is invoked which will place a placeholder entry into + * {@code SnapshotsInProgress} that does not yet contain any shard clone assignments. Note that unlike in the case of snapshot + * creation, the shard level clone tasks in {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#clones} are not created in the + * initial cluster state update as is done for shard snapshot assignments in + * {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#shards}. This is due to the fact that shard snapshot assignments are + * computed purely from information in the current cluster state while shard clone assignments require information to be read from the + * repository, which is too slow of a process to be done inside a cluster state update. Loading this information ahead of creating a + * task in the cluster state, runs the risk of race conditions where the source snapshot is being deleted before the clone task is + * enqueued in the cluster state.
  2. + *
  3. Once a placeholder task for the clone operation is put into the cluster state, we must determine the number of shards in each + * index that is to be cloned as well as ensure the health of the index snapshots in the source snapshot. In order to determine the + * shard count for each index that is to be cloned, we load the index metadata for each such index using the repository's + * {@link org.elasticsearch.repositories.Repository#getSnapshotIndexMetaData} method. In order to ensure the health of the source index + * snapshots, we load the {@link org.elasticsearch.snapshots.SnapshotInfo} for the source snapshot and check for shard snapshot + * failures of the relevant indices.
  4. + *
  5. Once all shard counts are known and the health of all source indices data has been verified, we populate the + * {@code SnapshotsInProgress.Entry#clones} map for the clone operation with the the relevant shard clone tasks.
  6. + *
  7. After the clone tasks have been added to the {@code SnapshotsInProgress.Entry}, master executes them on its snapshot thread-pool + * by invoking {@link org.elasticsearch.repositories.Repository#cloneShardSnapshot} for each shard that is to be cloned. Each completed + * shard snapshot triggers a call to the {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_STATE_EXECUTOR} which updates the + * clone's {@code SnapshotsInProgress.Entry} to mark the shard clone operation completed.
  8. + *
  9. Once all the entries in {@code SnapshotsInProgress.Entry#clones} have completed, the clone is finalized just like any other + * snapshot through {@link org.elasticsearch.snapshots.SnapshotsService#endSnapshot}. The only difference being that the metadata that + * is written out for indices and the global metadata are read from the source snapshot in the repository instead of the cluster state. + *
  10. + *
+ * *

Concurrent Snapshot Operations

* * Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java new file mode 100644 index 0000000000000..8e15d6d5a1e48 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -0,0 +1,427 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryShardId; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; +import static org.hamcrest.Matchers.is; + +public class SnapshotsServiceTests extends ESTestCase { + + public void testNoopShardStateUpdates() throws Exception { + final String repoName = "test-repo"; + final Snapshot snapshot = snapshot(repoName, "snapshot-1"); + final SnapshotsInProgress.Entry snapshotNoShards = snapshotEntry(snapshot, Collections.emptyList(), ImmutableOpenMap.of()); + + final String indexName1 = "index-1"; + final ShardId shardId1 = new ShardId(index(indexName1), 0); + { + final ClusterState state = stateWithSnapshots(snapshotNoShards); + final SnapshotsService.ShardSnapshotUpdate shardCompletion = + new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId1, successfulShardStatus(uuid())); + assertIsNoop(state, shardCompletion); + } + { + final ClusterState state = stateWithSnapshots( + snapshotEntry( + snapshot, Collections.singletonList(indexId(indexName1)), shardsMap(shardId1, initShardStatus(uuid())))); + final SnapshotsService.ShardSnapshotUpdate shardCompletion = new SnapshotsService.ShardSnapshotUpdate( + snapshot("other-repo", snapshot.getSnapshotId().getName()), shardId1, successfulShardStatus(uuid())); + assertIsNoop(state, shardCompletion); + } + } + + public void testUpdateSnapshotToSuccess() throws Exception { + final String repoName = "test-repo"; + final Snapshot sn1 = snapshot(repoName, "snapshot-1"); + final String indexName1 = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName1); + final ShardId shardId1 = new ShardId(index(indexName1), 0); + final SnapshotsInProgress.Entry snapshotSingleShard = + snapshotEntry(sn1, Collections.singletonList(indexId1), shardsMap(shardId1, initShardStatus(dataNodeId))); + + assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId); + final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0); + assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS)); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testUpdateSnapshotMultipleShards() throws Exception { + final String repoName = "test-repo"; + final Snapshot sn1 = snapshot(repoName, "snapshot-1"); + final String indexName1 = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName1); + final Index routingIndex1 = index(indexName1); + final ShardId shardId1 = new ShardId(routingIndex1, 0); + final ShardId shardId2 = new ShardId(routingIndex1, 1); + final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId); + final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(sn1, Collections.singletonList(indexId1), + ImmutableOpenMap.builder(shardsMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build()); + + assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId); + final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0); + assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED)); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testUpdateCloneToSuccess() throws Exception { + final String repoName = "test-repo"; + final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot"); + final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot"); + final String indexName1 = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName1); + final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0); + final SnapshotsInProgress.Entry cloneSingleShard = + cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), clonesMap(shardId1, initShardStatus(dataNodeId))); + + assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId); + final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneSingleShard), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0); + assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS)); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testUpdateCloneMultipleShards() throws Exception { + final String repoName = "test-repo"; + final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot"); + final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot"); + final String indexName1 = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName1); + final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0); + final RepositoryShardId shardId2 = new RepositoryShardId(indexId1, 1); + final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId); + final SnapshotsInProgress.Entry cloneMultipleShards = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), + ImmutableOpenMap.builder(clonesMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build()); + + assertThat(cloneMultipleShards.state(), is(SnapshotsInProgress.State.STARTED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId); + final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneMultipleShards), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0); + assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED)); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testCompletedCloneStartsSnapshot() throws Exception { + final String repoName = "test-repo"; + final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot"); + final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot"); + final String indexName1 = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName1); + final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0); + final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId); + final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), + clonesMap(shardId1, shardInitStatus)); + + final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName1); + final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot"); + final ShardId routingShardId1 = new ShardId(stateWithIndex.metadata().index(indexName1).getIndex(), 0); + final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1), + shardsMap(routingShardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)); + + assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + // 1. case: shard that just finished cloning is unassigned -> shard snapshot should go to MISSING state + final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(stateWithIndex, cloneSingleShard, snapshotSingleShard); + final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, uuid()); + { + final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0); + assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS)); + assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.MISSING)); + assertIsNoop(updatedClusterState, completeShardClone); + } + + // 2. case: shard that just finished cloning is assigned correctly -> shard snapshot should go to INIT state + final ClusterState stateWithAssignedRoutingShard = + ClusterState.builder(stateWithUnassignedRoutingShard).routingTable( + RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add( + IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard( + new IndexShardRoutingTable.Builder(routingShardId1).addShard( + TestShardRouting.newShardRouting( + routingShardId1, dataNodeId, true, ShardRoutingState.STARTED) + ).build())).build()).build(); + { + final ClusterState updatedClusterState = applyUpdates(stateWithAssignedRoutingShard, completeShardClone); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0); + assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); + final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId1); + assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT)); + assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId)); + assertIsNoop(updatedClusterState, completeShardClone); + } + + // 3. case: shard that just finished cloning is currently initializing -> shard snapshot should go to WAITING state + final ClusterState stateWithInitializingRoutingShard = + ClusterState.builder(stateWithUnassignedRoutingShard).routingTable( + RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add( + IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard( + new IndexShardRoutingTable.Builder(routingShardId1).addShard( + TestShardRouting.newShardRouting( + routingShardId1, dataNodeId, true, ShardRoutingState.INITIALIZING) + ).build())).build()).build(); + { + final ClusterState updatedClusterState = applyUpdates(stateWithInitializingRoutingShard, completeShardClone); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0); + assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); + assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.WAITING)); + assertIsNoop(updatedClusterState, completeShardClone); + } + } + + public void testCompletedSnapshotStartsClone() throws Exception { + final String repoName = "test-repo"; + final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot"); + final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot"); + final String indexName = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName); + final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId1, 0); + final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), + clonesMap(repositoryShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)); + + final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName); + final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot"); + final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0); + final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1), + shardsMap(routingShardId, initShardStatus(dataNodeId))); + + assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId); + + final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard, cloneSingleShard), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0); + assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); + final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.clones().get(repositoryShardId); + assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT)); + assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId())); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testCompletedSnapshotStartsNextSnapshot() throws Exception { + final String repoName = "test-repo"; + final String indexName = "index-1"; + final String dataNodeId = uuid(); + final IndexId indexId1 = indexId(indexName); + + final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName); + final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot-1"); + final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0); + final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1), + shardsMap(routingShardId, initShardStatus(dataNodeId))); + + final Snapshot queuedSnapshot = snapshot(repoName, "test-snapshot-2"); + final SnapshotsInProgress.Entry queuedSnapshotSingleShard = snapshotEntry(queuedSnapshot, Collections.singletonList(indexId1), + shardsMap(routingShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)); + + final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId); + + final ClusterState updatedClusterState = + applyUpdates(stateWithSnapshots(snapshotSingleShard, queuedSnapshotSingleShard), completeShard); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0); + assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); + final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId); + assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT)); + assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId)); + assertIsNoop(updatedClusterState, completeShard); + } + + public void testCompletedCloneStartsNextClone() throws Exception { + final String repoName = "test-repo"; + final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot"); + final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot"); + final String indexName1 = "index-1"; + final IndexId indexId1 = indexId(indexName1); + final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0); + final String masterNodeId = uuid(); + final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), + clonesMap(shardId1, initShardStatus(masterNodeId))); + + final Snapshot queuedTargetSnapshot = snapshot(repoName, "test-snapshot"); + final SnapshotsInProgress.Entry queuedClone = cloneEntry(queuedTargetSnapshot, sourceSnapshot.getSnapshotId(), + clonesMap(shardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED)); + + assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED)); + + final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots( + ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(masterNodeId)).build(), cloneSingleShard, queuedClone); + final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, masterNodeId); + + final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone); + final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0); + assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); + final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); + assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); + assertThat(startedSnapshot.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT)); + assertIsNoop(updatedClusterState, completeShardClone); + } + + private static DiscoveryNodes discoveryNodes(String localNodeId) { + return DiscoveryNodes.builder().localNodeId(localNodeId).build(); + } + + private static ImmutableOpenMap shardsMap( + ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) { + return ImmutableOpenMap.builder().fPut(shardId, shardStatus).build(); + } + + private static ImmutableOpenMap clonesMap( + RepositoryShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) { + return ImmutableOpenMap.builder().fPut(shardId, shardStatus).build(); + } + + private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, ShardId shardId, String nodeId) { + return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId)); + } + + private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, RepositoryShardId shardId, String nodeId) { + return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId)); + } + + private static ClusterState stateWithUnassignedIndices(String... indexNames) { + final Metadata.Builder metaBuilder = Metadata.builder(Metadata.EMPTY_METADATA); + for (String index : indexNames) { + metaBuilder.put(IndexMetadata.builder(index) + .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id)) + .numberOfShards(1).numberOfReplicas(0) + .build(), false); + } + final RoutingTable.Builder routingTable = RoutingTable.builder(); + for (String index : indexNames) { + final Index idx = metaBuilder.get(index).getIndex(); + routingTable.add(IndexRoutingTable.builder(idx).addIndexShard( + new IndexShardRoutingTable.Builder(new ShardId(idx, 0)).build())); + } + return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metaBuilder).routingTable(routingTable.build()).build(); + } + + private static ClusterState stateWithSnapshots(ClusterState state, SnapshotsInProgress.Entry... entries) { + return ClusterState.builder(state).version(state.version() + 1L) + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Arrays.asList(entries))).build(); + } + + private static ClusterState stateWithSnapshots(SnapshotsInProgress.Entry... entries) { + return stateWithSnapshots(ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(uuid())).build(), entries); + } + + private static void assertIsNoop(ClusterState state, SnapshotsService.ShardSnapshotUpdate shardCompletion) throws Exception { + assertSame(applyUpdates(state, shardCompletion), state); + } + + private static ClusterState applyUpdates(ClusterState state, SnapshotsService.ShardSnapshotUpdate... updates) throws Exception { + return SnapshotsService.SHARD_STATE_EXECUTOR.execute(state, Arrays.asList(updates)).resultingState; + } + + private static SnapshotsInProgress.Entry snapshotEntry(Snapshot snapshot, List indexIds, + ImmutableOpenMap shards) { + return SnapshotsInProgress.startedEntry(snapshot, randomBoolean(), randomBoolean(), indexIds, Collections.emptyList(), + 1L, randomNonNegativeLong(), shards, Collections.emptyMap(), Version.CURRENT); + } + + private static SnapshotsInProgress.Entry cloneEntry( + Snapshot snapshot, SnapshotId source, ImmutableOpenMap clones) { + final List indexIds = StreamSupport.stream(clones.keys().spliterator(), false) + .map(k -> k.value.index()).distinct().collect(Collectors.toList()); + return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones); + } + + private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) { + return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, uuid()); + } + + private static SnapshotsInProgress.ShardSnapshotStatus successfulShardStatus(String nodeId) { + return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, SnapshotsInProgress.ShardState.SUCCESS, uuid()); + } + + private static Snapshot snapshot(String repoName, String name) { + return new Snapshot(repoName, new SnapshotId(name, uuid())); + } + + private static Index index(String name) { + return new Index(name, uuid()); + } + + private static IndexId indexId(String name) { + return new IndexId(name, uuid()); + } + + private static String uuid() { + return UUIDs.randomBase64UUID(random()); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index a91e95c836cf3..ac17d9b944238 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -118,6 +118,10 @@ public long getFailureCount() { /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */ private volatile boolean blockAndFailOnWriteSnapFile; + private volatile boolean blockOnWriteShardLevelMeta; + + private volatile boolean blockOnReadIndexMeta; + /** * Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}. */ @@ -189,6 +193,8 @@ public synchronized void unblock() { blockOnWriteIndexFile = false; blockAndFailOnWriteSnapFile = false; blockOnDeleteIndexN = false; + blockOnWriteShardLevelMeta = false; + blockOnReadIndexMeta = false; this.notifyAll(); } @@ -212,6 +218,14 @@ public void setBlockOnDeleteIndexFile() { blockOnDeleteIndexN = true; } + public void setBlockOnWriteShardLevelMeta() { + blockOnWriteShardLevelMeta = true; + } + + public void setBlockOnReadIndexMeta() { + blockOnReadIndexMeta = true; + } + public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) { this.failReadsAfterUnblock = failReadsAfterUnblock; } @@ -229,7 +243,7 @@ private synchronized boolean blockExecution() { boolean wasBlocked = false; try { while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile || - blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) { + blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) { blocked = true; this.wait(); wasBlocked = true; @@ -365,8 +379,12 @@ protected BlobContainer wrapChild(BlobContainer child) { @Override public InputStream readBlob(String name) throws IOException { - maybeReadErrorAfterBlock(name); - maybeIOExceptionOrBlock(name); + if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) { + blockExecutionAndMaybeWait(name); + } else { + maybeReadErrorAfterBlock(name); + maybeIOExceptionOrBlock(name); + } return super.readBlob(name); } @@ -430,6 +448,10 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { maybeIOExceptionOrBlock(blobName); + if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) + && path().equals(basePath()) == false) { + blockExecutionAndMaybeWait(blobName); + } super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); if (RandomizedContext.current().getRandom().nextBoolean()) { // for network based repositories, the blob may have been written but we may still