From c8d2bf90d27c828317b4d73b8b8e0a1e1cb4e69f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Feb 2019 12:14:04 +0000 Subject: [PATCH 1/4] Recover peers from translog, ignoring soft deletes Today if soft deletes are enabled then we read the operations needed for peer recovery from Lucene. However we do not currently make any attempt to retain history in Lucene specifically for peer recoveries so we may discard it and fall back to a more expensive file-based recovery. Yet we still retain sufficient history in the translog to perform an operations-based peer recovery. In the long run we would like to fix this by retaining more history in Lucene, possibly using shard history retention leases (#37165). For now, however, this commit reverts to performing peer recoveries using the history retained in the translog regardless of whether soft deletes are enabled or not. --- .../elasticsearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 36 ++++++------- .../recovery/RecoverySourceHandler.java | 7 ++- .../indices/recovery/RecoveryState.java | 4 ++ .../shard/PrimaryReplicaSyncerTests.java | 14 ++--- .../indices/recovery/IndexRecoveryIT.java | 53 +++++++++++++++++++ .../indices/recovery/RecoveryTests.java | 9 ++-- 7 files changed, 84 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index bac85413a7a92..56d8c6bab6184 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -767,7 +767,7 @@ public abstract int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException; /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) + * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its translog */ public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 66e0d30f164f1..7df1f03ae2b43 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -502,16 +502,11 @@ public void syncTranslog() throws IOException { } /** - * Creates a new history snapshot for reading operations since the provided seqno. - * The returned snapshot can be retrieved from either Lucene index or translog files. + * Creates a new history snapshot for reading operations since the provided seqno from the translog. */ @Override public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); - } else { - return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); - } + return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); } /** @@ -2546,21 +2541,17 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS @Override public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - return getMinRetainedSeqNo() <= startingSeqNo; - } else { - final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsCompleted(operation.seqNo()); - } + final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint(); + final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); + try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + Translog.Operation operation; + while ((operation = snapshot.next()) != null) { + if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + tracker.markSeqNoAsCompleted(operation.seqNo()); } } - return tracker.getCheckpoint() >= currentLocalCheckpoint; } + return tracker.getCheckpoint() >= currentLocalCheckpoint; } /** @@ -2575,7 +2566,12 @@ public final long getMinRetainedSeqNo() { @Override public Closeable acquireRetentionLock() { if (softDeleteEnabled) { - return softDeletesPolicy.acquireRetentionLock(); + final Closeable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); + final Closeable translogRetentionLock = translog.acquireRetentionLock(); + return () -> { + softDeletesRetentionLock.close(); + translogRetentionLock.close(); + }; } else { return translog.acquireRetentionLock(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 40e1a88a349c9..e7a8fbfb523a1 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -177,10 +177,9 @@ public void recoverToTarget(ActionListener listener) { // We must have everything above the local checkpoint in the commit requiredSeqNoRangeStart = Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; - // If soft-deletes enabled, we need to transfer only operations after the local_checkpoint of the commit to have - // the same history on the target. However, with translog, we need to set this to 0 to create a translog roughly - // according to the retention policy on the target. Note that it will still filter out legacy operations without seqNo. - startingSeqNo = shard.indexSettings().isSoftDeleteEnabled() ? requiredSeqNoRangeStart : 0; + // We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will + // still filter out legacy operations without seqNo. + startingSeqNo = 0; try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); sendFileResult = phase1(phase1Snapshot.getIndexCommit(), () -> estimateNumOps); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 1fed238f8ddf6..4d27362af22b5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -417,6 +417,10 @@ public synchronized void reset() { stopTime = 0; } + // for tests + public long getStartNanoTime() { + return startNanoTime; + } } public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index d074ef3375833..85e381b176ccb 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -115,16 +115,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { - if (shard.indexSettings.isSoftDeleteEnabled()) { - assertThat(resyncTask.getSkippedOperations(), equalTo(0)); - assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); - assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); - } else { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); - } + int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included + assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); + assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); + assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); assertThat(resyncTask.getResyncedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index ea15eceb8be84..ea3e933a88314 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -26,11 +26,13 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -786,4 +788,55 @@ public void sendRequest(Transport.Connection connection, long requestId, String assertHitCount(client().prepareSearch(indexName).get(), numDocs); } } + + @TestLogging("org.elasticsearch.indices.recovery:TRACE") + public void testHistoryRetention() throws Exception { + internalCluster().startNodes(3); + + final String indexName = "test"; + client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)).get(); + ensureGreen(indexName); + + // Perform some replicated operations so the replica isn't simply empty, because ops-based recovery isn't better in that case + final List requests = new ArrayList<>(); + final int replicatedDocCount = scaledRandomIntBetween(25, 250); + while (requests.size() < replicatedDocCount) { + requests.add(client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON)); + } + indexRandom(true, requests); + if (randomBoolean()) { + flush(indexName); + } + + internalCluster().stopRandomNode(s -> true); + internalCluster().stopRandomNode(s -> true); + + final long desyncNanoTime = System.nanoTime(); + while (System.nanoTime() <= desyncNanoTime) { + // time passes + } + + final int numNewDocs = scaledRandomIntBetween(25, 250); + for (int i = 0; i < numNewDocs; i++) { + client().prepareIndex(indexName, "_doc").setSource("{}", XContentType.JSON).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get(); + } + // Flush twice to update the safe commit's local checkpoint + assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0)); + assertThat(client().admin().indices().prepareFlush(indexName).setForce(true).execute().get().getFailedShards(), equalTo(0)); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().startNode(); + ensureGreen(indexName); + + final RecoveryResponse recoveryResponse = client().admin().indices().recoveries(new RecoveryRequest(indexName)).get(); + final List recoveryStates = recoveryResponse.shardRecoveryStates().get(indexName); + recoveryStates.removeIf(r -> r.getTimer().getStartNanoTime() <= desyncNanoTime); + + assertThat(recoveryStates, hasSize(1)); + assertThat(recoveryStates.get(0).getIndex().totalFileCount(), is(0)); + assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0)); + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 48061b11d58c7..2761333ef5628 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -68,8 +68,7 @@ public void testTranslogHistoryTransferred() throws Exception { shards.addReplica(); shards.startAll(); final IndexShard replica = shards.getReplicas().get(0); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? moreDocs : docs + moreDocs)); + assertThat(getTranslog(replica).totalOperations(), equalTo(docs + moreDocs)); shards.assertAllEqual(docs + moreDocs); } } @@ -282,8 +281,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { shards.recoverReplica(newReplica); // file based recovery should be made assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty())); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(newReplica).totalOperations(), equalTo(softDeletesEnabled ? nonFlushedDocs : numDocs)); + assertThat(getTranslog(newReplica).totalOperations(), equalTo(numDocs)); // history uuid was restored assertThat(newReplica.getHistoryUUID(), equalTo(historyUUID)); @@ -387,8 +385,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { shards.recoverReplica(replica); // Make sure the flushing will eventually be completed (eg. `shouldPeriodicallyFlush` is false) assertBusy(() -> assertThat(getEngine(replica).shouldPeriodicallyFlush(), equalTo(false))); - boolean softDeletesEnabled = replica.indexSettings().isSoftDeleteEnabled(); - assertThat(getTranslog(replica).totalOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); + assertThat(getTranslog(replica).totalOperations(), equalTo(numDocs)); shards.assertAllEqual(numDocs); } } From 7b6c2a0deb280e354d4a93b72ef2eff317995050 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Feb 2019 16:14:18 +0000 Subject: [PATCH 2/4] Deal with exceptions when obtaining both retention locks --- .../index/engine/InternalEngine.java | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7df1f03ae2b43..5a75ea5fd2590 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2567,10 +2567,23 @@ public final long getMinRetainedSeqNo() { public Closeable acquireRetentionLock() { if (softDeleteEnabled) { final Closeable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); - final Closeable translogRetentionLock = translog.acquireRetentionLock(); + final Closeable translogRetentionLock; + try { + translogRetentionLock = translog.acquireRetentionLock(); + } catch (Exception e) { + try { + softDeletesRetentionLock.close(); + } catch (IOException ioexception) { + e.addSuppressed(ioexception); + } + throw e; + } return () -> { - softDeletesRetentionLock.close(); - translogRetentionLock.close(); + try { + translogRetentionLock.close(); + } finally { + softDeletesRetentionLock.close(); + } }; } else { return translog.acquireRetentionLock(); From 53d63f6a92b219ed659d301a43dc4175aaaa734f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Feb 2019 16:24:18 +0000 Subject: [PATCH 3/4] IOUtils.close --- .../org/elasticsearch/index/engine/InternalEngine.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5a75ea5fd2590..8224294262fd4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2578,13 +2578,7 @@ public Closeable acquireRetentionLock() { } throw e; } - return () -> { - try { - translogRetentionLock.close(); - } finally { - softDeletesRetentionLock.close(); - } - }; + return () -> IOUtils.close(translogRetentionLock,softDeletesRetentionLock); } else { return translog.acquireRetentionLock(); } From 755ce5ac285c465bc75c31697348fde2c3c38e40 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 14 Feb 2019 17:11:59 +0000 Subject: [PATCH 4/4] Neater to be Releasable --- .../org/elasticsearch/index/engine/InternalEngine.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8224294262fd4..832df83fe0f5c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2566,19 +2566,15 @@ public final long getMinRetainedSeqNo() { @Override public Closeable acquireRetentionLock() { if (softDeleteEnabled) { - final Closeable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); + final Releasable softDeletesRetentionLock = softDeletesPolicy.acquireRetentionLock(); final Closeable translogRetentionLock; try { translogRetentionLock = translog.acquireRetentionLock(); } catch (Exception e) { - try { - softDeletesRetentionLock.close(); - } catch (IOException ioexception) { - e.addSuppressed(ioexception); - } + softDeletesRetentionLock.close(); throw e; } - return () -> IOUtils.close(translogRetentionLock,softDeletesRetentionLock); + return () -> IOUtils.close(translogRetentionLock, softDeletesRetentionLock); } else { return translog.acquireRetentionLock(); }