Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global checkpoint as starting seq in ops-based recovery #43463

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f8c0b15
Use global checkpoint as base for seq based recovery
dnhatn Jun 19, 2019
72458c9
Revert "Use global checkpoint as base for seq based recovery"
dnhatn Jun 25, 2019
d6fe637
Merge branch 'master' into recover-to-global-checkpoint
dnhatn Jun 25, 2019
ef454e2
Recover locally first
dnhatn Jun 25, 2019
800b329
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jun 27, 2019
c05d70d
add comment
dnhatn Jun 26, 2019
c535d5f
fix tests
dnhatn Jun 27, 2019
ed024e7
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jun 28, 2019
0a476b7
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 12, 2019
3f35450
introduce prepare step
dnhatn Jul 13, 2019
0a67c49
Merge branch 'peer-recovery-retention-leases' into HEAD
dnhatn Jul 15, 2019
d465b43
verify index
dnhatn Jul 15, 2019
7e45622
fix test
dnhatn Jul 15, 2019
4cf88eb
Do not add new state
dnhatn Jul 19, 2019
3280e64
combine with performRecoveryRestart
dnhatn Jul 22, 2019
30ef0c6
more feedback
dnhatn Jul 22, 2019
a40b5dd
log the global checkpoint
dnhatn Jul 22, 2019
ed5d302
adjust the total local to reflect the exact count
dnhatn Jul 22, 2019
731af0e
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 22, 2019
e6923e8
fix translog recovery stats
dnhatn Jul 22, 2019
6ec4e80
do not adjust
dnhatn Jul 23, 2019
7a26f32
Revert "do not adjust"
dnhatn Jul 23, 2019
1deddbe
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1877,11 +1877,6 @@ public interface Warmer {
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Does not replay translog operations, but marks the engine as ready.
*/
public abstract void skipTranslogRecovery();

/**
* Returns <code>true</code> iff this engine is currently recovering from translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,6 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
return this;
}

@Override
public void skipTranslogRecovery() {
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,6 @@ public Engine recoverFromTranslog(final TranslogRecoveryRunner translogRecoveryR
return this;
}

@Override
public void skipTranslogRecovery() {
}

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) {
}
Expand Down
21 changes: 8 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1393,8 +1393,10 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
/**
* Opens the engine on top of the existing Lucene engine and translog.
* Operations from the translog will be replayed to bring Lucene up to date.
*
* @param recoverUpToSeqNo the upper bound of sequence number to be recovered (inclusive)
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndRecoverFromTranslog(long recoverUpToSeqNo) throws IOException {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
Expand All @@ -1403,16 +1405,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}

/**
* Opens the engine on top of the existing lucene engine and translog.
* The translog is kept but its operations won't be replayed.
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
innerOpenEngineAndTranslog();
getEngine().skipTranslogRecovery();
getEngine().recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}

private void innerOpenEngineAndTranslog() throws IOException {
Expand Down Expand Up @@ -1788,8 +1781,10 @@ public List<Segment> segments(boolean verbose) {
return getEngine().segments(verbose);
}

public void flushAndCloseEngine() throws IOException {
getEngine().flushAndClose();
public void closeEngine() throws IOException {
synchronized (mutex) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
IOUtils.close(this.currentEngineReference.getAndSet(null));
}
}

public String getHistoryUUID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down Expand Up @@ -480,7 +480,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
indexShard.openEngineAndRecoverFromTranslog();
indexShard.openEngineAndRecoverFromTranslog(Long.MAX_VALUE);
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
transportService.registerRequestHandler(Actions.START_RECOVERY, StartRecoveryRequest::new, ThreadPool.Names.GENERIC,
transportService.registerRequestHandler(Actions.START_RECOVERY, ThreadPool.Names.GENERIC, StartRecoveryRequest::new,
new StartRecoveryTransportRequestHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -119,7 +118,7 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
RecoveryPrepareForTranslogRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
Expand Down Expand Up @@ -345,9 +344,10 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long globalCheckpoint = recoveryTarget.readGlobalCheckpointFromTranslog();
final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
startingSeqNo = getStartingSeqNo(logger, recoveryTarget, globalCheckpoint);
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
Expand All @@ -370,7 +370,8 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
startingSeqNo,
globalCheckpoint);
return request;
}

Expand All @@ -381,11 +382,9 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget, final long globalCheckpoint) {
try {
final Store store = recoveryTarget.store();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
Expand Down Expand Up @@ -424,14 +423,15 @@ public interface RecoveryListener {
void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure);
}

class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogRequest> {

@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) {
public void messageReceived(RecoveryPrepareForTranslogRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
final ActionListener<RecoveryPrepareForTranslogOperationsResponse> listener = new ChannelActionListener<>(
channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
request.recoverUpToSeqNo(), ActionListener.map(listener, RecoveryPrepareForTranslogOperationsResponse::new));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Optional;

/**
 * Transport response that optionally carries a {@link Store.MetadataSnapshot}.
 * <p>
 * The snapshot is only exchanged over the wire when the stream version is on or after
 * {@link Version#V_8_0_0}; for earlier versions nothing is written and the deserialized
 * value is always {@link Optional#empty()}.
 */
final class RecoveryPrepareForTranslogOperationsResponse extends TransportResponse {

    /** The metadata snapshot carried by this response, or empty when none is available. */
    final Optional<Store.MetadataSnapshot> targetMetadata;

    /**
     * Creates a response wrapping the given (possibly empty) metadata snapshot.
     *
     * @param targetMetadata the snapshot to carry; stored as-is
     */
    RecoveryPrepareForTranslogOperationsResponse(Optional<Store.MetadataSnapshot> targetMetadata) {
        this.targetMetadata = targetMetadata;
    }

    /**
     * Deserializes a response from the given stream.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    RecoveryPrepareForTranslogOperationsResponse(final StreamInput in) throws IOException {
        super(in);
        // Streams older than 8.0.0 never carry the snapshot, so nothing is read for them.
        final boolean carriesMetadata = in.getVersion().onOrAfter(Version.V_8_0_0);
        this.targetMetadata = carriesMetadata
            ? Optional.ofNullable(in.readOptionalWriteable(Store.MetadataSnapshot::new))
            : Optional.empty();
    }

    /**
     * Serializes this response to the given stream.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(final StreamOutput out) throws IOException {
        super.writeTo(out);
        if (out.getVersion().onOrAfter(Version.V_8_0_0) == false) {
            // Streams older than 8.0.0 do not get the optional snapshot.
            return;
        }
        final Store.MetadataSnapshot snapshotOrNull = targetMetadata.orElse(null);
        out.writeOptionalWriteable(snapshotOrNull);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,43 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
final class RecoveryPrepareForTranslogRequest extends TransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;
private final boolean fileBasedRecovery;
private final long recoverUpToSeqNo;

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, boolean fileBasedRecovery) {
RecoveryPrepareForTranslogRequest(long recoveryId, ShardId shardId, int totalTranslogOps,
boolean fileBasedRecovery, long recoverUpToSeqNo) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.fileBasedRecovery = fileBasedRecovery;
this.recoverUpToSeqNo = recoverUpToSeqNo;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
RecoveryPrepareForTranslogRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();
fileBasedRecovery = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
recoverUpToSeqNo = in.readZLong();
} else {
recoverUpToSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

public long recoveryId() {
Expand All @@ -67,12 +77,19 @@ public boolean isFileBasedRecovery() {
return fileBasedRecovery;
}

public long recoverUpToSeqNo() {
return recoverUpToSeqNo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeBoolean(fileBasedRecovery);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeZLong(recoverUpToSeqNo);
}
}
}
Loading