Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Noop peer recoveries on closed index #41400

Merged
merged 20 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,10 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ public int estimateNumberOfHistoryOperations(String source, MapperService mapper

@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
// NOTE(review): unified-diff rendering — the next line is the removed (pre-change) implementation
return false;
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,31 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Collections.emptySet;
Expand All @@ -50,9 +56,11 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class CloseIndexIT extends ESIntegTestCase {
Expand Down Expand Up @@ -338,6 +346,81 @@ public void testCloseIndexWaitForActiveShards() throws Exception {
assertIndexIsClosed(indexName);
}

// Closing and re-opening a green index should trigger noop peer recoveries: each replica already
// holds complete, identical history, so no translog replay and no segment-file copy is expected.
public void testNoopPeerRecoveriesWhenIndexClosed() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to also test the scenario you described here:

#41400 (comment)

where we expect file based recovery and verify same docs on all shards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@henningandersen Good suggestion. However we can't test that scenario for now since closing a follower index with gaps in sequence number will make all its shard unassigned; hence no peer recovery will be performed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn Can you implement the test scenario that you've described for regular indices (instead of follower index)? It will then show that a closed replica index that is missing some docs IS doing a file-based recovery.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test in b50d3f2.

final String indexName = "noop-peer-recovery-test";
int numberOfReplicas = between(1, 2);
internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + between(1, 2));
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
// keep shard copies where they are so every recovery reuses the existing on-disk copy
.put("index.routing.rebalance.enable", "none")
.build());
int iterations = between(1, 3);
for (int iter = 0; iter < iterations; iter++) {
// indexRandom flags: forceRefresh / dummyDocs / maybeFlush are all randomized per iteration
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
ensureGreen(indexName);

// Closing an index should execute noop peer recovery
assertAcked(client().admin().indices().prepareClose(indexName).get());
assertIndexIsClosed(indexName);
ensureGreen(indexName);
assertNoFileBasedRecovery(indexName);
internalCluster().assertSameDocIdsOnShards();

// Open a closed index should execute noop recovery
assertAcked(client().admin().indices().prepareOpen(indexName).get());
assertIndexIsOpened(indexName);
ensureGreen(indexName);
assertNoFileBasedRecovery(indexName);
internalCluster().assertSameDocIdsOnShards();
}
}

/**
 * Ensures that if a replica of a closed index does not have the same content as the primary, then a file-based recovery will occur.
 */
public void testRecoverExistingReplica() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps add a comment that says that this tests recovery of a replica of a closed index that has some docs missing that were on the primary, leading to a file-based recovery

final String indexName = "test-recover-existing-replica";
internalCluster().ensureAtLeastNumDataNodes(2);
// pin the index to exactly two data nodes so the restarted node's existing shard copy must be reused
List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
ensureGreen(indexName);
// persist the current docs so the soon-to-be-offline copy has segment files on disk to recover from
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
} else {
client().admin().indices().prepareSyncedFlush(indexName).get();
}
// index more documents while one shard copy is offline
internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
Client client = client(dataNodes.get(0));
int moreDocs = randomIntBetween(1, 50);
for (int i = 0; i < moreDocs; i++) {
client.prepareIndex(indexName, "_doc").setSource("num", i).get();
}
// close while the replica is down so recovery on restart happens against the closed index
assertAcked(client.admin().indices().prepareClose(indexName));
return super.onNodeStopped(nodeName);
}
});
assertIndexIsClosed(indexName);
ensureGreen(indexName);
internalCluster().assertSameDocIdsOnShards();
// the restarted replica missed documents, so its recovery must have copied files from the primary
for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
if (recovery.getPrimary() == false) {
assertThat(recovery.getIndex().fileDetails(), not(empty()));
}
}
}

static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
Expand Down Expand Up @@ -383,4 +466,12 @@ static void assertException(final Throwable throwable, final String indexName) {
fail("Unexpected exception: " + t);
}
}

/**
 * Verifies that every replica shard of the given index completed its peer recovery
 * without copying any segment files from the primary (i.e. the recovery was a noop,
 * sequence-number based recovery).
 */
void assertNoFileBasedRecovery(String indexName) {
final List<RecoveryState> recoveries =
client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName);
for (final RecoveryState recoveryState : recoveries) {
if (recoveryState.getPrimary()) {
continue; // only replica copies recover from a remote primary
}
assertThat(recoveryState.getIndex().fileDetails(), empty());
}
}
}