Skip to content

Commit

Permalink
Use global checkpoint as starting seq in ops-based recovery (#43463)
Browse files Browse the repository at this point in the history
Today we use the local checkpoint of the safe commit on replicas as the
starting sequence number of operation-based peer recovery. While this is
a good choice due to its simplicity, we need to share this information
between copies if we use retention leases in peer recovery. We can avoid
this extra work if we use the global checkpoint as the starting sequence
number.

With this change, we will try to recover replica locally up to the
global checkpoint before performing peer recovery. This commit should
also increase the chance of operation-based recovery.
  • Loading branch information
dnhatn authored Jul 23, 2019
1 parent 69c94f4 commit d15684d
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 205 deletions.
123 changes: 106 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -1359,6 +1360,81 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

/**
* A best effort to bring up this shard to the global checkpoint using the local translog before performing a peer recovery.
*
* @return a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
public long recoverLocallyUpToGlobalCheckpoint() {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
safeCommit = store.findSafeIndexCommit(globalCheckpoint);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace("skip local recovery as no index commit found");
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.debug("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}
if (safeCommit.isPresent() == false) {
logger.trace("skip local recovery as no safe commit found");
return UNASSIGNED_SEQ_NO;
}
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
try {
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
if (safeCommit.get().localCheckpoint == globalCheckpoint) {
logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
safeCommit.get(), globalCheckpoint);
recoveryState.getTranslog().totalLocal(0);
return globalCheckpoint + 1;
}
try {
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
final int recoveredOps = runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
recoveryState.getTranslog()::incrementRecoveredOperations);
recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
return recoveredOps;
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
synchronized (mutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
}
}
} catch (Exception e) {
logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
return UNASSIGNED_SEQ_NO;
}
try {
// we need to find the safe commit again as we should have created a new one during the local recovery
final Optional<SequenceNumbers.CommitInfo> newSafeCommit = store.findSafeIndexCommit(globalCheckpoint);
assert newSafeCommit.isPresent() : "no safe commit found after local recovery";
return newSafeCommit.get().localCheckpoint + 1;
} catch (Exception e) {
if (Assertions.ENABLED) {
throw new AssertionError(
"failed to find the safe commit after recovering shard locally up to global checkpoint " + globalCheckpoint, e);
}
logger.debug(new ParameterizedMessage(
"failed to find the safe commit after recovering shard locally up to global checkpoint {}", globalCheckpoint), e);
return UNASSIGNED_SEQ_NO;
}
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
Expand Down Expand Up @@ -1462,6 +1538,9 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
Expand All @@ -1478,6 +1557,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
* The translog is kept but its operations won't be replayed.
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
innerOpenEngineAndTranslog();
getEngine().skipTranslogRecovery();
}
Expand All @@ -1486,17 +1567,6 @@ private void innerOpenEngineAndTranslog() throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
throw new RecoveryFailedException(recoveryState, "check index failed", ex);
}
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);

final EngineConfig config = newEngineConfig();

// we disable deletes since we allow for operations to be executed against the shard while recovering
Expand Down Expand Up @@ -1552,14 +1622,22 @@ private void onNewEngine(Engine newEngine) {
*/
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
recoveryState().setStage(RecoveryState.Stage.INIT);
IOUtils.close(currentEngineReference.getAndSet(null));
resetRecoveryStage();
}
}

/**
* If a file-based recovery occurs, a recovery target calls this method to reset the recovery stage.
*/
public void resetRecoveryStage() {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert currentEngineReference.get() == null;
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState().setStage(RecoveryState.Stage.INIT);
}

/**
Expand Down Expand Up @@ -2296,6 +2374,17 @@ public void noopUpdate(String type) {
internalIndexingStats.noopUpdate(type);
}

public void maybeCheckIndex() {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
throw new RecoveryFailedException(recoveryState, "check index failed", ex);
}
}
}

void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -1560,6 +1561,22 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
}
}

/**
* Returns a {@link org.elasticsearch.index.seqno.SequenceNumbers.CommitInfo} of the safe commit if exists.
*/
public Optional<SequenceNumbers.CommitInfo> findSafeIndexCommit(long globalCheckpoint) throws IOException {
final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
assert commits.isEmpty() == false : "no commit found";
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
// all operations of the safe commit must be at most the global checkpoint.
if (commitInfo.maxSeqNo <= globalCheckpoint) {
return Optional.of(commitInfo);
} else {
return Optional.empty();
}
}

private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -44,18 +42,14 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -68,12 +62,11 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
Expand Down Expand Up @@ -178,9 +171,12 @@ private void doRecovery(final long recoveryId) {
cancellableThreads = recoveryTarget.cancellableThreads();
try {
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
request = getStartRecoveryRequest(recoveryTarget);
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
recoveryTarget.indexShard().prepareForIndexRecovery();
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
} catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
Expand Down Expand Up @@ -319,7 +315,7 @@ public RecoveryResponse read(StreamInput in) throws IOException {
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
private static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
Expand All @@ -335,89 +331,32 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec
/**
* Prepare the start recovery request.
*
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @return a start recovery request
*/
private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
RecoveryTarget recoveryTarget, long startingSeqNo) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}

if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
} else {
logger.trace(
"{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]",
recoveryTarget.shardId(),
startingSeqNo,
recoveryTarget.sourceNode());
}

request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
clusterService.localNode(),
localNode,
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
return request;
}

/**
* Get the starting sequence number for a sequence-number-based request.
*
* @param recoveryTarget the target of the recovery
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
final Store store = recoveryTarget.store();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
if (logger.isTraceEnabled()) {
final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
for (IndexCommit commit : existingCommits) {
descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
}
logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
}
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
* Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
* checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
* after the local checkpoint stored in the commit.
*/
return seqNoStats.localCheckpoint + 1;
} else {
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}
} catch (final TranslogCorruptedException | IOException e) {
/*
* This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
* translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
* proceeds to attempt a sequence-number-based recovery.
*/
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);

Expand Down
Loading

0 comments on commit d15684d

Please sign in to comment.