From b2064d4dd841dd85ca3e5d793b9636b0ded11bda Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Thu, 24 Sep 2020 16:01:27 +0200
Subject: [PATCH] Dry up Snapshot Integ Tests some More (#62856)

Dry up obvious duplication in these very complex tests by moving
widely shared helper methods into AbstractSnapshotIntegTestCase.
---
 .../BlobStoreRepositoryCleanupIT.java         |  15 +-
 .../snapshots/ConcurrentSnapshotsIT.java      |  36 ---
 .../CorruptedBlobStoreRepositoryIT.java       |  29 +-
 .../DedicatedClusterSnapshotRestoreIT.java    | 254 ++++++------------
 .../SharedClusterSnapshotRestoreIT.java       | 203 ++++++--------
 .../snapshots/SnapshotShardsServiceIT.java    |   9 +-
 .../snapshots/SnapshotStatusApisIT.java       |  24 +-
 .../AbstractSnapshotIntegTestCase.java        | 100 +++++--
 .../datastreams/DataStreamsSnapshotsIT.java   |  11 +-
 9 files changed, 266 insertions(+), 415 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
index 2a660709f487c..ad8f502273611 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
@@ -31,7 +31,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
 import static org.hamcrest.Matchers.is;
@@ -49,11 +48,8 @@ public void testMasterFailoverDuringCleanup() throws Exception {
         ensureStableCluster(nodeCount - 1);
 
         logger.info("--> wait for cleanup to finish and disappear from cluster state");
-        assertBusy(() -> {
-            RepositoryCleanupInProgress cleanupInProgress =
-                client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
-            assertFalse(cleanupInProgress.hasCleanupInProgress());
-        }, 30, TimeUnit.SECONDS);
+        awaitClusterState(state ->
+            state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);
     }
 
     public void testRepeatCleanupsDontRemove() throws Exception {
@@ -71,11 +67,8 @@ public void testRepeatCleanupsDontRemove() throws Exception {
         unblockNode("test-repo", masterNode);
 
         logger.info("--> wait for cleanup to finish and disappear from cluster state");
-        assertBusy(() -> {
-            RepositoryCleanupInProgress cleanupInProgress =
-                client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
-            assertFalse(cleanupInProgress.hasCleanupInProgress());
-        }, 30, TimeUnit.SECONDS);
+        awaitClusterState(state ->
+            state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);
     }
 
     private String startBlockedCleanup(String repoName) throws Exception {
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
index da3f8f7d9f72e..0966f23d60730 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
@@ -1272,40 +1272,12 @@ private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromMasterClient(S
             .setWaitForCompletion(true).execute();
     }
 
-    private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
-        return startFullSnapshot(repoName, snapshotName, false);
-    }
-
-    private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
-        logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
-        return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
-            .setPartial(partial).execute();
-    }
-
-    // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
-    // threads so that blocking some threads on one repository doesn't block other repositories from doing work
-    private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
-        .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
-
-    private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
-
-    private void createIndexWithContent(String indexName) {
-        createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA);
-    }
-
     private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) {
         createIndexWithContent(indexName, indexSettingsNoReplicas(1)
             .put("index.routing.allocation.include._name", nodeInclude)
             .put("index.routing.allocation.exclude._name", nodeExclude).build());
     }
 
-    private void createIndexWithContent(String indexName, Settings indexSettings) {
-        logger.info("--> creating index [{}]", indexName);
-        createIndex(indexName, indexSettings);
-        ensureGreen(indexName);
-        index(indexName, "_doc", "some_id", "foo", "bar");
-    }
-
     private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInProgress snapshotsInProgress) {
         for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
             if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
@@ -1359,12 +1331,4 @@ private ActionFuture<CreateSnapshotResponse> startAndBlockFailingFullSnapshot(St
         waitForBlock(internalCluster().getMasterName(), blockedRepoName, TimeValue.timeValueSeconds(30L));
         return fut;
     }
-
-    private ActionFuture<CreateSnapshotResponse> startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName, String dataNode)
-        throws InterruptedException {
-        blockDataNode(repoName, dataNode);
-        final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(repoName, snapshotName);
-        waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
-        return fut;
-    }
 }
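Note: the private helpers deleted above are not gone. Per the diffstat, AbstractSnapshotIntegTestCase.java grows by roughly 100 lines, but that hunk is not part of this excerpt. A minimal sketch of the consolidated helpers as they presumably end up there, reconstructed from the deleted bodies and the unchanged call sites (the protected visibility and exact placement are assumptions):

    // Sketch only: assumed members of AbstractSnapshotIntegTestCase, so subclasses
    // such as ConcurrentSnapshotsIT keep calling them without any change.
    protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
        return startFullSnapshot(repoName, snapshotName, false);
    }

    protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
        logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
        return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true).setPartial(partial).execute();
    }

    protected void createIndexWithContent(String indexName) {
        createIndexWithContent(indexName, indexSettingsNoReplicas(1).build());
    }

    protected void createIndexWithContent(String indexName, Settings indexSettings) {
        logger.info("--> creating index [{}]", indexName);
        createIndex(indexName, indexSettings);
        ensureGreen(indexName);
        index(indexName, "_doc", "some_id", "foo", "bar");
    }

The bodies above are the removed private implementations verbatim; only their new home is inferred.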
logger.info("--> set next generation as pending in the cluster state"); - final PlainActionFuture csUpdateFuture = PlainActionFuture.newFuture(); - internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation", - new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata()) - .putCustom(RepositoriesMetadata.TYPE, - currentState.metadata().custom(RepositoriesMetadata.TYPE).withUpdatedGeneration( - repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build(); - } - - @Override - public void onFailure(String source, Exception e) { - csUpdateFuture.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - csUpdateFuture.onResponse(null); - } - } - ); - csUpdateFuture.get(); + updateClusterState(currentState -> ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata()) + .putCustom(RepositoriesMetadata.TYPE, + currentState.metadata().custom(RepositoriesMetadata.TYPE).withUpdatedGeneration( + repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build()); logger.info("--> full cluster restart"); internalCluster().fullRestart(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 54104c8fc7e7b..c31c3231bbcd3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; - import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -36,13 +35,9 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -51,7 +46,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedFunction; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -229,9 +223,7 @@ public void testExceptionWhenRestoringPersistentSettings() { createRepository("test-repo", "fs"); createFullSnapshot("test-repo", "test-snap"); - assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet() - .getSnapshots().get(0).state(), - 
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index 54104c8fc7e7b..c31c3231bbcd3 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -21,7 +21,6 @@
 
 import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.IntSet;
-
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
@@ -36,13 +35,9 @@
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -51,7 +46,6 @@
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedFunction;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
@@ -229,9 +223,7 @@ public void testExceptionWhenRestoringPersistentSettings() {
         createRepository("test-repo", "fs");
         createFullSnapshot("test-repo", "test-snap");
-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
-            .getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
 
         logger.info("--> change the test persistent setting and break it");
         setSettingValue.accept("new value 2");
@@ -255,7 +247,6 @@ public void testRestoreCustomMetadata() throws Exception {
 
         logger.info("--> start node");
         internalCluster().startNode();
-        Client client = client();
         createIndex("test-idx");
         logger.info("--> add custom persistent metadata");
         updateClusterState(currentState -> {
@@ -273,9 +264,7 @@ public void testRestoreCustomMetadata() throws Exception {
 
         createRepository("test-repo", "fs", tempDir);
         createFullSnapshot("test-repo", "test-snap");
-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
-            .getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
 
         logger.info("--> change custom persistent metadata");
         updateClusterState(currentState -> {
@@ -299,20 +288,20 @@ public void testRestoreCustomMetadata() throws Exception {
         });
 
         logger.info("--> delete repository");
-        assertAcked(client.admin().cluster().prepareDeleteRepository("test-repo"));
+        assertAcked(clusterAdmin().prepareDeleteRepository("test-repo"));
 
         createRepository("test-repo-2", "fs", tempDir);
 
         logger.info("--> restore snapshot");
-        client.admin().cluster().prepareRestoreSnapshot("test-repo-2", "test-snap").setRestoreGlobalState(true).setIndices("-*")
+        clusterAdmin().prepareRestoreSnapshot("test-repo-2", "test-snap").setRestoreGlobalState(true).setIndices("-*")
             .setWaitForCompletion(true).execute().actionGet();
 
         logger.info("--> make sure old repository wasn't restored");
-        assertRequestBuilderThrows(client.admin().cluster().prepareGetRepositories("test-repo"), RepositoryMissingException.class);
-        assertThat(client.admin().cluster().prepareGetRepositories("test-repo-2").get().repositories().size(), equalTo(1));
+        assertRequestBuilderThrows(clusterAdmin().prepareGetRepositories("test-repo"), RepositoryMissingException.class);
+        assertThat(clusterAdmin().prepareGetRepositories("test-repo-2").get().repositories().size(), equalTo(1));
 
         logger.info("--> check that custom persistent metadata was restored");
-        ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
+        ClusterState clusterState = clusterAdmin().prepareState().get().getState();
         logger.info("Cluster state: {}", clusterState);
         Metadata metadata = clusterState.getMetadata();
         assertThat(((SnapshottableMetadata) metadata.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
@@ -327,7 +316,7 @@ public void testRestoreCustomMetadata() throws Exception {
         ensureYellow();
 
         logger.info("--> check that gateway-persistent custom metadata survived full cluster restart");
-        clusterState = client().admin().cluster().prepareState().get().getState();
+        clusterState = clusterAdmin().prepareState().get().getState();
         logger.info("Cluster state: {}", clusterState);
         metadata = clusterState.getMetadata();
         assertThat(metadata.custom(SnapshottableMetadata.TYPE), nullValue());
@@ -344,38 +333,10 @@ public void testRestoreCustomMetadata() throws Exception {
             equalTo("before_snapshot_s_gw_noapi"));
     }
 
-    private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
-        final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
-        clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                return updater.execute(currentState);
-            }
-
-            @Override
-            public void onFailure(String source, @Nullable Exception e) {
-                countDownLatch.countDown();
-            }
-
-            @Override
-            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                countDownLatch.countDown();
-            }
-        });
-        countDownLatch.await();
-    }
-
-    private interface ClusterStateUpdater {
-        ClusterState execute(ClusterState currentState) throws Exception;
-    }
-
     public void testSnapshotDuringNodeShutdown() throws Exception {
-        logger.info("--> start 2 nodes");
-        Client client = client();
-
         assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(2)));
         ensureGreen();
+        indexRandomDocs("test-idx", 100);
 
         final Path repoPath = randomRepoPath();
         createRepository("test-repo", "mock",
@@ -387,7 +348,7 @@ public void testSnapshotDuringNodeShutdown() throws Exception {
         String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
 
         logger.info("--> snapshot");
-        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(false)
             .setIndices("test-idx")
             .get();
@@ -408,13 +369,11 @@ public void testSnapshotDuringNodeShutdown() throws Exception {
 
     public void testSnapshotWithStuckNode() throws Exception {
         logger.info("--> start 2 nodes");
-        ArrayList<String> nodes = new ArrayList<>();
-        nodes.add(internalCluster().startNode());
-        nodes.add(internalCluster().startNode());
-        Client client = client();
+        List<String> nodes = internalCluster().startNodes(2);
 
         assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(2)));
         ensureGreen();
+        indexRandomDocs("test-idx", 100);
 
         Path repo = randomRepoPath();
@@ -429,7 +388,7 @@ public void testSnapshotWithStuckNode() throws Exception {
         assertFileCount(repo, 0);
 
         logger.info("--> snapshot");
-        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(false)
             .setIndices("test-idx")
             .get();
@@ -460,7 +419,7 @@ public void testSnapshotWithStuckNode() throws Exception {
             .execute().actionGet());
 
         logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup");
-        client().admin().cluster().prepareCleanupRepository("test-repo").get();
+        clusterAdmin().prepareCleanupRepository("test-repo").get();
 
         // Expect two files to remain in the repository:
         //   (1) index-(N+1)
Settings.builder().put("number_of_shards", 4) - .put("number_of_replicas", 0))); - logger.info("--> indexing some data into test-idx-all"); - for (int i = 0; i < 100; i++) { - index("test-idx-all", "doc", Integer.toString(i), "foo", "bar" + i); - index("test-idx-closed", "doc", Integer.toString(i), "foo", "bar" + i); - } - refresh("test-idx-closed", "test-idx-all"); // don't refresh test-idx-some it will take 30 sec until it times out... - assertThat(client().prepareSearch("test-idx-all").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); - assertThat(client().prepareSearch("test-idx-closed").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); + assertAcked(prepareCreate("test-idx-closed", 1, indexSettingsNoReplicas(4))); + indexRandomDocs("test-idx-all", 100); + indexRandomDocs("test-idx-closed", 100); assertAcked(client().admin().indices().prepareClose("test-idx-closed")); logger.info("--> create an index that will have no allocated shards"); - assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) - .put("index.routing.allocation.include.tag", "nowhere") - .put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get()); - assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists()); + assertAcked(prepareCreate("test-idx-none", 1, indexSettingsNoReplicas(6) + .put("index.routing.allocation.include.tag", "nowhere")).setWaitForActiveShards(ActiveShardCount.NONE).get()); + assertTrue(indexExists("test-idx-none")); createRepository("test-repo", "fs"); logger.info("--> start snapshot with default settings without a closed index - should fail"); final SnapshotException sne = expectThrows(SnapshotException.class, - () -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") + () -> clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-1") .setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed") .setWaitForCompletion(true).execute().actionGet()); assertThat(sne.getMessage(), containsString("Indices don't have primary shards")); if (randomBoolean()) { logger.info("checking snapshot completion using status"); - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2") + clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-2") .setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed") .setWaitForCompletion(false).setPartial(true).execute().actionGet(); assertBusy(() -> { - SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo") + SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo") .setSnapshots("test-snap-2").get(); List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); assertEquals(snapshotStatuses.size(), 1); logger.trace("current snapshot status [{}]", snapshotStatuses.get(0)); assertTrue(snapshotStatuses.get(0).getState().completed()); }, 1, TimeUnit.MINUTES); - SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo") + SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo") .setSnapshots("test-snap-2").get(); List snapshotStatuses = snapshotsStatusResponse.getSnapshots(); assertThat(snapshotStatuses.size(), equalTo(1)); @@ -545,17 +497,14 @@ public void testRestoreIndexWithMissingShards() throws Exception { // There is slight delay between snapshot being marked as completed in 
the cluster state and on the file system // After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well assertBusy(() -> { - GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo") - .setSnapshots("test-snap-2").get(); - assertThat(response.getSnapshots().size(), equalTo(1)); - SnapshotInfo snapshotInfo = response.getSnapshots().get(0); + SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap-2"); assertTrue(snapshotInfo.state().completed()); assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); }, 1, TimeUnit.MINUTES); } else { logger.info("checking snapshot completion using wait_for_completion flag"); final CreateSnapshotResponse createSnapshotResponse = - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2") + clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-2") .setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed") .setWaitForCompletion(true).setPartial(true).execute().actionGet(); logger.info("State: [{}], Reason: [{}]", @@ -563,20 +512,19 @@ public void testRestoreIndexWithMissingShards() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(22)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(16)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(10)); - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").execute().actionGet() - .getSnapshots().get(0).state(), + assertThat(getSnapshot("test-repo", "test-snap-2").state(), equalTo(SnapshotState.PARTIAL)); } assertAcked(client().admin().indices().prepareClose("test-idx-all")); logger.info("--> restore incomplete snapshot - should fail"); - assertFutureThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false) + assertFutureThrows(clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false) .setWaitForCompletion(true).execute(), SnapshotRestoreException.class); logger.info("--> restore snapshot for the index that was snapshotted completely"); - RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2") + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2") .setRestoreGlobalState(false).setIndices("test-idx-all").setWaitForCompletion(true).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue()); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6)); @@ -586,7 +534,7 @@ public void testRestoreIndexWithMissingShards() throws Exception { logger.info("--> restore snapshot for the partial index"); cluster().wipeIndices("test-idx-some"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2") + restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2") .setRestoreGlobalState(false).setIndices("test-idx-some").setPartial(true).setWaitForCompletion(true).get(); assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue()); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6)); @@ -596,7 +544,7 @@ public void testRestoreIndexWithMissingShards() throws Exception { logger.info("--> restore snapshot for the index that didn't have any shards 
snapshotted successfully"); cluster().wipeIndices("test-idx-none"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2") + restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2") .setRestoreGlobalState(false).setIndices("test-idx-none").setPartial(true).setWaitForCompletion(true).get(); assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue()); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6)); @@ -605,7 +553,7 @@ public void testRestoreIndexWithMissingShards() throws Exception { assertThat(getCountForIndex("test-idx-some"), allOf(greaterThan(0L), lessThan(100L))); logger.info("--> restore snapshot for the closed index that was snapshotted completely"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2") + restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2") .setRestoreGlobalState(false).setIndices("test-idx-closed").setWaitForCompletion(true).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue()); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(4)); @@ -629,10 +577,11 @@ public void testRestoreIndexWithShardsMissingInLocalGateway() throws Exception { logger.info("--> create an index that will have some unallocated shards"); assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(numberOfShards))); ensureGreen(); + indexRandomDocs("test-idx", 100); logger.info("--> start snapshot"); - assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx") + assertThat(clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx") .setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); @@ -647,12 +596,12 @@ public boolean clearData(String nodeName) { } }); - assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2") + assertThat(clusterAdmin().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2") .execute().actionGet().isTimedOut(), equalTo(false)); logger.info("--> restore index snapshot"); - assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false) + assertThat(clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false) .setWaitForCompletion(true).get().getRestoreInfo().successfulShards(), equalTo(6)); @@ -679,12 +628,12 @@ public void testRegistrationFailure() { internalCluster().startNode(nonMasterNode()); // Register mock repositories for (int i = 0; i < 5; i++) { - client().admin().cluster().preparePutRepository("test-repo" + i) + clusterAdmin().preparePutRepository("test-repo" + i) .setType("mock").setSettings(Settings.builder() .put("location", randomRepoPath())).setVerify(false).get(); } logger.info("--> make sure that properly setup repository can be registered on all nodes"); - client().admin().cluster().preparePutRepository("test-repo-0") + clusterAdmin().preparePutRepository("test-repo-0") .setType("fs").setSettings(Settings.builder() .put("location", randomRepoPath())).get(); @@ -692,9 +641,8 @@ public void testRegistrationFailure() { public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception { disableRepoConsistencyCheck("This test does not create any data in the 
repository"); - Settings nodeSettings = Settings.EMPTY; logger.info("--> start two nodes"); - internalCluster().startNodes(2, nodeSettings); + internalCluster().startNodes(2); createRepository("test-repo", "mock", Settings.builder() .put("location", randomRepoPath()) .put(MockRepository.Plugin.USERNAME_SETTING.getKey(), "notsecretusername") @@ -770,18 +718,10 @@ public void testMasterShutdownDuringSnapshot() throws Exception { logger.info("--> wait until the snapshot is done"); - assertBusy(() -> { - GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo") - .setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); - assertTrue(snapshotInfo.state().completed()); - }, 1, TimeUnit.MINUTES); + assertBusy(() -> assertTrue(getSnapshot("test-repo", "test-snap").state().completed()), 1, TimeUnit.MINUTES); logger.info("--> verify that snapshot was successful"); - - GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo") - .setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); assertEquals(0, snapshotInfo.failedShards()); @@ -798,6 +738,7 @@ public void testMasterAndDataShutdownDuringSnapshot() throws Exception { assertAcked(prepareCreate("test-idx", 0, indexSettingsNoReplicas(between(1, 20)))); ensureGreen(); + indexRandomDocs("test-idx", randomIntBetween(10, 100)); final int numberOfShards = getNumShards("test-idx").numPrimaries; @@ -816,18 +757,10 @@ public void testMasterAndDataShutdownDuringSnapshot() throws Exception { logger.info("--> wait until the snapshot is done"); - assertBusy(() -> { - GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo") - .setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); - assertTrue(snapshotInfo.state().completed()); - }, 1, TimeUnit.MINUTES); + assertBusy(() -> assertTrue(getSnapshot("test-repo", "test-snap").state().completed()), 1, TimeUnit.MINUTES); logger.info("--> verify that snapshot was partial"); - - GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo") - .setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); assertEquals(SnapshotState.PARTIAL, snapshotInfo.state()); assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); assertThat(snapshotInfo.failedShards(), greaterThan(0)); @@ -846,7 +779,6 @@ public void testRestoreShrinkIndex() throws Exception { internalCluster().startMasterOnlyNode(); internalCluster().startDataOnlyNode(); - final Client client = client(); final String repo = "test-repo"; final String snapshot = "test-snap"; final String sourceIdx = "test-idx"; @@ -859,21 +791,20 @@ public void testRestoreShrinkIndex() throws Exception { indexRandomDocs(sourceIdx, randomIntBetween(10, 100)); logger.info("--> shrink the index"); - assertAcked(client.admin().indices().prepareUpdateSettings(sourceIdx) + assertAcked(client().admin().indices().prepareUpdateSettings(sourceIdx) .setSettings(Settings.builder().put("index.blocks.write", 
true)).get()); - assertAcked(client.admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get()); + assertAcked(client().admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get()); logger.info("--> snapshot the shrunk index"); - CreateSnapshotResponse createResponse = client.admin().cluster() + assertSuccessful(clusterAdmin() .prepareCreateSnapshot(repo, snapshot) - .setWaitForCompletion(true).setIndices(shrunkIdx).get(); - assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state()); + .setWaitForCompletion(true).setIndices(shrunkIdx).execute()); logger.info("--> delete index and stop the data node"); - assertAcked(client.admin().indices().prepareDelete(sourceIdx).get()); - assertAcked(client.admin().indices().prepareDelete(shrunkIdx).get()); + assertAcked(client().admin().indices().prepareDelete(sourceIdx).get()); + assertAcked(client().admin().indices().prepareDelete(shrunkIdx).get()); internalCluster().stopRandomDataNode(); - client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1"); + clusterAdmin().prepareHealth().setTimeout("30s").setWaitForNodes("1"); logger.info("--> start a new data node"); final Settings dataSettings = Settings.builder() @@ -881,10 +812,10 @@ public void testRestoreShrinkIndex() throws Exception { .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id .build(); internalCluster().startDataOnlyNode(dataSettings); - client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2"); + clusterAdmin().prepareHealth().setTimeout("30s").setWaitForNodes("2"); logger.info("--> restore the shrunk index and ensure all shards are allocated"); - RestoreSnapshotResponse restoreResponse = client().admin().cluster() + RestoreSnapshotResponse restoreResponse = clusterAdmin() .prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true) .setIndices(shrunkIdx).get(); assertEquals(restoreResponse.getRestoreInfo().totalShards(), @@ -894,13 +825,12 @@ public void testRestoreShrinkIndex() throws Exception { public void testSnapshotWithDateMath() { final String repo = "repo"; - final AdminClient admin = client().admin(); final IndexNameExpressionResolver nameExpressionResolver = new IndexNameExpressionResolver(); final String snapshotName = ""; logger.info("--> creating repository"); - assertAcked(admin.cluster().preparePutRepository(repo).setType("fs") + assertAcked(clusterAdmin().preparePutRepository(repo).setType("fs") .setSettings(Settings.builder().put("location", randomRepoPath()) .put("compress", randomBoolean()))); @@ -910,7 +840,7 @@ public void testSnapshotWithDateMath() { // snapshot could be taken before or after a day rollover final String expression2 = nameExpressionResolver.resolveDateMathExpression(snapshotName); - SnapshotsStatusResponse response = admin.cluster().prepareSnapshotStatus(repo) + SnapshotsStatusResponse response = clusterAdmin().prepareSnapshotStatus(repo) .setSnapshots(Sets.newHashSet(expression1, expression2).toArray(Strings.EMPTY_ARRAY)) .setIgnoreUnavailable(true) .get(); @@ -919,8 +849,7 @@ public void testSnapshotWithDateMath() { assertThat(snapshots.get(0).getState().completed(), equalTo(true)); } - public void testSnapshotTotalAndIncrementalSizes() throws IOException { - Client client = client(); + public void testSnapshotTotalAndIncrementalSizes() throws Exception { final String indexName = "test-blocks-1"; final String repositoryName = "repo-" + indexName; final String snapshot0 = "snapshot-0"; @@ -930,14 +859,14 @@ public void 
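Note: startDeleteSnapshot(...), first used in the next hunk, replaces blocking prepareDeleteSnapshot(...).get() calls with an ActionFuture so tests can also assert on in-flight deletes (as testAbortWaitsOnDataNode does later in this file). A sketch; the log message is an assumption:

    // Sketch: assumed shared helper mirroring startFullSnapshot for deletes.
    protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
        logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
        return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
    }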
@@ -930,14 +859,14 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
 
         int docs = between(10, 100);
         for (int i = 0; i < docs; i++) {
-            client.prepareIndex(indexName, "type").setSource("test", "init").execute().actionGet();
+            client().prepareIndex(indexName, "type").setSource("test", "init").execute().actionGet();
         }
 
         final Path repoPath = randomRepoPath();
         createRepository(repositoryName, "fs", repoPath);
         createFullSnapshot(repositoryName, snapshot0);
 
-        SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
+        SnapshotsStatusResponse response = clusterAdmin().prepareSnapshotStatus(repositoryName)
             .setSnapshots(snapshot0)
             .get();
 
@@ -965,7 +894,7 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
         // add few docs - less than initially
         docs = between(1, 5);
         for (int i = 0; i < docs; i++) {
-            client.prepareIndex(indexName, "type").setSource("test", "test" + i).execute().actionGet();
+            client().prepareIndex(indexName, "type").setSource("test", "test" + i).execute().actionGet();
         }
 
         // create another snapshot
@@ -973,11 +902,9 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException {
         createFullSnapshot(repositoryName, snapshot1);
 
         // drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot
-        assertAcked(client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0).get());
+        assertAcked(startDeleteSnapshot(repositoryName, snapshot0).get());
 
-        response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
-            .setSnapshots(snapshot1)
-            .get();
+        response = clusterAdmin().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).get();
 
         final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
         final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
@@ -1024,12 +951,7 @@ public void testDeduplicateIndexMetadata() throws Exception {
 
         final Path repoPath = randomRepoPath();
         createRepository(repositoryName, "fs", repoPath);
-
-        logger.info("--> create a snapshot");
-        client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
-            .setIncludeGlobalState(true)
-            .setWaitForCompletion(true)
-            .get();
+        createFullSnapshot(repositoryName, snapshot0);
 
         final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
         assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
@@ -1056,13 +978,12 @@ public void testDeduplicateIndexMetadata() throws Exception {
         for (int i = 0; i < docs; i++) {
             client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
         }
-        assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
-            equalTo(RestStatus.OK));
+        createFullSnapshot(repositoryName, snapshot2);
 
         final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
         assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob
 
-        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
+        assertAcked(clusterAdmin().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
         final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
         assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
     }
@@ -1078,6 +999,7 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
 
         assertAcked(prepareCreate("test-idx", 0, indexSettingsNoReplicas(5)));
         ensureGreen();
+        indexRandomDocs("test-idx", randomIntBetween(50, 100));
 
         final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
         logger.info("--> snapshot");
         ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH);
@@ -1090,14 +1012,14 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
 
         logger.info("--> wait for shard snapshots to show as failed");
         assertBusy(() -> assertThat(
-            client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
+            clusterAdmin().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
                 .get(0).getShardsStats().getFailedShards(), greaterThanOrEqualTo(1)), 60L, TimeUnit.SECONDS);
 
         unblockNode("test-repo", dataNode);
         disruption.stopDisrupting();
         // check that snapshot completes
         assertBusy(() -> {
-            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+            GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin()
                 .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
             assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
             SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
@@ -1126,7 +1048,7 @@ public void testDataNodeRestartAfterShardSnapshotFailure() throws Exception {
 
         logger.info("--> wait for shard snapshot of first primary to show as failed");
         assertBusy(() -> assertThat(
-            client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
+            clusterAdmin().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
                 .get(0).getShardsStats().getFailedShards(), is(1)), 60L, TimeUnit.SECONDS);
 
         logger.info("--> restarting second data node, which should cause the primary shard on it to be failed");
@@ -1134,7 +1056,7 @@ public void testDataNodeRestartAfterShardSnapshotFailure() throws Exception {
 
         // check that snapshot completes with both failed shards being accounted for in the snapshot result
         assertBusy(() -> {
-            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+            GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin()
                 .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
             assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
             SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
@@ -1174,7 +1096,7 @@ public void testRetentionLeasesClearedOnRestore() throws Exception {
         final String snapshotName = "snapshot-retention-leases";
         logger.debug("--> create snapshot {}:{}", repoName, snapshotName);
-        CreateSnapshotResponse createResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+        CreateSnapshotResponse createResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
             .setWaitForCompletion(true).setIndices(indexName).get();
         assertThat(createResponse.getSnapshotInfo().successfulShards(), equalTo(shardCount));
         assertThat(createResponse.getSnapshotInfo().failedShards(), equalTo(0));
@@ -1195,7 +1117,7 @@ public void testRetentionLeasesClearedOnRestore() throws Exception {
         assertAcked(client().admin().indices().prepareClose(indexName));
 
         logger.debug("--> restore index {} from snapshot", indexName);
-        RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
+        RestoreSnapshotResponse restoreResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
             .setWaitForCompletion(true).get();
         assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(shardCount));
         assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
@@ -1222,24 +1144,9 @@ public void testAbortWaitsOnDataNode() throws Exception {
         createRepository(repoName, "mock");
         blockAllDataNodes(repoName);
         final String snapshotName = "test-snap";
-        final ActionFuture<CreateSnapshotResponse> snapshotResponse =
-            client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute();
+        final ActionFuture<CreateSnapshotResponse> snapshotResponse = startFullSnapshot(repoName, snapshotName);
         waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L));
 
-        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, otherDataNode);
-        final PlainActionFuture<Void> abortVisibleFuture = PlainActionFuture.newFuture();
-        clusterService.addListener(new ClusterStateListener() {
-            @Override
-            public void clusterChanged(ClusterChangedEvent event) {
-                final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
-                if (snapshotsInProgress != null && snapshotsInProgress.entries().stream()
-                    .anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)) {
-                    abortVisibleFuture.onResponse(null);
-                    clusterService.removeListener(this);
-                }
-            }
-        });
-
         final AtomicBoolean blocked = new AtomicBoolean(true);
 
         final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
@@ -1254,10 +1161,10 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra
         });
 
         logger.info("--> abort snapshot");
-        final ActionFuture<AcknowledgedResponse> deleteResponse =
-            client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
+        final ActionFuture<AcknowledgedResponse> deleteResponse = startDeleteSnapshot(repoName, snapshotName);
 
-        abortVisibleFuture.get(30L, TimeUnit.SECONDS);
+        awaitClusterState(otherDataNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
+            .entries().stream().anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED));
 
         assertFalse("delete should not be able to finish until data node is unblocked", deleteResponse.isDone());
         blocked.set(false);
@@ -1274,8 +1181,7 @@ public void testPartialSnapshotAllShardsMissing() throws Exception {
         createIndex("some-index");
         stopNode(dataNode);
         ensureStableCluster(1);
-        final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
-            .setPartial(true).setWaitForCompletion(true).get();
+        final CreateSnapshotResponse createSnapshotResponse = startFullSnapshot(repoName, "test-snap", true).get();
         assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
     }
 
@@ -1342,11 +1248,11 @@ public void testSnapshotDeleteRelocatingPrimaryIndex() throws Exception {
 
         logger.info("--> wait for relocations to start");
         assertBusy(() -> assertThat(
-            client().admin().cluster().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
+            clusterAdmin().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
             1L, TimeUnit.MINUTES);
 
         logger.info("--> snapshot");
-        client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
+        clusterAdmin().prepareCreateSnapshot(repoName, "test-snap")
             .setWaitForCompletion(false).setPartial(true).setIndices(indexName).get();
 
         assertAcked(client().admin().indices().prepareDelete(indexName));
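Note: awaitClusterState(...) — used in BlobStoreRepositoryCleanupIT above and, with a node-targeted overload, in testAbortWaitsOnDataNode — replaces both assertBusy polling of prepareState() and one-off ClusterStateListener plumbing. A sketch built on ClusterStateObserver; the exact construction and the 30-second timeout are assumptions:

    // Sketch: assumed shared observer-based helper; the single-argument variant
    // presumably delegates to the master node.
    protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
        awaitClusterState(internalCluster().getMasterName(), statePredicate);
    }

    protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
        final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
        final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
        final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
        if (statePredicate.test(observer.setAndGetObservedState()) == false) {
            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) {
                    future.onResponse(null);
                }

                @Override
                public void onClusterServiceClose() {
                    future.onFailure(new NodeClosedException(clusterService.localNode()));
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    future.onFailure(new TimeoutException());
                }
            }, statePredicate);
            future.get(30L, TimeUnit.SECONDS);
        }
    }

Registering the predicate with the observer avoids the busy-wait loops and the listener boilerplate that this patch deletes at the call sites.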
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 66076a62ef394..aebe62c788eb0 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -20,7 +20,6 @@
 package org.elasticsearch.snapshots;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
@@ -80,6 +79,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -109,8 +109,6 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
-// The tests in here do a lot of state updates and other writes to disk and are slowed down too much by WindowsFS
-@LuceneTestCase.SuppressFileSystems(value = "WindowsFS")
 public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
 
     @Override
@@ -121,7 +119,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
     }
 
     public void testBasicWorkFlow() throws Exception {
-        Client client = client();
         createRepository("test-repo", "fs");
         createIndexWithRandomDocs("test-idx-1", 100);
         createIndexWithRandomDocs("test-idx-2", 100);
@@ -138,7 +135,7 @@ public void testBasicWorkFlow() throws Exception {
             if (!indicesToFlush.isEmpty()) {
                 String[] indices = indicesToFlush.toArray(new String[indicesToFlush.size()]);
                 logger.info("--> starting asynchronous flush for indices {}", Arrays.toString(indices));
-                flushResponseFuture = client.admin().indices().prepareFlush(indices).execute();
+                flushResponseFuture = client().admin().indices().prepareFlush(indices).execute();
             }
         }
 
@@ -158,36 +155,36 @@ public void testBasicWorkFlow() throws Exception {
 
         final boolean snapshotClosed = randomBoolean();
         if (snapshotClosed) {
-            assertAcked(client.admin().indices().prepareClose(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
+            assertAcked(client().admin().indices().prepareClose(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
         }
 
         logger.info("--> snapshot");
-        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
-            .setWaitForCompletion(true).setIndicesOptions(IndicesOptions.lenientExpand()).setIndices(indicesToSnapshot).get();
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true).setIndices(indicesToSnapshot).get();
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
 
-        List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo")
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo")
             .setSnapshots(randomFrom("test-snap", "_all", "*", "*-snap", "test*")).get().getSnapshots();
         assertThat(snapshotInfos.size(), equalTo(1));
         SnapshotInfo snapshotInfo = snapshotInfos.get(0);
         assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
         assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
 
         if (snapshotClosed) {
-            assertAcked(client.admin().indices().prepareOpen(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
+            assertAcked(client().admin().indices().prepareOpen(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
         }
 
         logger.info("--> delete some data");
         for (int i = 0; i < 50; i++) {
-            client.prepareDelete("test-idx-1", "_doc", Integer.toString(i)).get();
+            client().prepareDelete("test-idx-1", "_doc", Integer.toString(i)).get();
         }
         for (int i = 50; i < 100; i++) {
-            client.prepareDelete("test-idx-2", "_doc", Integer.toString(i)).get();
+            client().prepareDelete("test-idx-2", "_doc", Integer.toString(i)).get();
         }
         for (int i = 0; i < 100; i += 2) {
-            client.prepareDelete("test-idx-3", "_doc", Integer.toString(i)).get();
+            client().prepareDelete("test-idx-3", "_doc", Integer.toString(i)).get();
         }
         assertAllSuccessful(refresh());
         assertDocCount("test-idx-1", 50L);
@@ -195,10 +192,10 @@ public void testBasicWorkFlow() throws Exception {
         assertDocCount("test-idx-3", 50L);
 
         logger.info("--> close indices");
-        client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
+        client().admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
 
         logger.info("--> restore all indices from the snapshot");
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true).execute().actionGet();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
@@ -207,7 +204,7 @@ public void testBasicWorkFlow() throws Exception {
         assertDocCount("test-idx-2", 100L);
         assertDocCount("test-idx-3", 50L);
 
-        assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
+        assertNull(client().admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
             MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));
 
         for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
@@ -221,17 +218,17 @@ public void testBasicWorkFlow() throws Exception {
         logger.info("--> delete indices");
         cluster().wipeIndices("test-idx-1", "test-idx-2");
         logger.info("--> restore one index after deletion");
-        restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
+        restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
             .setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
         ensureGreen();
         assertDocCount("test-idx-1", 100);
-        ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
+        ClusterState clusterState = clusterAdmin().prepareState().get().getState();
         assertThat(clusterState.getMetadata().hasIndex("test-idx-1"), equalTo(true));
         assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
 
-        assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
+        assertNull(client().admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
             MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));
 
         for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
@@ -257,7 +254,6 @@ public void testSingleGetAfterRestore() throws Exception {
         String typeName = "actions";
         String expectedValue = "expected";
-        Client client = client();
         // Write a document
         String docId = Integer.toString(randomInt());
         index(indexName, typeName, docId, "value", expectedValue);
@@ -265,7 +261,7 @@ public void testSingleGetAfterRestore() throws Exception {
         createRepository(repoName, "fs", absolutePath);
 
         logger.info("--> snapshot");
-        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
             .setWaitForCompletion(true)
             .setIndices(indexName)
             .get();
@@ -274,19 +270,17 @@ public void testSingleGetAfterRestore() throws Exception {
             equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
         assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
 
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
             .setWaitForCompletion(true)
             .setRenamePattern(indexName)
             .setRenameReplacement(restoredIndexName)
             .get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
-        assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
+        assertThat(client().prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
     }
 
     public void testFreshIndexUUID() {
-        Client client = client();
-
         createRepository("test-repo", "fs");
 
         createIndex("test");
@@ -295,7 +289,7 @@ public void testFreshIndexUUID() {
         assertTrue(originalIndexUUID, originalIndexUUID != null);
         assertFalse(originalIndexUUID, originalIndexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE));
         ensureGreen();
-        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true).setIndices("test").get();
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
@@ -312,10 +306,10 @@ public void testFreshIndexUUID() {
         assertFalse(newIndexUUID, newIndexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE));
         assertFalse(newIndexUUID, newIndexUUID.equals(originalIndexUUID));
         logger.info("--> close index");
-        client.admin().indices().prepareClose("test").get();
+        client().admin().indices().prepareClose("test").get();
 
         logger.info("--> restore all indices from the snapshot");
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true).execute().actionGet();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
 
@@ -326,7 +320,7 @@ public void testFreshIndexUUID() {
             newIndexUUID.equals(newAfterRestoreIndexUUID));
 
         logger.info("--> restore indices with different names");
-        restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+        restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
             .setRenamePattern("(.+)").setRenameReplacement("$1-copy").setWaitForCompletion(true).execute().actionGet();
         assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
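Note: createFullSnapshot(...) and assertSuccessful(...), already called in the hunks above, cover the two common success paths: a blocking snapshot and an in-flight ActionFuture. A sketch of both; the assertion details are assumptions inferred from the blocking call sites they replace:

    // Sketch: assumed shared helpers in AbstractSnapshotIntegTestCase.
    protected SnapshotInfo createFullSnapshot(String repoName, String snapshotName) {
        logger.info("--> creating full snapshot [{}] in [{}]", snapshotName, repoName);
        final SnapshotInfo snapshotInfo = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
            .setIncludeGlobalState(true).setWaitForCompletion(true).get().getSnapshotInfo();
        assertThat(snapshotInfo.successfulShards(), is(snapshotInfo.totalShards()));
        assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
        return snapshotInfo;
    }

    protected SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
        final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
        assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
        return snapshotInfo;
    }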
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") + restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap") .setRenamePattern("(.+)").setRenameReplacement("$1-copy").setWaitForCompletion(true).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); @@ -340,21 +334,18 @@ public void testEmptySnapshot() throws Exception { createRepository("test-repo", "fs"); logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setWaitForCompletion(true).get(); + CreateSnapshotResponse createSnapshotResponse = startFullSnapshot("test-repo", "test-snap").get(); assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0)); - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get() - .getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS)); } public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException { disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks"); - Client client = client(); logger.info("--> creating repository"); - assertAcked(client.admin().cluster().preparePutRepository("test-repo") + assertAcked(clusterAdmin().preparePutRepository("test-repo") .setType("mock").setSettings( Settings.builder() .put("location", randomRepoPath()) @@ -366,7 +357,7 @@ public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException logger.info("--> snapshot"); try { - CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap") .setWaitForCompletion(true).setIndices("test-idx").get(); if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) { // If we are here, that means we didn't have any failures, let's check it @@ -379,10 +370,7 @@ public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException assertThat(shardFailure.nodeId(), notNullValue()); assertThat(shardFailure.index(), equalTo("test-idx")); } - GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo") - .addSnapshots("test-snap").get(); - assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1)); - SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0); + SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap"); if (snapshotInfo.state() == SnapshotState.SUCCESS) { assertThat(snapshotInfo.shardFailures().size(), greaterThan(0)); assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards())); @@ -403,7 +391,6 @@ public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException public void testDataFileFailureDuringSnapshot() throws Exception { disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); - Client client = client(); createRepository("test-repo", "mock", Settings.builder().put("location", randomRepoPath()) .put("random", randomAlphaOfLength(10)).put("random_data_file_io_exception_rate", 0.3)); 
@@ -411,7 +398,7 @@ public void testDataFileFailureDuringSnapshot() throws Exception {
         createIndexWithRandomDocs("test-idx", 100);

         logger.info("--> snapshot");
-        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true).setIndices("test-idx").get();
         if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
             logger.info("--> no failures");
@@ -425,16 +412,13 @@ public void testDataFileFailureDuringSnapshot() throws Exception {
                 assertThat(shardFailure.nodeId(), notNullValue());
                 assertThat(shardFailure.index(), equalTo("test-idx"));
             }
-            GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo")
-                .addSnapshots("test-snap").get();
-            assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
-            SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
+            SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
             assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
             assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
             assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards()));

             // Verify that snapshot status also contains the same failures
-            SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo")
+            SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo")
                 .addSnapshots("test-snap").get();
             assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1));
             SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
@@ -469,6 +453,7 @@ public void testDataFileFailureDuringRestore() throws Exception {
         ensureGreen();

         final NumShards numShards = getNumShards("test-idx");
+        indexRandomDocs("test-idx", 100);

         logger.info("--> snapshot");
@@ -591,7 +576,7 @@ public void testUnrestorableIndexDuringRestore() throws Exception {
                 .setSettings(Settings.builder()
                     .putNull("index.routing.allocation.include._name")
                     .build()));
-            assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
+            assertAcked(clusterAdmin().prepareReroute().setRetryFailed(true));
         };

         unrestorableUseCase(indexName, Settings.EMPTY, Settings.EMPTY, restoreIndexSettings,
@@ -617,7 +602,7 @@ private void unrestorableUseCase(final String indexName,

         // create a snapshot
         final NumShards numShards = getNumShards(indexName);
-        CreateSnapshotResponse snapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        CreateSnapshotResponse snapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setWaitForCompletion(true)
             .setIndices(indexName)
             .get();
@@ -630,7 +615,7 @@ private void unrestorableUseCase(final String indexName,
         assertAcked(client().admin().indices().prepareDelete(indexName));

         // update the test repository
-        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+        assertAcked(clusterAdmin().preparePutRepository("test-repo")
             .setType("mock")
             .setSettings(Settings.builder()
                 .put("location", repositoryLocation)
@@ -638,7 +623,7 @@ private void unrestorableUseCase(final String indexName,
                 .build()));

         // attempt to restore the snapshot with the given settings
-        RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+        RestoreSnapshotResponse restoreResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
             .setIndices(indexName)
             .setIndexSettings(restoreIndexSettings)
             .setWaitForCompletion(true)
@@ -648,7 +633,7 @@ private void unrestorableUseCase(final String indexName,
         assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
         assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(0));

-        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).setRoutingTable(true).get();
+        ClusterStateResponse clusterStateResponse = clusterAdmin().prepareState().setCustoms(true).setRoutingTable(true).get();

         // check that there is no restore in progress
         RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE);
@@ -678,7 +663,7 @@ private void unrestorableUseCase(final String indexName,
         // delete the index and restore again
         assertAcked(client().admin().indices().prepareDelete(indexName));

-        restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
+        restoreResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
         assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
         assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(numShards.numPrimaries));
@@ -693,6 +678,7 @@ public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Except
         Path repositoryLocation = randomRepoPath();
         Client client = client();
         createRepository("test-repo", "fs", repositoryLocation);
+        createIndexWithRandomDocs("test-idx", 100);

         logger.info("--> snapshot");
@@ -749,7 +735,7 @@ public void testUnallocatedShards() {
         logger.info("--> snapshot");
         final SnapshotException sne = expectThrows(SnapshotException.class,
-            () -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            () -> clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
                 .setWaitForCompletion(true).setIndices("test-idx").get());
         assertThat(sne.getMessage(), containsString("Indices don't have primary shards"));
         assertThat(getRepositoryData("test-repo"), is(RepositoryData.EMPTY));
@@ -812,8 +798,7 @@ public void testDeleteSnapshot() throws Exception {

         assertDocCount("test-idx", 10L * numberOfSnapshots);

-        logger.info("--> delete the last snapshot");
-        client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
+        startDeleteSnapshot("test-repo", lastSnapshot).get();
         logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
         assertFileCount(repo, numberOfFiles[0]);
     }
@@ -864,16 +849,13 @@ public void testMoveShardWhileSnapshotting() throws Exception {
         logger.info("--> unblocking blocked node");
         unblockNode("test-repo", blockedNode);
         logger.info("--> waiting for completion");
-        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
-        logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+        logger.info("Number of failed shards [{}]",
+            waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
         logger.info("--> done");

-        List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo")
-            .setSnapshots("test-snap").get().getSnapshots();
-
-        assertThat(snapshotInfos.size(), equalTo(1));
-        assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
-        assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
+        final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfo.shardFailures(), empty());

         logger.info("--> delete index");
         cluster().wipeIndices("test-idx");
@@ -938,16 +920,13 @@ public void testDeleteRepositoryWhileSnapshotting() throws Exception {
         logger.info("--> unblocking blocked node");
         unblockNode("test-repo", blockedNode);
         logger.info("--> waiting for completion");
-        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
-        logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+        logger.info("Number of failed shards [{}]",
+            waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
         logger.info("--> done");

-        List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo")
-            .setSnapshots("test-snap").get().getSnapshots();
-
-        assertThat(snapshotInfos.size(), equalTo(1));
-        assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
-        assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
+        final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfo.shardFailures().size(), equalTo(0));

         logger.info("--> delete index");
         cluster().wipeIndices("test-idx");
@@ -966,6 +945,7 @@ public void testReadonlyRepository() throws Exception {
         Client client = client();
         Path repositoryLocation = randomRepoPath();
         createRepository("test-repo", "fs", repositoryLocation);
+        createIndexWithRandomDocs("test-idx", 100);

         logger.info("--> snapshot");
@@ -975,8 +955,7 @@ public void testReadonlyRepository() throws Exception {
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));

         logger.info("--> delete index");
         cluster().wipeIndices("test-idx");
@@ -1189,7 +1168,7 @@ public void testSnapshotRelocatingPrimary() throws Exception {
         logger.info("--> wait for relocations to start");

         assertBusy(() -> assertThat(
-            client().admin().cluster().prepareHealth("test-idx").execute().actionGet().getRelocatingShards(), greaterThan(0)),
+            clusterAdmin().prepareHealth("test-idx").execute().actionGet().getRelocatingShards(), greaterThan(0)),
             1L, TimeUnit.MINUTES);

         logger.info("--> snapshot");
@@ -1212,6 +1191,7 @@ public void testSnapshotMoreThanOnce() throws InterruptedException {
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build();
         assertAcked(prepareCreate("test").setSettings(indexSettings));
         ensureGreen();
+        indexRandomDocs("test", randomIntBetween(10, 100));

         assertNoFailures(client().admin().indices().prepareForceMerge("test").setFlush(true).setMaxNumSegments(1).get());
@@ -1220,8 +1200,7 @@ public void testSnapshotMoreThanOnce() throws InterruptedException {
         assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards()));
-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test").state(), equalTo(SnapshotState.SUCCESS));
         {
             SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
                 .setSnapshots("test").get().getSnapshots().get(0);
@@ -1236,8 +1215,7 @@ public void testSnapshotMoreThanOnce() throws InterruptedException {
         assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards()));
-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test-1").state(), equalTo(SnapshotState.SUCCESS));
         {
             SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
                 .setSnapshots("test-1").get().getSnapshots().get(0);
@@ -1253,8 +1231,7 @@ public void testSnapshotMoreThanOnce() throws InterruptedException {
         assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(),
             equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards()));
-        assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(),
-            equalTo(SnapshotState.SUCCESS));
+        assertThat(getSnapshot("test-repo", "test-2").state(), equalTo(SnapshotState.SUCCESS));
         {
             SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
                 .setSnapshots("test-2").get().getSnapshots().get(0);
@@ -1279,13 +1256,12 @@ public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
                 .put("location", randomRepoPath()).put("compress", randomBoolean())
                 .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
                 .put("block_on_data", true));
-
         createIndexWithRandomDocs("test-idx-1", 100);
         createIndexWithRandomDocs("test-idx-2", 100);
         createIndexWithRandomDocs("test-idx-3", 100);

         logger.info("--> snapshot");
-        ActionFuture<CreateSnapshotResponse> future = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+        ActionFuture<CreateSnapshotResponse> future = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
             .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute();
         logger.info("--> wait for block to kick in");
         waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
@@ -1324,6 +1300,7 @@ public void testCloseIndexDuringRestore() throws Exception {
         Client client = client();

         createRepository("test-repo", "mock");
+        createIndexWithRandomDocs("test-idx-1", 100);
         createIndexWithRandomDocs("test-idx-2", 100);
@@ -1405,7 +1382,7 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception {
         logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)");
         ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
-            client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).get());
+            clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).get());
         assertEquals(repoName, e.getRepositoryName());
         assertEquals(snapshotName, e.getSnapshotName());
         assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));
@@ -1421,10 +1398,7 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception {

     private void waitForIndex(final String index, TimeValue timeout) throws Exception {
         assertBusy(
-            () -> {
-                boolean exists = client().admin().indices().prepareExists(index).execute().actionGet().isExists();
-                assertTrue("Expected index [" + index + "] to exist", exists);
-            },
+            () -> assertTrue("Expected index [" + index + "] to exist", indexExists(index)),
             timeout.millis(),
             TimeUnit.MILLISECONDS);
     }
@@ -1516,19 +1490,19 @@ public void testRestoreSnapshotWithCorruptedGlobalState() throws Exception {
             outChan.truncate(randomInt(10));
         }

-        List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots(repoName).get().getSnapshots();
         assertThat(snapshotInfos.size(), equalTo(1));
         assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
         assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo(snapshotName));

         SnapshotsStatusResponse snapshotStatusResponse =
-            client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotName).get();
+            clusterAdmin().prepareSnapshotStatus(repoName).setSnapshots(snapshotName).get();
         assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
         assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));

         assertAcked(client().admin().indices().prepareDelete("test-idx-1", "test-idx-2"));

-        SnapshotException ex = expectThrows(SnapshotException.class, () -> client().admin().cluster()
+        SnapshotException ex = expectThrows(SnapshotException.class, () -> clusterAdmin()
             .prepareRestoreSnapshot(repoName, snapshotName)
             .setRestoreGlobalState(true)
             .setWaitForCompletion(true)
@@ -1537,7 +1511,7 @@ public void testRestoreSnapshotWithCorruptedGlobalState() throws Exception {
         assertThat(ex.getSnapshotName(), equalTo(snapshotName));
         assertThat(ex.getMessage(), containsString("failed to read global metadata"));

-        RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
+        RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
             .setWaitForCompletion(true)
             .get();
         assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
@@ -1594,7 +1568,7 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
             outChan.truncate(randomInt(10));
         }

-        List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots();
         assertThat(snapshotInfos.size(), equalTo(1));
         assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
         assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
@@ -1603,7 +1577,7 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {

         Predicate<String> isRestorableIndex = index -> corruptedIndex.getName().equals(index) == false;

-        client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+        clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
             .setIndices(nbDocsPerIndex.keySet().stream().filter(isRestorableIndex).toArray(String[]::new))
             .setRestoreGlobalState(randomBoolean())
             .setWaitForCompletion(true)
@@ -1616,7 +1590,7 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception {
             }
         }

-        assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
+        assertAcked(startDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
     }

     public void testCannotCreateSnapshotsWithSameName() throws Exception {
@@ -1660,8 +1634,7 @@ public void testCannotCreateSnapshotsWithSameName() throws Exception {
             assertThat(e.getMessage(), containsString("snapshot with the same name already exists"));
         }

-        logger.info("--> delete the first snapshot");
-        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).get();
+        startDeleteSnapshot(repositoryName, snapshotName).get();

         logger.info("--> try creating a snapshot with the same name, now it should work because the first one was deleted");
         createSnapshotResponse = client.admin()
@@ -1698,7 +1671,7 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception {
         String blockedNode = blockNodeWithIndex(repo, index);

         logger.info("--> snapshot");
-        client().admin().cluster().prepareCreateSnapshot(repo, snapshot)
+        clusterAdmin().prepareCreateSnapshot(repo, snapshot)
             .setWaitForCompletion(false)
             .execute();
@@ -1774,9 +1747,7 @@ public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
             .setIndices("test-idx")
             .get();
         assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
-        GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo-2")
-            .addSnapshots("test-snap-2").get();
-        assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
+        assertEquals(SnapshotState.SUCCESS, getSnapshot("test-repo-2", "test-snap-2").state());
     }

     public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
@@ -1828,7 +1799,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
         }

         logger.info("--> verify _all returns snapshot info");
-        GetSnapshotsResponse response = client().admin().cluster()
+        GetSnapshotsResponse response = clusterAdmin()
             .prepareGetSnapshots("test-repo")
             .setSnapshots("_all")
             .setVerbose(false)
@@ -1837,7 +1808,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
         verifySnapshotInfo(response, indicesPerSnapshot);

         logger.info("--> verify wildcard returns snapshot info");
-        response = client().admin().cluster()
+        response = clusterAdmin()
             .prepareGetSnapshots("test-repo")
             .setSnapshots("test-snap-*")
             .setVerbose(false)
@@ -1847,7 +1818,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
         logger.info("--> verify individual requests return snapshot info");
         for (int i = 0; i < numSnapshots; i++) {
-            response = client().admin().cluster()
+            response = clusterAdmin()
                 .prepareGetSnapshots("test-repo")
                 .setSnapshots("test-snap-" + i)
                 .setVerbose(false)
@@ -1918,7 +1889,7 @@ public void testSnapshottingWithMissingSequenceNumbers() {
         assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
     }

-    public void testSnapshotDifferentIndicesBySameName() throws InterruptedException {
+    public void testSnapshotDifferentIndicesBySameName() throws InterruptedException, ExecutionException {
         String indexName = "testindex";
         String repoName = "test-repo";
         Path absolutePath = randomRepoPath().toAbsolutePath();
@@ -1952,11 +1923,11 @@ public void testSnapshotDifferentIndicesBySameName() throws InterruptedException
         assertThat(snapshot2.successfulShards(), is(newShardCount));

         logger.info("--> restoring snapshot 1");
-        client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName)
+        clusterAdmin().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName)
             .setRenameReplacement("restored-1").setWaitForCompletion(true).get();
         logger.info("--> restoring snapshot 2");
-        client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName)
+        clusterAdmin().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName)
             .setRenameReplacement("restored-2").setWaitForCompletion(true).get();

         logger.info("--> verify doc counts");
@@ -1978,7 +1949,7 @@ public void testSnapshotDifferentIndicesBySameName() throws InterruptedException
         logger.info("--> deleting snapshot [{}]", snapshotToDelete);
         assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get());
         logger.info("--> restoring snapshot [{}]", snapshotToRestore);
-        client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
+        clusterAdmin().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
             .setRenameReplacement("restored-3").setWaitForCompletion(true).get();

         logger.info("--> verify doc counts");
@@ -2000,7 +1971,7 @@ public void testBulkDeleteWithOverlappingPatterns() {
             }
             refresh();
             logger.info("--> snapshot {}", i);
-            CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-" + i)
+            CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-" + i)
                 .setWaitForCompletion(true).get();
             assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
             assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
@@ -2008,8 +1979,8 @@ public void testBulkDeleteWithOverlappingPatterns() {
         }

         logger.info("--> deleting all snapshots");
-        client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-*", "*").get();
-        final GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo").get();
+        clusterAdmin().prepareDeleteSnapshot("test-repo", "test-snap-*", "*").get();
+        final GetSnapshotsResponse getSnapshotsResponse = clusterAdmin().prepareGetSnapshots("test-repo").get();
         assertThat(getSnapshotsResponse.getSnapshots(), empty());
     }

@@ -2053,7 +2024,7 @@ public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException {
         // Verify that hidden indices get restored with a wildcard restore
         {
-            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
+            RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
                 .prepareRestoreSnapshot(repoName, snapName)
                 .setWaitForCompletion(true)
                 .setIndices("*")
@@ -2071,7 +2042,7 @@ public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException {

         // Verify that exclusions work on hidden indices
         {
-            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
+            RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
                 .prepareRestoreSnapshot(repoName, snapName)
                 .setWaitForCompletion(true)
                 .setIndices("*", "-.*")
@@ -2089,7 +2060,7 @@ public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException {

         // Verify that hidden indices can be restored with a non-star pattern
         {
-            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
+            RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
                 .prepareRestoreSnapshot(repoName, snapName)
                 .setWaitForCompletion(true)
                 .setIndices("hid*")
@@ -2107,7 +2078,7 @@ public void testHiddenIndicesIncludedInSnapshot() throws InterruptedException {

         // Verify that hidden indices can be restored by fully specified name
         {
-            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
+            RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
                 .prepareRestoreSnapshot(repoName, snapName)
                 .setWaitForCompletion(true)
                 .setIndices(dottedHiddenIndex)
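[Note: the hunks above are all the same mechanical substitution. As an illustrative
sketch only (placeholder repo/snapshot names; getSnapshot(...) is the shared helper this
patch consolidates in AbstractSnapshotIntegTestCase further down), the shape is:

    // Before: every test unpacked the GetSnapshotsResponse by hand.
    List<SnapshotInfo> infos = client().admin().cluster().prepareGetSnapshots("test-repo")
        .setSnapshots("test-snap").get().getSnapshots();
    assertThat(infos.size(), equalTo(1));
    assertThat(infos.get(0).state(), equalTo(SnapshotState.SUCCESS));

    // After: the shared helper owns the "exactly one matching snapshot" assertion.
    assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
]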
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java
index e96f30f9186a7..9c1e76ece298c 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotShardsServiceIT.java
@@ -19,7 +19,6 @@

 package org.elasticsearch.snapshots;

-import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.plugins.Plugin;
@@ -66,8 +65,7 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
             .get();
         waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));

-        final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")
-            .get().getSnapshots().get(0).snapshotId();
+        final SnapshotId snapshotId = getSnapshot("test-repo", "test-snap").snapshotId();

         logger.info("--> start disrupting cluster");
         final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.NetworkDelay.random(random()));
@@ -92,10 +90,7 @@ public void testRetryPostingSnapshotStatusMessages() throws Exception {
         internalCluster().clearDisruptionScheme(true);

         assertBusy(() -> {
-            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
-                .prepareGetSnapshots("test-repo")
-                .setSnapshots("test-snap").get();
-            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
             logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards());
             assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
             assertThat(snapshotInfo.successfulShards(), equalTo(shards));
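[Note: a minimal sketch of the block/await/unblock choreography these tests share, using
only helpers visible in this patch plus ESIntegTestCase's internalCluster(); the repo and
snapshot names are placeholders, not part of the diff:

    final String dataNode = internalCluster().startDataOnlyNode();
    // Block the data node's repository writes, then kick off the snapshot.
    final ActionFuture<CreateSnapshotResponse> future =
        startFullSnapshotBlockedOnDataNode("test-snap", "test-repo", dataNode);
    // ... assertions against the blocked, in-progress snapshot go here ...
    unblockNode("test-repo", dataNode);   // release the mock repository
    assertSuccessful(future);             // snapshot completes with state SUCCESS
]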
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java
index bf27e1a57faaa..012b148b62bf1 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java
@@ -65,8 +65,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
     }

     public void testStatusApiConsistency() {
-        Client client = client();
-
         createRepository("test-repo", "fs");

         createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@@ -82,13 +80,13 @@ public void testStatusApiConsistency() {

         createFullSnapshot("test-repo", "test-snap");

-        List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
+        List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots();
         assertThat(snapshotInfos.size(), equalTo(1));
         SnapshotInfo snapshotInfo = snapshotInfos.get(0);
         assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
         assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));

-        final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
+        final List<SnapshotStatus> snapshotStatus = clusterAdmin().snapshotsStatus(
             new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
         assertThat(snapshotStatus.size(), equalTo(1));
         final SnapshotStatus snStatus = snapshotStatus.get(0);
@@ -97,8 +95,6 @@ public void testStatusApiConsistency() {
     }

     public void testStatusAPICallInProgressSnapshot() throws Exception {
-        Client client = client();
-
         createRepository("test-repo", "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));

         createIndex("test-idx-1");
@@ -111,21 +107,13 @@ public void testStatusAPICallInProgressSnapshot() throws Exception {
         refresh();

         logger.info("--> snapshot");
-        ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture =
-            client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
+        ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot("test-repo", "test-snap");

         logger.info("--> wait for data nodes to get blocked");
         waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
-
-        assertBusy(() -> {
-            try {
-                assertEquals(SnapshotsInProgress.State.STARTED, client.admin().cluster().snapshotsStatus(
-                    new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots().get(0)
-                    .getState());
-            } catch (SnapshotMissingException sme) {
-                throw new AssertionError(sme);
-            }
-        }, 1L, TimeUnit.MINUTES);
+        awaitNumberOfSnapshotsInProgress(1);
+        assertEquals(SnapshotsInProgress.State.STARTED, client().admin().cluster().prepareSnapshotStatus("test-repo")
+            .setSnapshots("test-snap").get().getSnapshots().get(0).getState());

         logger.info("--> unblock all data nodes");
         unblockAllDataNodes("test-repo");
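[Note: how the new wait helper composes, as a sketch with placeholder names.
awaitNumberOfSnapshotsInProgress(count) rides on the ClusterStateObserver-based
awaitClusterState(...) rather than polling the snapshot status API with assertBusy:

    ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot("test-repo", "test-snap");
    waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
    awaitNumberOfSnapshotsInProgress(1);   // observer-driven wait on SnapshotsInProgress
    unblockAllDataNodes("test-repo");
    assertSuccessful(snapshotFuture);
]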
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 548674eb16413..22ba247acf294 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -82,7 +83,6 @@

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;

@@ -90,6 +90,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {

     private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";

+    // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
+    // threads so that blocking some threads on one repository doesn't block other repositories from doing work
+    protected static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
+        .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
+
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return Settings.builder().put(super.nodeSettings(nodeOrdinal))
@@ -123,11 +128,11 @@ public void verifyNoLeakedListeners() throws Exception {
     @After
     public void assertRepoConsistency() {
         if (skipRepoConsistencyCheckReason == null) {
-            client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
+            clusterAdmin().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
                 final String name = repositoryMetadata.name();
                 if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) {
-                    client().admin().cluster().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
-                    client().admin().cluster().prepareCleanupRepository(name).get();
+                    clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
+                    clusterAdmin().prepareCleanupRepository(name).get();
                 }
                 BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
             });
@@ -203,12 +208,10 @@ public static void waitForBlock(String node, String repository, TimeValue timeou
     public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
         long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < timeout.millis()) {
-            List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshotName)
-                .get().getSnapshots();
-            assertThat(snapshotInfos.size(), equalTo(1));
-            if (snapshotInfos.get(0).state().completed()) {
+            final SnapshotInfo snapshotInfo = getSnapshot(repository, snapshotName);
+            if (snapshotInfo.state().completed()) {
                 // Make sure that snapshot clean up operations are finished
-                ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
+                ClusterStateResponse stateResponse = clusterAdmin().prepareState().get();
                 boolean found = false;
                 for (SnapshotsInProgress.Entry entry : stateResponse.getState().custom(SnapshotsInProgress.TYPE,
                     SnapshotsInProgress.EMPTY).entries()) {
@@ -219,7 +222,7 @@ public SnapshotInfo waitForCompletion(String repository, String snapshotName, Ti
                 }
             }
             if (found == false) {
-                return snapshotInfos.get(0);
+                return snapshotInfo;
             }
         }
         Thread.sleep(100);
@@ -308,7 +311,7 @@ public void unblockNode(final String repository, final String node) {

     protected void createRepository(String repoName, String type, Settings.Builder settings) {
         logger.info("--> creating repository [{}] [{}]", repoName, type);
-        assertAcked(client().admin().cluster().preparePutRepository(repoName)
+        assertAcked(clusterAdmin().preparePutRepository(repoName)
             .setType(type).setSettings(settings));
     }

@@ -350,7 +353,7 @@ protected void maybeInitWithOldSnapshotVersion(String repoName, Path repoPath) t
     protected String initWithSnapshotVersion(String repoName, Path repoPath, Version version) throws IOException {
         assertThat("This hack only works on an empty repository", getRepositoryData(repoName).getSnapshotIds(), empty());
         final String oldVersionSnapshot = OLD_VERSION_SNAPSHOT_PREFIX + version.id;
-        final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
+        final CreateSnapshotResponse createSnapshotResponse = clusterAdmin()
             .prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices("does-not-exist-for-sure-*")
             .setWaitForCompletion(true).get();
         assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
@@ -373,7 +376,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version

     protected SnapshotInfo createFullSnapshot(String repoName, String snapshotName) {
         logger.info("--> creating full snapshot [{}] in [{}]", snapshotName, repoName);
-        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
             .setIncludeGlobalState(true)
             .setWaitForCompletion(true)
             .get();
@@ -417,7 +420,7 @@ protected void assertDocCount(String index, long count) {
      * @param metadata snapshot metadata to write (as returned by {@link SnapshotInfo#userMetadata()})
      */
     protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<String, Object> metadata) throws Exception {
-        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final ClusterState state = clusterAdmin().prepareState().get().getState();
         final RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
         assertNotNull(repositoriesMetadata);
         final RepositoryMetadata initialRepoMetadata = repositoriesMetadata.repository(repoName);
@@ -447,11 +450,6 @@ protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws
         awaitClusterState(internalCluster().getMasterName(), statePredicate);
     }

-    protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
-        logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
-        return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
-    }
-
     protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
         final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
         final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
@@ -478,12 +476,76 @@ public void onTimeout(TimeValue timeout) {
         }
     }

+    protected ActionFuture<CreateSnapshotResponse> startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName,
+                                                                                      String dataNode) throws InterruptedException {
+        blockDataNode(repoName, dataNode);
+        final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(repoName, snapshotName);
+        waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
+        return fut;
+    }
+
+    protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
+        return startFullSnapshot(repoName, snapshotName, false);
+    }
+
+    protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
+        logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
+        return clusterAdmin().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
+            .setPartial(partial).execute();
+    }
+
+    protected void awaitNumberOfSnapshotsInProgress(int count) throws Exception {
+        logger.info("--> wait for [{}] snapshots to show up in the cluster state", count);
+        awaitClusterState(state ->
+            state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
+    }
+
     protected static SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
         final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
         assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
         return snapshotInfo;
     }

+    private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
+
+    protected void createIndexWithContent(String indexName) {
+        createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA);
+    }
+
+    protected void createIndexWithContent(String indexName, Settings indexSettings) {
+        logger.info("--> creating index [{}]", indexName);
+        createIndex(indexName, indexSettings);
+        ensureGreen(indexName);
+        index(indexName, "_doc", "some_id", "foo", "bar");
+    }
+
+    protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
+        logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
+        return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
+    }
+
+    protected void updateClusterState(final Function<ClusterState, ClusterState> updater) throws Exception {
+        final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
+        final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
+        clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                return updater.apply(currentState);
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                future.onFailure(e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                future.onResponse(null);
+            }
+        });
+        future.get();
+    }
+
     protected SnapshotInfo getSnapshot(String repository, String snapshot) {
         final List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots(repository).setSnapshots(snapshot)
             .get().getSnapshots();
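[Note: a hypothetical caller of the new updateClusterState(...) helper; the identity
transform below is only illustrative. The helper submits the transform as a cluster-state
update task on the current master and blocks until it has been processed:

    updateClusterState(currentState -> ClusterState.builder(currentState).build());
]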
diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java
index 4f457ae7302dc..9dbb388b75a8a 100644
--- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java
+++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java
@@ -10,7 +10,6 @@
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
-import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -115,10 +114,7 @@ public void testSnapshotAndRestore() throws Exception {
         RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
         assertEquals(RestStatus.OK, status);

-        GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get();
-        java.util.List<SnapshotInfo> snap = snapshot.getSnapshots();
-        assertEquals(1, snap.size());
-        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices());
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());

         assertTrue(
             client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" }))
@@ -161,10 +157,7 @@ public void testSnapshotAndRestoreAll() throws Exception {
         RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
         assertEquals(RestStatus.OK, status);

-        GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get();
-        java.util.List<SnapshotInfo> snap = snapshot.getSnapshots();
-        assertEquals(1, snap.size());
-        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices());
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());

         assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
         assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));