Recover peers from translog, ignoring soft deletes #38904

Merged

Changes from 3 commits
@@ -767,7 +767,7 @@ public abstract int estimateNumberOfHistoryOperations(String source,
                                                        MapperService mapperService, long startingSeqNo) throws IOException;

     /**
-     * Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its history (either Lucene or translog)
+     * Checks if this engine has every operation since {@code startingSeqNo} (inclusive) in its translog
      */
     public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
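
Editor's note (not part of this diff): a rough sketch of how a recovery caller might consult this check; the helper name isOpsBasedRecoveryPossible is hypothetical and the real recovery code wires this differently.

    // Hypothetical illustration only.
    static boolean isOpsBasedRecoveryPossible(Engine engine, MapperService mapperService, long targetStartingSeqNo) throws IOException {
        // An ops-based (sequence-number-based) recovery is only safe when the translog still
        // holds every operation from the target's starting sequence number onwards.
        return engine.hasCompleteOperationHistory("peer-recovery", mapperService, targetStartingSeqNo);
    }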

@@ -502,16 +502,11 @@ public void syncTranslog() throws IOException {
}

     /**
-     * Creates a new history snapshot for reading operations since the provided seqno.
-     * The returned snapshot can be retrieved from either Lucene index or translog files.
+     * Creates a new history snapshot for reading operations since the provided seqno from the translog.
      */
     @Override
     public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
-        } else {
-            return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
-        }
+        return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
     }
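
Editor's note (not part of this diff): a minimal usage sketch for the snapshot returned here, assuming an engine reference in scope; process(...) is a placeholder consumer.

    // Drain the translog-backed history snapshot, operation by operation.
    try (Translog.Snapshot snapshot = engine.readHistoryOperations("resync", mapperService, startingSeqNo)) {
        Translog.Operation operation;
        while ((operation = snapshot.next()) != null) {
            process(operation); // e.g. ship the operation to a recovering replica
        }
    }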

/**
@@ -2546,21 +2541,17 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS

     @Override
     public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return getMinRetainedSeqNo() <= startingSeqNo;
-        } else {
-            final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
-            final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
-            try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
-                        tracker.markSeqNoAsCompleted(operation.seqNo());
-                    }
-                }
-            }
-            return tracker.getCheckpoint() >= currentLocalCheckpoint;
-        }
+        final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
+        final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
+        try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
+            Translog.Operation operation;
+            while ((operation = snapshot.next()) != null) {
+                if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                    tracker.markSeqNoAsCompleted(operation.seqNo());
+                }
+            }
+        }
+        return tracker.getCheckpoint() >= currentLocalCheckpoint;
     }
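
Editor's note (not part of this diff): a small worked example of the gap check above, using the same LocalCheckpointTracker API. Suppose startingSeqNo is 5 and the engine's current local checkpoint is 9:

    LocalCheckpointTracker tracker = new LocalCheckpointTracker(5, 4); // maxSeqNo = startingSeqNo, checkpoint = startingSeqNo - 1
    for (long seqNo : new long[] {5, 6, 8, 9}) { // the translog is missing operation 7
        tracker.markSeqNoAsCompleted(seqNo);
    }
    // The checkpoint only advances over contiguous sequence numbers, so it stalls at 6,
    // which is below the current local checkpoint of 9 -> the history is incomplete.
    assert tracker.getCheckpoint() == 6;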

/**
@@ -2575,7 +2566,19 @@ public final long getMinRetainedSeqNo() {
     @Override
     public Closeable acquireRetentionLock() {
         if (softDeleteEnabled) {
-            return softDeletesPolicy.acquireRetentionLock();
+            final Closeable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
+            final Closeable translogRetentionLock;
+            try {
+                translogRetentionLock = translog.acquireRetentionLock();
+            } catch (Exception e) {
+                try {
+                    softDeletesRetentionLock.close();
+                } catch (IOException ioexception) {
+                    e.addSuppressed(ioexception);
+                }
+                throw e;
+            }
+            return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
         } else {
             return translog.acquireRetentionLock();
         }

Review comment (Member, on the inner catch block): nit: make softDeletesRetentionLock Releasable so we won't have to catch here.
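
Editor's note (not part of this diff): a rough sketch of the reviewer's nit above, assuming the soft-deletes retention lock were exposed as a Releasable (whose close() declares no checked exception), which would remove the inner try/catch:

    final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
    final Closeable translogRetentionLock;
    try {
        translogRetentionLock = translog.acquireRetentionLock();
    } catch (Exception e) {
        softDeletesRetentionLock.close(); // no IOException to suppress here
        throw e;
    }
    return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);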
@@ -177,10 +177,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             // We must have everything above the local checkpoint in the commit
             requiredSeqNoRangeStart =
                 Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
-            // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
-            // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
-            // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
-            startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
+            // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
+            // still filter out legacy operations without seqNo.
+            startingSeqNo = 0;
             try {
                 final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
                 sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
@@ -417,6 +417,10 @@ public synchronized void reset() {
stopTime = 0;
}

// for tests
public long getStartNanoTime() {
return startNanoTime;
}
}

public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
@@ -115,16 +115,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
             assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
         }
         if (syncNeeded && globalCheckPoint < numDocs - 1) {
-            if (shard.indexSettings.isSoftDeleteEnabled()) {
-                assertThat(resyncTask.getSkippedOperations(), equalTo(0));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
-                assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
-            } else {
-                int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
-                assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
-                assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
-            }
+            int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
+            assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
+            assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
+            assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
         } else {
             assertThat(resyncTask.getSkippedOperations(), equalTo(0));
             assertThat(resyncTask.getResyncedOperations(), equalTo(0));
@@ -26,11 +26,13 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -786,4 +788,55 @@ public void sendRequest(Transport.Connection connection, long requestId, String
assertHitCount(client().prepareSearch(indexName).get(), numDocs);
}
}

@TestLogging("org.elasticsearch.indices.recovery:TRACE")
public void testHistoryRetention() throws Exception {
Review comment (Member): Is this test a left-over?

Reply (Contributor Author): No, this test motivates this change, in the sense that it fails on master but passes on this branch.

internalCluster().startNodes(3);

final String indexName = "test";
client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get();
ensureGreen(indexName);

// Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case
final List<IndexRequestBuilder> requests = new ArrayList<>();
final int replicatedDocCount = scaledRandomIntBetween(25, 250);
while (requests.size() < replicatedDocCount) {
requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON));
}
indexRandom(true, requests);
if (randomBoolean()) {
flush(indexName);
}

internalCluster().stopRandomNode(s -> true);
internalCluster().stopRandomNode(s -> true);

final long desyncNanoTime = System.nanoTime();
while (System.nanoTime() <= desyncNanoTime) {
// time passes
}

final int numNewDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numNewDocs; i++) {
client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
}
// Flush twice to update the safe commit's local checkpoint
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));

assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
internalCluster().startNode();
ensureGreen(indexName);

final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName);
recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime);

assertThat(recoveryStates, hasSize(1));
assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
}
}
@@ -68,8 +68,7 @@ public void testTranslogHistoryTransferred() throws Exception {
             shards.addReplica();
             shards.startAll();
             final IndexShard replica = shards.getReplicas().get(0);
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs));
             shards.assertAllEqual(docs + moreDocs);
         }
     }
@@ -282,8 +281,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
             shards.recoverReplica(newReplica);
             // file based recovery should be made
             assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
+            assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));

             // history uuid was restored
             assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@@ -387,8 +385,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
             shards.recoverReplica(replica);
             // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
             assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
             shards.assertAllEqual(numDocs);
         }
}