From 54e6e4c2ea7d041e65e58d351b6f9c93fdb5583b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Dec 2019 12:47:31 +0100 Subject: [PATCH 1/3] Fix Index Deletion during Snapshot Finalization (#50202) With #45689 making it so that index metadata is written after all shards have been snapshotted, we can't delete indices that are part of the upcoming snapshot finalization any longer and it is not sufficient to check if all shards of an index have been snapshotted before deciding that it is safe to delete it. This change forbids deleting any index that is in the process of being snapshotted to avoid issues during snapshot finalization. Relates #50200 (doesn't fully fix yet because we're not fixing the `partial=true` snapshot case here). --- .../snapshots/SnapshotsService.java | 19 +- .../snapshots/SnapshotResiliencyTests.java | 170 +++++++++++++----- 2 files changed, 127 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 24a13dc270b7e..bd1d0e7dafb5d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1468,21 +1468,10 @@ public static Set snapshottingIndices(final ClusterState currentState, fi final Set indices = new HashSet<>(); for (final SnapshotsInProgress.Entry entry : snapshots.entries()) { if (entry.partial() == false) { - if (entry.state() == State.INIT) { - for (IndexId index : entry.indices()) { - IndexMetaData indexMetaData = currentState.metaData().index(index.getName()); - if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) { - indices.add(indexMetaData.getIndex()); - } - } - } else { - for (ObjectObjectCursor shard : entry.shards()) { - Index index = shard.key.getIndex(); - if (indicesToCheck.contains(index) - && shard.value.state().completed() == false - && 
currentState.getMetaData().index(index) != null) { - indices.add(index); - } + for (IndexId index : entry.indices()) { + IndexMetaData indexMetaData = currentState.metaData().index(index.getName()); + if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) { + indices.add(indexMetaData.getIndex()); } } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 45c66caa7900f..58b78caf37648 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -21,11 +21,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; @@ -75,6 +78,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -156,6 +160,7 @@ import 
org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; @@ -203,6 +208,7 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; @@ -233,6 +239,15 @@ public void createServices() { @After public void verifyReposThenStopServices() { try { + clearDisruptionsAndAwaitSync(); + + final StepListener cleanupResponse = new StepListener<>(); + client().admin().cluster().cleanupRepository( + new CleanupRepositoryRequest("repo"), cleanupResponse); + final AtomicBoolean cleanedUp = new AtomicBoolean(false); + continueOrDie(cleanupResponse, r -> cleanedUp.set(true)); + + runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L)); if (blobStoreContext != null) { blobStoreContext.forceConsistent(); } @@ -258,8 +273,8 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener createSnapshotResponseListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { - final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { + final Runnable afterIndexing = () -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true).execute(createSnapshotResponseListener); if (documents == 0) { afterIndexing.run(); @@ -269,7 +284,7 @@ public void testSuccessfulSnapshotAndRestore() { 
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); } final StepListener bulkResponseStepListener = new StepListener<>(); - masterNode.client.bulk(bulkRequest, bulkResponseStepListener); + client().bulk(bulkRequest, bulkResponseStepListener); continueOrDie(bulkResponseStepListener, bulkResponse -> { assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); assertEquals(documents, bulkResponse.getItems().length); @@ -281,16 +296,16 @@ public void testSuccessfulSnapshotAndRestore() { final StepListener deleteIndexListener = new StepListener<>(); continueOrDie(createSnapshotResponseListener, - createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); final StepListener restoreSnapshotResponseListener = new StepListener<>(); - continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(deleteIndexListener, ignored -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener)); final StepListener searchResponseListener = new StepListener<>(); continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( + client().search( new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener); }); @@ -307,7 +322,7 @@ public void testSuccessfulSnapshotAndRestore() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final 
Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -319,33 +334,34 @@ public void testSuccessfulSnapshotAndRestore() { public void testSnapshotWithNodeDisconnects() { final int dataNodes = randomIntBetween(2, 10); - setupTestCluster(randomFrom(1, 3, 5), dataNodes); + final int masterNodes = randomFrom(1, 3, 5); + setupTestCluster(masterNodes, dataNodes); String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; final int shards = randomIntBetween(1, 10); - TestClusterNodes.TestClusterNode masterNode = - testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectRandomDataNode); } if (randomBoolean()) { scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); } - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); + testClusterNodes.randomMasterNodeSafe().client.admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); }); continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { scheduleNow(this::disconnectOrRestartDataNode); } - final boolean disconnectedMaster = randomBoolean(); + // Only disconnect master if we have more than a 
single master and can simulate a failover + final boolean disconnectedMaster = randomBoolean() && masterNodes > 1; if (disconnectedMaster) { scheduleNow(this::disconnectOrRestartMasterNode); } @@ -368,7 +384,7 @@ public void testSnapshotWithNodeDisconnects() { SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); final Repository repository = randomMaster.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); } @@ -385,18 +401,18 @@ public void testConcurrentSnapshotCreateAndDelete() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .execute(createSnapshotResponseStepListener)); final StepListener deleteSnapshotStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster() + continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> client().admin().cluster() .prepareCreateSnapshot(repoName, 
snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener)); continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); @@ -407,7 +423,7 @@ public void testConcurrentSnapshotCreateAndDelete() { SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -417,6 +433,50 @@ public void testConcurrentSnapshotCreateAndDelete() { assertEquals(0, snapshotInfo.failedShards()); } + public void testConcurrentSnapshotDeleteAndDeleteIndex() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener> createIndicesListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> { + // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot + // finalization + final int indices = randomIntBetween(5, 20); + final GroupedActionListener listener = new GroupedActionListener<>(createIndicesListener, indices); + for (int i = 0; i < indices; ++i) { + client().admin().indices().create(new CreateIndexRequest("index-" + i), listener); 
+ } + }); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createIndicesListener, createIndexResponses -> + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false) + .execute(createSnapshotResponseStepListener)); + + continueOrDie(createSnapshotResponseStepListener, + createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener())); + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + } + /** * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. 
@@ -438,8 +498,8 @@ public void testSnapshotPrimaryRelocations() { final StepListener clusterStateResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), - createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> { final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); @@ -490,8 +550,8 @@ public void run() { final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe() .clusterService.state().custom(SnapshotsInProgress.TYPE); assertThat(finalSnapshotsInProgress.entries(), empty()); - final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } @@ -509,19 +569,18 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> { final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); for (int i = 0; i < documents; ++i) { // Index a few documents with different field names so we trigger a dynamic mapping update for each of them - masterNode.client.bulk( - new BulkRequest().add(new 
IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar"))) + client().bulk(new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar"))) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), assertNoFailureListener( bulkResponse -> { assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); if (initiatedSnapshot.compareAndSet(false, true)) { - masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true).execute(createSnapshotResponseStepListener); + client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener); } })); } @@ -531,7 +590,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { final StepListener restoreSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().restoreSnapshot( new RestoreSnapshotRequest(repoName, snapshotName) .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener)); @@ -539,8 +598,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> { assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( - new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), + client().search(new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), searchResponseStepListener); }); @@ -564,7 +622,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { 
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); - Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); @@ -574,18 +632,19 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { assertEquals(0, snapshotInfo.failedShards()); } - private StepListener createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName, - String index, int shards) { - final AdminClient adminClient = masterNode.client.admin(); + private RepositoryData getRepositoryData(Repository repository) { + return repository.getRepositoryData(); + } + private StepListener createRepoAndIndex(String repoName, String index, int shards) { final StepListener createRepositoryListener = new StepListener<>(); - adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) + client().admin().cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener); final StepListener createIndexResponseStepListener = new StepListener<>(); - continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create( + continueOrDie(createRepositoryListener, acknowledgedResponse -> client().admin().indices().create( new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)), createIndexResponseStepListener)); @@ -594,11 +653,7 @@ private StepListener createRepoAndIndex(TestClusterNodes.Te private void 
clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); - runUntil(() -> { - final List versions = testClusterNodes.nodes.values().stream() - .map(n -> n.clusterService.state().version()).distinct().collect(Collectors.toList()); - return versions.size() == 1L; - }, TimeUnit.MINUTES.toMillis(1L)); + stabilize(); } private void disconnectOrRestartDataNode() { @@ -635,15 +690,25 @@ private void startCluster() { .filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet())); testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach( testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration)); + // Connect all nodes to each other + testClusterNodes.nodes.values().forEach(node -> testClusterNodes.nodes.values().forEach( + n -> n.transportService.connectToNode(node.node, null, + ActionTestUtils.assertNoFailureListener(c -> logger.info("--> Connected [{}] to [{}]", n.node, node.node))))); + stabilize(); + } + private void stabilize() { runUntil( () -> { - List masterNodeIds = testClusterNodes.nodes.values().stream() - .map(node -> node.clusterService.state().nodes().getMasterNodeId()) - .distinct().collect(Collectors.toList()); - return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false; + final Collection clusterStates = + testClusterNodes.nodes.values().stream().map(node -> node.clusterService.state()).collect(Collectors.toList()); + final Set masterNodeIds = clusterStates.stream() + .map(clusterState -> clusterState.nodes().getMasterNodeId()).collect(Collectors.toSet()); + final Set terms = clusterStates.stream().map(ClusterState::term).collect(Collectors.toSet()); + final List versions = clusterStates.stream().map(ClusterState::version).distinct().collect(Collectors.toList()); + return versions.size() == 1 && masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false && terms.size() == 1; }, - TimeUnit.SECONDS.toMillis(30L) + 
TimeUnit.MINUTES.toMillis(1L) ); } @@ -689,6 +754,16 @@ private static ActionListener noopListener() { return ActionListener.wrap(() -> {}); } + public NodeClient client() { + // Select from sorted list of nodes + final List nodes = testClusterNodes.nodes.values().stream() + .filter(n -> testClusterNodes.disconnectedNodes.contains(n.node.getName()) == false) + .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); + if (nodes.isEmpty()) { + throw new AssertionError("No nodes available"); + } + return randomFrom(nodes).client; + } /** * Create a {@link Environment} with random path.home and path.repo **/ @@ -765,6 +840,7 @@ public TestClusterNode randomMasterNodeSafe() { public Optional randomMasterNode() { // Select from sorted list of data-nodes here to not have deterministic behaviour final List masterNodes = testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) + .filter(n -> disconnectedNodes.contains(n.node.getName()) == false) .sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList()); return masterNodes.isEmpty() ? 
Optional.empty() : Optional.of(randomFrom(masterNodes)); } From 6d170798d5d2063d79ab4815b70daf99494a6c2c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Dec 2019 13:29:40 +0100 Subject: [PATCH 2/3] CS --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 58b78caf37648..7e6a05aafd44b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; @@ -208,7 +207,6 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; From f80c270c5320d1dee570b013bc68146f86e26592 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Dec 2019 14:27:40 +0100 Subject: [PATCH 3/3] fix test --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 7e6a05aafd44b..5e483981b14fe 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -26,8 +26,10 @@ import org.elasticsearch.action.ActionType; import 
org.elasticsearch.action.RequestValidators; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; @@ -1182,6 +1184,8 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon transportService, clusterService, repositoriesService, threadPool, actionFilters, indexNameExpressionResolver )); + actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService, + repositoriesService, threadPool, actionFilters, indexNameExpressionResolver)); actions.put(CreateSnapshotAction.INSTANCE, new TransportCreateSnapshotAction( transportService, clusterService, threadPool,