Skip to content

Commit

Permalink
Do not wait for advancement of checkpoint in recovery (#39006)
Browse files Browse the repository at this point in the history
With this change, we won't wait for the local checkpoint to advance to
the max_seq_no before starting phase2 of peer-recovery. We also remove
the sequence number range check in peer-recovery. We can safely do these
thanks to Yannick's finding.

The replication group to be used is currently sampled after indexing
into the primary (see `ReplicationOperation` class). This means that
when initiating tracking of a new replica, we have to consider the
following two cases:

- There are operations for which the replication group has not been
sampled yet. As we initiated the new replica as tracking, we know that
those operations will be replicated to the new replica and follow the
typical replication group semantics (e.g. marked as stale when
unavailable).

- There are operations for which the replication group has already been
sampled. These operations will not be sent to the new replica.  However,
we know that those operations are already indexed into Lucene and the
translog on the primary, as the sampling is happening after that. This
means that by taking a snapshot of Lucene or the translog, we will be
getting those ops as well. What we cannot guarantee anymore is that all
ops up to `endingSeqNo` are available in the snapshot (i.e.  also see
comment in `RecoverySourceHandler` saying `We need to wait for all
operations up to the current max to complete, otherwise we can not
guarantee that all operations in the required range will be available
for replaying from the translog of the source.`). This is not needed,
though, as we can no longer guarantee that max seq no == local
checkpoint.

Relates #39000
Closes #38949

Co-authored-by: Yannick Welsch <yannick@welsch.lu>
  • Loading branch information
dnhatn and ywelsch committed Feb 25, 2019
1 parent e8aa85a commit e851fa8
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -808,14 +808,6 @@ public final CommitStats commitStats() {
*/
public abstract long getLocalCheckpoint();

/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2424,11 +2424,6 @@ public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
}

@Override
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,6 @@ public long getLocalCheckpoint() {
return seqNoStats.getLocalCheckpoint();
}

@Override
public void waitForOpsToComplete(long seqNo) {
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
Expand Down
10 changes: 0 additions & 10 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2042,16 +2042,6 @@ public void syncRetentionLeases() {
}
}

/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
getEngine().waitForOpsToComplete(seqNo);
}

/**
* Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures
* have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,12 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final Closeable retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
final long requiredSeqNoRangeStart;
final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO &&
isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo());
final SendFileResult sendFileResult;
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
requiredSeqNoRangeStart = startingSeqNo;
sendFileResult = SendFileResult.EMPTY;
} else {
final Engine.IndexCommitRef phase1Snapshot;
Expand All @@ -174,9 +172,6 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
}
// We must have everything above the local checkpoint in the commit
requiredSeqNoRangeStart =
Long.parseLong(phase1Snapshot.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
// We need to set this to 0 to create a translog roughly according to the retention policy on the target. Note that it will
// still filter out legacy operations without seqNo.
startingSeqNo = 0;
Expand All @@ -194,8 +189,6 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
}
}
assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo;
assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
+ startingSeqNo + "]";

final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
// For a sequence based recovery, the target can keep its local translog
Expand All @@ -213,13 +206,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
/*
* We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
* operations in the required range will be available for replaying from the translog of the source.
*/
cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
if (logger.isTraceEnabled()) {
logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
Expand All @@ -232,15 +219,8 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final RetentionLeases retentionLeases = shard.getRetentionLeases();
phase2(
startingSeqNo,
requiredSeqNoRangeStart,
endingSeqNo,
phase2Snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
sendSnapshotStep);
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
retentionLeases, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
Expand Down Expand Up @@ -518,7 +498,6 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
*
* @param startingSeqNo the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all
* ops should be sent
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
Expand All @@ -527,26 +506,19 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
*/
void phase2(
final long startingSeqNo,
final long requiredSeqNoRangeStart,
final long endingSeqNo,
final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final ActionListener<SendSnapshotResult> listener) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");
logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");

final AtomicInteger skippedOps = new AtomicInteger();
final AtomicInteger totalSentOps = new AtomicInteger();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);
final AtomicInteger lastBatchCount = new AtomicInteger(); // used to estimate the count of the subsequent batch.
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
// We need to synchronized Snapshot#next() because it's called by different threads through sendBatch.
Expand All @@ -568,7 +540,6 @@ void phase2(
ops.add(operation);
batchSizeInBytes += operation.estimateSize();
totalSentOps.incrementAndGet();
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
Expand All @@ -586,11 +557,6 @@ void phase2(
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -231,8 +230,7 @@ public void testSendSnapshotSendsOps() throws IOException {
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
}
final long startingSeqNo = randomIntBetween(0, numberOfDocsWithValidSequenceNumbers - 1);
final long requiredStartingSeqNo = randomIntBetween((int) startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);
final long endingSeqNo = randomLongBetween(startingSeqNo, numberOfDocsWithValidSequenceNumbers - 1);

final List<Translog.Operation> shippedOps = new ArrayList<>();
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand All @@ -247,7 +245,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1);
RecoverySourceHandler.SendSnapshotResult result = future.actionGet();
Expand All @@ -258,18 +256,6 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
}
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
if (endingSeqNo >= requiredStartingSeqNo + 1) {
// check that missing ops blows up
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
.filter(o -> o.seqNo() >= requiredStartingSeqNo && o.seqNo() <= endingSeqNo).collect(Collectors.toList());
List<Translog.Operation> opsToSkip = randomSubsetOf(randomIntBetween(1, requiredOps.size()), requiredOps);
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> failedFuture = new PlainActionFuture<>();
expectThrows(IllegalStateException.class, () -> {
handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture);
failedFuture.actionGet();
});
}
}

public void testSendSnapshotStopOnError() throws Exception {
Expand Down Expand Up @@ -299,7 +285,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
if (wasFailed.get()) {
assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index"));
Expand Down Expand Up @@ -498,11 +484,11 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
}

@Override
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
void phase2(long startingSeqNo, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases,
ActionListener<SendSnapshotResult> listener) throws IOException {
phase2Called.set(true);
super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
super.phase2(startingSeqNo, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,16 @@ public static Translog getTranslog(Engine engine) {
return internalEngine.getTranslog();
}

/**
* Waits for all operations up to the provided sequence number to complete in the given internal engine.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throws InterruptedException {
engine.getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
}

public static boolean hasSnapshottedCommits(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@

public class IndexFollowingIT extends CcrIntegTestCase {

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
int numberOfReplicas = between(0, 1);
Expand Down Expand Up @@ -221,7 +220,6 @@ public void testFollowIndex() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38949")
public void testFollowIndexWithConcurrentMappingChanges() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
Expand Down
Loading

0 comments on commit e851fa8

Please sign in to comment.