From e61469aae667858c23b3187d5115704585e1ac8e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 3 May 2019 11:39:16 -0400 Subject: [PATCH] Noop peer recoveries on closed index (#41400) If users close an index to change some non-dynamic index settings, then the current implementation forces replicas of that closed index to copy over segment files from the primary. With this change, we make peer recoveries of closed index skip both phases. Relates #33888 Co-authored-by: Yannick Welsch --- .../index/engine/InternalEngine.java | 4 + .../index/engine/ReadOnlyEngine.java | 3 +- .../indices/state/CloseIndexIT.java | 91 +++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f0cafe2e0f05a..38fea26facc6c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2568,6 +2568,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); + // avoid scanning translog if not necessary + if (startingSeqNo > currentLocalCheckpoint) { + return true; + } final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 5cf62dbbf7c74..d22253550ca99 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -300,7 +300,8 @@ public int estimateNumberOfHistoryOperations(String source, MapperService mapper @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - return false; + // we can do operation-based recovery if we don't have to replay any operation. + return startingSeqNo > seqNoStats.getMaxSeqNo(); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 8ee7d430b0a35..e3808175f763e 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -22,25 +22,31 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.util.Collections.emptySet; @@ -50,9 +56,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class CloseIndexIT extends ESIntegTestCase { @@ -341,6 +349,81 @@ public void testCloseIndexWaitForActiveShards() throws Exception { assertIndexIsClosed(indexName); } + public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception { + final String indexName = "noop-peer-recovery-test"; + int numberOfReplicas = between(1, 2); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2)); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put("index.routing.rebalance.enable", "none") + .build()); + int iterations = between(1, 3); + for (int iter = 0; iter < iterations; iter++) { + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + + // Closing an index should execute noop peer recovery + assertAcked(client().admin().indices().prepareClose(indexName).get()); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + internalCluster().assertSameDocIdsOnShards(); + + // Open a closed index should execute noop recovery + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + assertIndexIsOpened(indexName); + ensureGreen(indexName); + assertNoFileBasedRecovery(indexName); + internalCluster().assertSameDocIdsOnShards(); + } + } + + /** + * Ensures that if a replica of a closed index does not have the same content as the primary, then a file-based recovery will occur. + */ + public void testRecoverExistingReplica() throws Exception { + final String indexName = "test-recover-existing-replica"; + internalCluster().ensureAtLeastNumDataNodes(2); + List dataNodes = randomSubsetOf(2, Sets.newHashSet( + clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet())); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", String.join(",", dataNodes)) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + if (randomBoolean()) { + client().admin().indices().prepareFlush(indexName).get(); + } else { + client().admin().indices().prepareSyncedFlush(indexName).get(); + } + // index more documents while one shard copy is offline + internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + Client client = client(dataNodes.get(0)); + int moreDocs = randomIntBetween(1, 50); + for (int i = 0; i < moreDocs; i++) { + client.prepareIndex(indexName, "_doc").setSource("num", i).get(); + } + assertAcked(client.admin().indices().prepareClose(indexName)); + return super.onNodeStopped(nodeName); + } + }); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + internalCluster().assertSameDocIdsOnShards(); + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), not(empty())); + } + } + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { @@ -386,4 +469,12 @@ static void assertException(final Throwable throwable, final String indexName) { fail("Unexpected exception: " + t); } } + + void assertNoFileBasedRecovery(String indexName) { + for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) { + if (recovery.getPrimary() == false) { + assertThat(recovery.getIndex().fileDetails(), empty()); + } + } + } }