diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index bac85413a7a92..56d8c6bab6184 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -767,7 +767,7 @@ public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException;
 
     /**
-     * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
+     * Checks if this engine has all operations since {@code startingSeqNo} (inclusive) in its translog
      */
     public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 66e0d30f164f1..832df83fe0f5c 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -502,16 +502,11 @@ public void syncTranslog() throws IOException {
     }
 
     /**
-     * Creates a new history snapshot for reading operations since the provided seqno.
-     * The returned snapshot can be retrieved from either Lucene index or translog files.
+     * Creates a new history snapshot from the translog for reading operations since the provided seqno.
      */
     @Override
    public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
-        } else {
-            return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
-        }
+        return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
     }
 
     /**
@@ -2546,21 +2541,17 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS
 
     @Override
     public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
-        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
-            return getMinRetainedSeqNo() <= startingSeqNo;
-        } else {
-            final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
-            final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
-            try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
-                Translog.Operation operation;
-                while ((operation = snapshot.next()) != null) {
-                    if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
-                        tracker.markSeqNoAsCompleted(operation.seqNo());
-                    }
+        final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
+        final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
+        try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
+            Translog.Operation operation;
+            while ((operation = snapshot.next()) != null) {
+                if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                    tracker.markSeqNoAsCompleted(operation.seqNo());
                 }
             }
-            return tracker.getCheckpoint() >= currentLocalCheckpoint;
         }
+        return tracker.getCheckpoint() >= currentLocalCheckpoint;
     }
 
     /**
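Taken together, the two methods above now define a translog-only contract: `hasCompleteOperationHistory` says whether the translog still covers everything from `startingSeqNo`, and `readHistoryOperations` hands those operations back as a snapshot. The sketch below is illustrative only and is not part of this change; it shows how a caller on the recovery source side might combine the two, with `sendToTarget` standing in for the real transport call.

```java
import java.io.IOException;

import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.translog.Translog;

/**
 * Illustrative sketch only. It uses the Engine methods shown in the hunks above;
 * sendToTarget is a hypothetical stand-in for replaying an operation on the target shard.
 */
class TranslogHistoryReplayExample {

    static void replayHistory(Engine engine, MapperService mapperService, long startingSeqNo) throws IOException {
        if (engine.hasCompleteOperationHistory("peer-recovery", mapperService, startingSeqNo) == false) {
            // the translog no longer covers the requested range, so an ops-based recovery is not possible
            throw new IllegalStateException("translog is missing operations since " + startingSeqNo);
        }
        try (Translog.Snapshot snapshot = engine.readHistoryOperations("peer-recovery", mapperService, startingSeqNo)) {
            Translog.Operation operation;
            while ((operation = snapshot.next()) != null) {
                sendToTarget(operation);
            }
        }
    }

    private static void sendToTarget(Translog.Operation operation) {
        // placeholder: a real recovery ships the operation to the recovering replica over the transport layer
    }
}
```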
@@ -2575,7 +2566,15 @@ public final long getMinRetainedSeqNo() {
     @Override
     public Closeable acquireRetentionLock() {
         if (softDeleteEnabled) {
-            return softDeletesPolicy.acquireRetentionLock();
+            final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock();
+            final Closeable translogRetentionLock;
+            try {
+                translogRetentionLock = translog.acquireRetentionLock();
+            } catch (Exception e) {
+                softDeletesRetentionLock.close();
+                throw e;
+            }
+            return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock);
         } else {
             return translog.acquireRetentionLock();
         }
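With soft deletes enabled, `acquireRetentionLock` now pins both histories: it takes the soft-deletes lock first, releases it again if acquiring the translog lock throws, and returns a single `Closeable` that releases both in reverse order. The snippet below is a self-contained sketch of that acquire-both-or-neither shape using only JDK types; every name in it is invented for the example.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/** Illustrative pattern only; not code from this change. */
final class CompositeCloseableExample {

    static Closeable acquireBoth(Supplier<Closeable> firstFactory, Supplier<Closeable> secondFactory) {
        final Closeable first = firstFactory.get();
        final Closeable second;
        try {
            second = secondFactory.get();
        } catch (RuntimeException e) {
            closeQuietly(first); // do not leak the first resource if the second acquisition fails
            throw e;
        }
        // Composite release: try-with-resources closes second, then first, i.e. reverse acquisition
        // order, mirroring IOUtils.close(translogRetentionLock, softDeletesRetentionLock) above.
        return () -> {
            try (Closeable one = first; Closeable two = second) {
                // nothing to do here; both resources are closed on exit from this block
            }
        };
    }

    private static void closeQuietly(Closeable closeable) {
        try {
            closeable.close();
        } catch (IOException e) {
            // best effort while propagating the original failure
        }
    }
}
```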
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 40e1a88a349c9..e7a8fbfb523a1 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -177,10 +177,9 @@ public void recoverToTarget(ActionListener listener) {
                 // We must have everything above the local checkpoint in the commit
                 requiredSeqNoRangeStart =
                     Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
-                // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have
-                // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly
-                // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo.
-                startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0;
+                // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
+                // still filter out legacy operations without seqNo.
+                startingSeqNo = 0;
                 try {
                     final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
                     sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps);
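The rewritten comment above captures the trade-off on the source side: the range of operations the target must receive still starts just above the safe commit's local checkpoint, but the snapshot itself now always starts at sequence number 0 so that the target ends up with a reasonably well-filled translog. A worked example with invented numbers:

```java
/** Illustrative arithmetic only; the checkpoint value is invented for this example. */
final class StartingSeqNoExample {
    public static void main(String[] args) {
        final long commitLocalCheckpoint = 41;                            // read from the safe commit's user data
        final long requiredSeqNoRangeStart = commitLocalCheckpoint + 1;   // 42: operations the target must replay
        final long startingSeqNo = 0;                                     // the snapshot nevertheless starts at 0

        // Operations 0..41 are shipped only to fill the target's translog for retention purposes,
        // while operations from 42 upwards are the ones the recovery actually requires.
        System.out.println("snapshot from " + startingSeqNo + ", required from " + requiredSeqNoRangeStart);
    }
}
```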
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
index 1fed238f8ddf6..4d27362af22b5 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java
@@ -417,6 +417,10 @@ public synchronized void reset() {
             stopTime = 0;
         }
 
+        // for tests
+        public long getStartNanoTime() {
+            return startNanoTime;
+        }
     }
 
     public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
index d074ef3375833..85e381b176ccb 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java
@@ -115,16 +115,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
             assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
         }
         if (syncNeeded && globalCheckPoint < numDocs - 1) {
-            if (shard.indexSettings.isSoftDeleteEnabled()) {
-                assertThat(resyncTask.getSkippedOperations(), equalTo(0));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
-                assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
-            } else {
-                int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
-                assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
-                assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
-                assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
-            }
+            int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
+            assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
+            assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
+            assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
         } else {
             assertThat(resyncTask.getSkippedOperations(), equalTo(0));
             assertThat(resyncTask.getResyncedOperations(), equalTo(0));
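The branch kept above is the translog-based resync accounting: every operation up to and including the global checkpoint is skipped, the remainder is resynced, and the reported total covers the whole translog unless the replica was already fully in sync. A small self-contained illustration with invented numbers:

```java
/** Illustrative arithmetic only; the numbers are invented for this example. */
final class ResyncAccountingExample {
    public static void main(String[] args) {
        final int numDocs = 10;            // operations in the primary's translog, seq# 0..9
        final long globalCheckPoint = 3;   // the replicas are already known to have seq# 0..3

        final int skippedOps = Math.toIntExact(globalCheckPoint + 1);          // 4: seq# 0..3 are not resent
        final int resyncedOps = numDocs - skippedOps;                          // 6: seq# 4..9 are resent
        final int totalOps = globalCheckPoint == numDocs - 1 ? 0 : numDocs;    // 10: the whole translog is scanned

        System.out.println(skippedOps + " skipped, " + resyncedOps + " resynced, " + totalOps + " total");
    }
}
```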
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index ea15eceb8be84..ea3e933a88314 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -26,11 +26,13 @@
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -786,4 +788,55 @@ public void sendRequest(Transport.Connection connection, long requestId, String
             assertHitCount(client().prepareSearch(indexName).get(), numDocs);
         }
     }
+
+    @TestLogging("org.elasticsearch.indices.recovery:TRACE")
+    public void testHistoryRetention() throws Exception {
+        internalCluster().startNodes(3);
+
+        final String indexName = "test";
+        client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get();
+        ensureGreen(indexName);
+
+        // Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case
+        final List<IndexRequestBuilder> requests = new ArrayList<>();
+        final int replicatedDocCount = scaledRandomIntBetween(25, 250);
+        while (requests.size() < replicatedDocCount) {
+            requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON));
+        }
+        indexRandom(true, requests);
+        if (randomBoolean()) {
+            flush(indexName);
+        }
+
+        internalCluster().stopRandomNode(s -> true);
+        internalCluster().stopRandomNode(s -> true);
+
+        final long desyncNanoTime = System.nanoTime();
+        while (System.nanoTime() <= desyncNanoTime) {
+            // time passes
+        }
+
+        final int numNewDocs = scaledRandomIntBetween(25, 250);
+        for (int i = 0; i < numNewDocs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
+        }
+        // Flush twice to update the safe commit's local checkpoint
+        assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
+        assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0));
+
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
+        internalCluster().startNode();
+        ensureGreen(indexName);
+
+        final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get();
+        final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName);
+        recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime);
+
+        assertThat(recoveryStates, hasSize(1));
+        assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0));
+        assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0));
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
index 48061b11d58c7..2761333ef5628 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java
@@ -68,8 +68,7 @@ public void testTranslogHistoryTransferred() throws Exception {
             shards.addReplica();
             shards.startAll();
             final IndexShard replica = shards.getReplicas().get(0);
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs));
             shards.assertAllEqual(docs + moreDocs);
         }
     }
@@ -282,8 +281,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception {
             shards.recoverReplica(newReplica);
             // file based recovery should be made
             assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs));
+            assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs));
 
             // history uuid was restored
             assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID));
@@ -387,8 +385,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception {
             shards.recoverReplica(replica);
             // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false)
             assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false)));
-            boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled();
-            assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs));
+            assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs));
             shards.assertAllEqual(numDocs);
         }
     }