From d9376eec6bc69ab4bf9043175135abc7b0579004 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Fri, 3 May 2019 13:07:38 +0200 Subject: [PATCH] Closed index replica allocation Added an integration test validating that closed indices recover fast when multiple shard copies are available to choose from. Fixed InternalTestCluster to allow doing operations inside onNodeStopped() when using restartXXXNode(). Relates #41400 and #33888 --- .../indices/state/CloseIndexIT.java | 49 +++++++++++++++++++ .../test/InternalTestCluster.java | 25 +++++++--- 2 files changed, 66 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index af98ba990b253..8625397c1fd90 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -36,6 +37,7 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import java.util.ArrayList; import java.util.List; @@ -338,6 +340,53 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertIndexIsClosed(indexName); } + /** + * Verify that if we have two shard copies around, we prefer one with identical sequence numbers and do + * a noop recovery. 
+ */ + public void testClosedIndexRecoversFast() throws Exception { + final String indexName = "closed-index-fast-recovery"; + internalCluster().ensureAtLeastNumDataNodes(3); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 50)) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", "Extra" + i).setSource("num", i)).collect(toList())); + ensureGreen(); + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + ensureYellow(); + + assertAcked(client().admin().indices().prepareClose(indexName).get()); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")).get()); + return super.onNodeStopped(nodeName); + } + }); + return super.onNodeStopped(nodeName); + } + }); + + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.YELLOW)); + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), (String) null)).get()); + ensureGreen(); + // needs merge of #41400 before we can check this. 
+// assertNoFileBasedRecovery(indexName); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 7ff928c4413d2..0089cd0346681 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -148,8 +148,8 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING; import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE; -import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE; +import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; @@ -564,7 +564,8 @@ private NodeAndClient getRandomNodeAndClient() { private synchronized NodeAndClient getRandomNodeAndClient(Predicate predicate) { ensureOpen(); - List values = nodes.values().stream().filter(predicate).collect(Collectors.toList()); + List values = nodes.values().stream().filter(nc -> nc.isClosed() == false).filter(predicate) + .collect(Collectors.toList()); if (values.isEmpty() == false) { return randomFrom(random, values); } @@ -1003,6 +1004,10 @@ public void close() throws IOException { } } + public boolean isClosed() { + return closed.get(); + } + private void markNodeDataDirsAsPendingForWipe(Node node) { assert Thread.holdsLock(InternalTestCluster.this); NodeEnvironment nodeEnv 
= node.getNodeEnvironment(); @@ -1178,10 +1183,11 @@ public synchronized void validateClusterFormed() { /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ private synchronized void validateClusterFormed(String viaNode) { - Set expectedNodes = new HashSet<>(); - for (NodeAndClient nodeAndClient : nodes.values()) { - expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); - } + Set expectedNodes = + nodes.values().stream() + .filter(nc -> nc.isClosed() == false) + .map(nc -> getInstanceFromNode(ClusterService.class, nc.node()).localNode()) + .collect(Collectors.toSet()); logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); try { @@ -1533,7 +1539,7 @@ private static T getInstanceFromNode(Class clazz, Node node) { @Override public int size() { - return nodes.size(); + return Math.toIntExact(nodes.values().stream().filter(nc -> nc.isClosed() == false).count()); } @Override @@ -2085,7 +2091,10 @@ private static int getMinMasterNodes(int eligibleMasterNodes) { } private int getMasterNodesCount() { - return (int) nodes.values().stream().filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())).count(); + return (int) nodes.values().stream() + .filter(n -> n.isClosed() == false) + .filter(n -> Node.NODE_MASTER_SETTING.get(n.node().settings())) + .count(); } public String startMasterOnlyNode() {