From c364891b3f3297c5761f06c8a9b4c7e016e92e1e Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Mon, 21 Jan 2019 21:35:10 -0500
Subject: [PATCH] Make prepare engine step of recovery source non-blocking
 (#37573)

Relates #37174
---
 .../recovery/PeerRecoveryTargetService.java   |   9 +-
 .../recovery/RecoverySourceHandler.java       | 107 +++++++++---------
 .../indices/recovery/RecoveryTarget.java      |  15 ++-
 .../recovery/RecoveryTargetHandler.java       |   2 +-
 .../recovery/RemoteRecoveryTargetHandler.java |   9 +-
 .../IndexLevelReplicationTests.java           |   6 +-
 .../index/shard/IndexShardTests.java          |   4 +-
 .../recovery/RecoverySourceHandlerTests.java  |   6 +-
 8 files changed, 85 insertions(+), 73 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index fc44539b0a3b5..9c1570350e5fe 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -431,11 +431,12 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
 
         @Override
         public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
-            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
-            )) {
-                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse.Empty> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
+                recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index 34434f50b456f..f360a68b7a83c 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -197,51 +197,51 @@ public void recoverToTarget(ActionListener listener) {
         assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]";
-        final TimeValue prepareEngineTime;
-        try {
-            // For a sequence based recovery, the target can keep its local translog
-            prepareEngineTime = prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-        } catch (final Exception e) {
-            throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
-        }
+        final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
+        // For a sequence based recovery, the target can keep its local translog
+        prepareTargetForTranslog(isSequenceNumberBasedRecovery == false,
+            shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
+        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
+        prepareEngineStep.whenComplete(prepareEngineTime -> {
+            /*
+             * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
+             * This means that any document indexed into the primary after this will be replicated to this replica as well
+             * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
+             * all documents up to maxSeqNo in phase2.
+             */
+            runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
+                shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
 
-        /*
-         * add shard to replication group (shard will receive replication requests from this point on) now that engine is open.
-         * This means that any document indexed into the primary after this will be replicated to this replica as well
-         * make sure to do this before sampling the max sequence number in the next step, to ensure that we send
-         * all documents up to maxSeqNo in phase2.
-         */
-        runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
-            shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
-
-        final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
-        /*
-         * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
-         * operations in the required range will be available for replaying from the translog of the source.
-         */
-        cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
-            logger.trace("snapshot translog for recovery; current size is [{}]",
-                shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
-        }
+            final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
+            /*
+             * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all
+             * operations in the required range will be available for replaying from the translog of the source.
+             */
+            cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo));
+            if (logger.isTraceEnabled()) {
+                logger.trace("all operations up to [{}] completed, which will be used as an ending sequence number", endingSeqNo);
+                logger.trace("snapshot translog for recovery; current size is [{}]",
+                    shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
+            }
+            final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
+            resources.add(phase2Snapshot);
+            // we can release the retention lock here because the snapshot itself will retain the required operations.
+            retentionLock.close();
+            // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
+            // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
+            final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
+            final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
+            phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
+                maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
+            sendSnapshotStep.whenComplete(
+                r -> IOUtils.close(phase2Snapshot),
+                e -> {
+                    IOUtils.closeWhileHandlingException(phase2Snapshot);
+                    onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e));
+                });
+
+        }, onFailure);
 
-        final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
-        resources.add(phase2Snapshot);
-        // we can release the retention lock here because the snapshot itself will retain the required operations.
-        IOUtils.close(retentionLock);
-        // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
-        // are at least as high as the corresponding values on the primary when any of these operations were executed on it.
-        final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
-        final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
-        final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
-        phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
-            maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
-        sendSnapshotStep.whenComplete(
-            r -> IOUtils.close(phase2Snapshot),
-            e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
 
         final StepListener<Void> finalizeStep = new StepListener<>();
         sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
@@ -251,7 +251,7 @@ public void recoverToTarget(ActionListener listener) {
             final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                 sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
                 sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
-                prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+                prepareEngineStep.result().millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
             try {
                 wrappedListener.onResponse(response);
             } finally {
@@ -484,16 +484,21 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier
         }
     }
 
-    TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+    void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
         StopWatch stopWatch = new StopWatch().start();
-        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        final ActionListener<Void> wrappedListener = ActionListener.wrap(
+            nullVal -> {
+                stopWatch.stop();
+                final TimeValue tookTime = stopWatch.totalTime();
+                logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
+                listener.onResponse(tookTime);
+            },
+            e -> listener.onFailure(new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e)));
         // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes.
-        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps));
-        stopWatch.stop();
-        final TimeValue tookTime = stopWatch.totalTime();
-        logger.trace("recovery [phase1]: remote engine start took [{}]", tookTime);
-        return tookTime;
+        logger.trace("recovery [phase1]: prepare remote engine for translog");
+        cancellableThreads.execute(() ->
+            recoveryTarget.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, wrappedListener));
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index db4bb187f9b45..d24c3773d264c 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -366,12 +366,15 @@ private void ensureRefCount() {
 
     /*** Implementation of {@link RecoveryTargetHandler } */
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-        if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) {
-            store.ensureIndexHas6xCommitTags();
-        }
-        state().getTranslog().totalOperations(totalTranslogOps);
-        indexShard().openEngineAndSkipTranslogRecovery();
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            if (fileBasedRecovery && indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0)) {
+                store.ensureIndexHas6xCommitTags();
+            }
+            state().getTranslog().totalOperations(totalTranslogOps);
+            indexShard().openEngineAndSkipTranslogRecovery();
+            return null;
+        });
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index a682244470a63..b1ddc80e77eb3 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -36,7 +36,7 @@ public interface RecoveryTargetHandler {
      * @param fileBasedRecovery whether or not this call is part of an file based recovery
      * @param totalTranslogOps  total translog operations expected to be sent
      */
-    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException;
+    void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
 
     /**
      * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index ba703aeee2850..0799cc6595189 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -77,11 +77,12 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, TransportSe
     }
 
     @Override
-    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
-            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
-            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
+            TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
+            new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 87f80488f5e72..3bd6d452b2519 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -25,6 +25,7 @@
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkShardRequest;
@@ -199,13 +200,14 @@ public IndexResult index(Index op) throws IOException {
         Future<Void> fut = shards.asyncRecoverReplica(replica,
             (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){
                 @Override
-                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
+                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps,
+                                                         ActionListener<Void> listener) {
                     try {
                         indexedOnPrimary.await();
                     } catch (InterruptedException e) {
                         throw new AssertionError(e);
                     }
-                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                 }
             });
         fut.get();
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index de8215c1c6044..32197fe2cbd09 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2569,8 +2569,8 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
             }) {
                 // we're only checking that listeners are called when the engine is open, before there is no point
                 @Override
-                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException {
-                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps);
+                public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
+                    super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
                     assertListenerCalled.accept(replica);
                 }
 
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 2f6863894d994..971504ff46377 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -491,9 +491,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier
             }
 
             @Override
-            TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTranslogOps) throws IOException {
+            void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
                 prepareTargetForTranslogCalled.set(true);
-                return super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps);
+                super.prepareTargetForTranslog(fileBasedRecovery, totalTranslogOps, listener);
             }
 
             @Override
@@ -700,7 +700,7 @@ private List generateFiles(Store store, int numFiles, IntSupp
     class TestRecoveryTargetHandler implements RecoveryTargetHandler {
 
         @Override
-        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) {
+        public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
        }

        @Override
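
Note (illustration only, not part of the patch): the change above turns the blocking prepareTargetForTranslog round trip into a listener callback and chains the remaining recovery phases through StepListener.whenComplete. The self-contained Java sketch below mimics that shape with invented Listener/Step stand-ins (all names here are hypothetical, not Elasticsearch APIs), so it compiles and runs on its own.

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class NonBlockingStepSketch {

    /** Minimal stand-in for an ActionListener-style callback. */
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    /** Minimal stand-in for a StepListener: completing it triggers the next chained step. */
    static final class Step<T> implements Listener<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        @Override
        public void onResponse(T value) {
            future.complete(value);
        }

        @Override
        public void onFailure(Exception e) {
            future.completeExceptionally(e);
        }

        void whenComplete(Consumer<T> onResponse, Consumer<Throwable> onFailure) {
            future.whenComplete((value, failure) -> {
                if (failure != null) {
                    onFailure.accept(failure);
                } else {
                    onResponse.accept(value);
                }
            });
        }
    }

    /** Before: the caller blocks until the remote "prepare engine" step has finished. */
    static long prepareEngineBlocking() throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(10); // stands in for the round trip to the recovery target
        return System.nanoTime() - start;
    }

    /** After: the step reports completion through a listener instead of blocking the calling thread. */
    static void prepareEngineAsync(Listener<Long> listener) {
        long start = System.nanoTime();
        new Thread(() -> {
            try {
                Thread.sleep(10); // stands in for the round trip to the recovery target
                listener.onResponse(System.nanoTime() - start);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        }).start();
    }

    public static void main(String[] args) {
        Step<Long> prepareEngineStep = new Step<>();
        prepareEngineAsync(prepareEngineStep);
        // The next phase runs only after the previous step completes, mirroring
        // prepareEngineStep.whenComplete(...) -> phase2(...) -> finalizeStep in the patch.
        prepareEngineStep.whenComplete(
            tookNanos -> System.out.println("prepare engine took " + tookNanos + "ns; starting phase2"),
            failure -> System.err.println("prepare engine failed: " + failure));
    }
}

In the actual change, the same inversion lets RecoverySourceHandler return immediately and report the eventual RecoveryResponse through the listener passed to recoverToTarget.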