Make prepare engine step of recovery source non-blocking (#37573)
Relates #37174
dnhatn committed Jan 23, 2019
1 parent 0735345 commit c364891
Showing 8 changed files with 85 additions and 73 deletions.
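The change below turns the "prepare engine" step of peer recovery from a blocking call into a listener-based one, so the thread driving recovery on the source is not parked while the target node opens its engine. As a rough, self-contained sketch of that conversion (the names here are illustrative, not the Elasticsearch API):

import java.util.concurrent.CompletableFuture;

// Illustrative only: a tiny stand-in for the result/failure callback style used by the commit.
interface Listener<T> {
    void onResponse(T result);
    void onFailure(Exception e);
}

class PrepareEngineSketch {

    // Before: the calling thread is pinned until the remote engine is ready.
    void prepareBlocking() throws Exception {
        remotePrepare().get();                        // blocks until the round trip completes
    }

    // After: the caller registers a callback and returns immediately; the next
    // recovery step runs from the callback once the remote engine is ready.
    void prepareNonBlocking(Listener<Void> listener) {
        remotePrepare().whenComplete((ignored, err) -> {
            if (err != null) {
                listener.onFailure(new Exception("prepare target for translog failed", err));
            } else {
                listener.onResponse(null);
            }
        });
    }

    private CompletableFuture<Void> remotePrepare() {
        return CompletableFuture.runAsync(() -> { /* open the target engine */ });
    }
}

Everything that follows is this same change threaded through the transport handler, the recovery source handler, the recovery target, and the tests.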
@@ -431,11 +431,12 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
 
         @Override
         public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-                )) {
-                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
+                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }

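In the handler above, the response is no longer sent inline at the end of messageReceived; a channel-backed listener replies whenever the asynchronous prepare step completes or fails. A minimal sketch of that shape, with hypothetical names rather than the real transport API:

// Illustrative only (hypothetical names, not the Elasticsearch transport API): the reply
// to the channel moves out of the request handler and into the completion callback.
final class ChannelReplySketch {

    interface Channel {
        void sendResponse(String response) throws Exception;
        void sendErrorResponse(Exception error) throws Exception;
    }

    static void handlePrepareRequest(Channel channel) {
        // Kick off the asynchronous prepare step; the handler itself returns without replying.
        prepareAsync((ok, err) -> {
            try {
                if (err == null) {
                    channel.sendResponse("EMPTY");        // analogue of TransportResponse.Empty
                } else {
                    channel.sendErrorResponse(err);
                }
            } catch (Exception e) {
                // in the real handler, a failure to reply is logged
            }
        });
    }

    private static void prepareAsync(java.util.function.BiConsumer<Void, Exception> callback) {
        java.util.concurrent.CompletableFuture
            .runAsync(() -> { /* open the target engine */ })
            .whenComplete((v, err) -> callback.accept(null, err == null ? null : new Exception(err)));
    }
}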
@@ -197,51 +197,51 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
             assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than ["
                 + startingSeqNo + "]";
 
-            final TimeValue prepareEngineTime;
-            try {
-                // For a sequence based recovery, the target can keep its local translog
-                prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-            } catch (final Exception e) {
-                throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
-            }
-
-            /*
-             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
-             * This means that any document indexed into the primary after this will be replicated to this replica as well
-             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
-             * all documents up to maxSeqNo in phase2.
-             */
-            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
-                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
-
-            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-            /*
-             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-             * operations in the required range will be available for replaying from the translog of the source.
-             */
-            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
-                logger.trace("snapshot translog for recovery; current size is [{}]",
-                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-            }
-
-            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
-            resources.add(phase2Snapshot);
-            // we can release the retention lock here because the snapshot itself will retain the required operations.
-            IOUtils.close(retentionLock);
-            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
-            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
-            phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
-                maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
-            sendSnapshotStep.whenComplete(
-                r -> IOUtils.close(phase2Snapshot),
-                e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
+            final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+            // For a sequence based recovery, the target can keep its local translog
+            prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+            final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+            prepareEngineStep.whenComplete(prepareEngineTime -> {
+                /*
+                 * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
+                 * This means that any document indexed into the primary after this will be replicated to this replica as well
+                 * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
+                 * all documents up to maxSeqNo in phase2.
+                 */
+                runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
+                    shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
+
+                final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+                /*
+                 * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+                 * operations in the required range will be available for replaying from the translog of the source.
+                 */
+                cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+                if (logger.isTraceEnabled()) {
+                    logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
+                    logger.trace("snapshot translog for recovery; current size is [{}]",
+                        shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
+                }
+                final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+                resources.add(phase2Snapshot);
+                // we can release the retention lock here because the snapshot itself will retain the required operations.
+                retentionLock.close();
+                // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+                // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+                final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+                final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+                phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+                    maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+                sendSnapshotStep.whenComplete(
+                    r -> IOUtils.close(phase2Snapshot),
+                    e -> {
+                        IOUtils.closeWhileHandlingException(phase2Snapshot);
+                        onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
+                    });
+
+            }, onFailure);
 
             final StepListener<Void> finalizeStep = new StepListener<>();
             sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);

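The chaining above relies on StepListener: prepareTargetForTranslog completes prepareEngineStep, whose whenComplete callback runs phase 2 and feeds sendSnapshotStep, and prepareEngineStep.result() is read again later when the final response is assembled. A simplified stand-in for such a step, built on CompletableFuture purely for illustration (not the real StepListener):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative only: a minimal "step listener" in the spirit of the StepListener used above.
// It can be completed once, lets the next step subscribe, and exposes the result afterwards.
final class StepSketch<T> {

    private final CompletableFuture<T> future = new CompletableFuture<>();

    void onResponse(T value)    { future.complete(value); }
    void onFailure(Exception e) { future.completeExceptionally(e); }

    // Run the next step when this one finishes; route errors to onFailure.
    void whenComplete(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        future.whenComplete((value, err) -> {
            if (err != null) {
                onFailure.accept(err instanceof Exception ? (Exception) err : new Exception(err));
            } else {
                onResponse.accept(value);
            }
        });
    }

    // Safe only after completion, e.g. when assembling the final response (cf. prepareEngineStep.result()).
    T result() { return future.join(); }
}

With a step like this, prepareStep.whenComplete(took -> ..., onFailure) runs the next phase only once the engine is ready, and result() is safe to call afterwards when the recovery response is built.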
@@ -251,7 +251,7 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                     sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
-                    prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+                    prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
                 try {
                     wrappedListener.onResponse(response);
                 } finally {
@@ -484,16 +484,21 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
         }
     }
 
-    TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
         StopWatch stopWatch = new StopWatch().start();
-        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(
+            nullVal -> {
+                stopWatch.stop();
+                final TimeValue tookTime = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
+                listener.onResponse(tookTime);
+            },
+            e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
-        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
-        stopWatch.stop();
-        final TimeValue tookTime = stopWatch.totalTime();
-        logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
-        return tookTime;
+        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        cancellableThreads.execute(() ->
+            recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
     }
 
     /**
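Note that the StopWatch is now stopped inside the success callback: the duration of an asynchronous step has to be measured where the step completes, not after a blocking call returns. A small illustrative sketch of that timing pattern (names are made up):

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative only: time an asynchronous step by capturing the start eagerly and
// computing the duration in the completion callback.
final class AsyncTimingSketch {

    static void prepare(Consumer<Long> onTookNanos, Consumer<Exception> onFailure) {
        final long start = System.nanoTime();
        CompletableFuture
            .runAsync(() -> { /* remote "prepare engine" request */ })
            .whenComplete((v, err) -> {
                if (err != null) {
                    onFailure.accept(new Exception("prepare target for translog failed", err));
                } else {
                    onTookNanos.accept(System.nanoTime() - start);   // analogue of the stopped StopWatch
                }
            });
    }
}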
@@ -366,12 +366,15 @@ private void ensureRefCount() {
     /*** Implementation of {@link RecoveryTargetHandler } */
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-        if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) {
-            store.ensureIndexHas6xCommitTags();
-        }
-        state().getTranslog().totalOperations(totalTranslogOps);
-        indexShard().openEngineAndSkipTranslogRecovery();
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) {
+                store.ensureIndexHas6xCommitTags();
+            }
+            state().getTranslog().totalOperations(totalTranslogOps);
+            indexShard().openEngineAndSkipTranslogRecovery();
+            return null;
+        });
     }
 
     @Override
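On the target side the body of prepareForTranslogOperations stays synchronous; only the result delivery changes, routed through the completeWith-style helper seen in the hunk above. A minimal sketch of such a helper, assuming only the behaviour visible in the diff (run the body, forward the value or the exception to the callback):

import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Illustrative only: a completeWith-style helper -- run a synchronous body and route its
// result or exception to the callback, so the callee keeps a simple synchronous shape
// while exposing a listener-based API.
final class CompleteWithSketch {

    static <T> void completeWith(Consumer<T> onResponse, Consumer<Exception> onFailure, Callable<T> body) {
        final T result;
        try {
            result = body.call();
        } catch (Exception e) {
            onFailure.accept(e);
            return;
        }
        onResponse.accept(result);
    }

    // Usage mirroring the change above: the body does the synchronous work and returns null.
    static void prepareForTranslogOperations(Consumer<Void> onResponse, Consumer<Exception> onFailure) {
        completeWith(onResponse, onFailure, () -> {
            // open the engine, record the expected translog operation count, etc.
            return null;
        });
    }
}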
@@ -36,7 +36,7 @@ public interface RecoveryTargetHandler {
      * @param fileBasedRecovery whether or not this call is part of an file based recovery
      * @param totalTranslogOps total translog operations expected to be sent
      */
-    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
+    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
 
     /**
      * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
@@ -77,11 +77,12 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
     }
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
-                new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
-                TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+            new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
@@ -25,6 +25,7 @@
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -199,13 +200,14 @@ public IndexResult index(Index op) throws IOException {
             Future<Void> fut = shards.asyncRecoverReplica(replica,
                 (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
                     @Override
-                    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+                    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+                                                             ActionListener<Void> listener) {
                         try {
                             indexedOnPrimary.await();
                         } catch (InterruptedException e) {
                             throw new AssertionError(e);
                         }
-                        super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+                        super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                     }
                 });
             fut.get();
@@ -2569,8 +2569,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
             }) {
             // we're only checking that listeners are called when the engine is open, before there is no point
             @Override
-            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+            public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+                super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                 assertListenerCalled.accept(replica);
             }

@@ -491,9 +491,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
                 }
 
                 @Override
-                TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+                void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
                     prepareTargetForTranslogCalled.set(true);
-                    return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
+                    super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
                 }
 
                 @Override
@@ -700,7 +700,7 @@ private List<StoreFileMetaData> generateFiles(Store store, int numFiles, IntSupp
 
     class TestRecoveryTargetHandler implements RecoveryTargetHandler {
         @Override
-        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
+        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
         }
 
         @Override
