Prepare to make send translog of recovery non-blocking.
kovrus authored and mergify[bot] committed Sep 17, 2019
1 parent 807fd03 commit d5ffdfe
Showing 7 changed files with 223 additions and 165 deletions.
@@ -2651,7 +2651,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
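
The updated @see reference reflects the core change in this commit: the target no longer returns its local checkpoint from indexTranslogOperations, it delivers it through an ActionListener<Long>. A before/after sketch of the shape (declarations only, mirroring the diff, not the exact source):

    // before: blocking, the target's local checkpoint comes back as the return value
    long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
                                 long maxSeenAutoIdTimestampOnPrimary,
                                 long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;

    // after: the checkpoint is pushed to the listener once the operations are applied
    void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
                                 long maxSeenAutoIdTimestampOnPrimary,
                                 long maxSeqNoOfUpdatesOrDeletesOnPrimary,
                                 ActionListener<Long> listener);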
@@ -542,52 +542,60 @@ public void messageReceived(final RecoveryHandoffPrimaryContextRequest request,
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {

@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel, Task task) throws IOException {
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
Task task) throws IOException {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer =
new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
final ActionListener<RecoveryTranslogOperationsResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
final Consumer<Exception> retryOnMappingException = exception -> {
// in very rare cases a translog replay from primary is processed before
// a mapping update on this node which causes local mapping changes since
// the mapping (clusterstate) might not have arrived on this node.
logger.debug("delaying recovery due to missing mapping changes", exception);
// we do not need to use a timeout here since the entire recovery mechanism has an inactivity protection (it will be
// canceled)
// we do not need to use a timeout here since the entire recovery mechanism has an
// inactivity protection (it will be canceled)
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel, task);
} catch (Exception e) {
onFailure(e);
}
}

void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
listener.onFailure(e);
}
}

@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
listener.onFailure(new ElasticsearchException(
"cluster service was closed while waiting for mapping updates"));
}

@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
"])"));
listener.onFailure(new ElasticsearchTimeoutException(
"timed out waiting for mapping updates (timeout [" + timeout + "])"));
}
});
}
};
recoveryTarget.indexTranslogOperations(
request.operations(),
request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
);
}
}
}
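
The handler above splits the outcome of the now asynchronous call three ways: success answers the channel with the target checkpoint, a MapperException parks the request until the next cluster state delivers the missing mapping, and any other failure fails the recovery request. A condensed sketch of that wiring, assuming the listener, observer, and retry consumer set up earlier in the method:

    recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
            request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
            ActionListener.wrap(
                checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
                e -> {
                    if (e instanceof MapperException) {
                        // mapping not yet on this node: wait for the next cluster state, then replay
                        retryOnMappingException.accept(e);
                    } else {
                        listener.onFailure(e);
                    }
                }));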
@@ -33,6 +33,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
@@ -246,25 +247,27 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final SendSnapshotResult sendSnapshotResult;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock);
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));

final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);

finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
final RecoveryResponse response = new RecoveryResponse(
sendFileResult.phase1FileNames,
sendFileResult.phase1FileSizes,
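
Control flow in recoverToTarget is now expressed with StepListener: each step is an ActionListener whose whenComplete callbacks register what runs once that step finishes, or propagate the failure. A rough sketch of the chain built above, with the surrounding bookkeeping elided:

    final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
    final StepListener<Void> finalizeStep = new StepListener<>();

    // phase2 completes sendSnapshotStep with the snapshot-send result (or a failure)
    phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot,
           maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);

    // once the snapshot has been sent, release it and start finalization
    sendSnapshotStep.whenComplete(r -> {
        IOUtils.close(phase2Snapshot);
        finalizeRecovery(r.targetLocalCheckpoint, finalizeStep);
    }, onFailure);

    // once finalization is done, assemble the RecoveryResponse from the completed steps
    finalizeStep.whenComplete(r -> {
        final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); // safe: step already completed
        // ... build the RecoveryResponse and complete the outer listener ...
    }, onFailure);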
@@ -543,14 +546,27 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @return the send snapshot result
* @param listener a listener which will be notified with the local checkpoint on the target.
*/
SendSnapshotResult phase2(long startingSeqNo,
long requiredSeqNoRangeStart,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
void phase2(long startingSeqNo,
long requiredSeqNoRangeStart,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes,
ActionListener<SendSnapshotResult> listener) throws IOException {
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
startingSeqNo,
requiredSeqNoRangeStart,
endingSeqNo,
snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes));
}

private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
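
phase2 itself keeps the old blocking body, renamed sendSnapshotBlockingly, and only adapts it to the listener with ActionListener.completeWith: the supplier's return value is delivered to onResponse, and any exception it throws is delivered to onFailure. Conceptually, completeWith behaves roughly like this (a sketch, not the library source):

    static <T> void completeWith(ActionListener<T> listener, CheckedSupplier<T, Exception> body) {
        final T result;
        try {
            result = body.get();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(result);   // only reached when the body succeeded
    }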
@@ -578,9 +594,16 @@ SendSnapshotResult phase2(long startingSeqNo,
}

final CancellableThreads.IOInterruptable sendBatch = () -> {
final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
targetLocalCheckpoint.set(targetCheckpoint);
// TODO: Make this non-blocking
final PlainActionFuture<Long> future = new PlainActionFuture<>();
recoveryTarget.indexTranslogOperations(
operations,
expectedTotalOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
future
);
targetLocalCheckpoint.set(future.actionGet());
};

// send operations in batches
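
The batch send is still synchronous for now (hence the TODO): a PlainActionFuture, which is itself an ActionListener, bridges the new callback API back to a blocking wait inside the cancellable send loop:

    final PlainActionFuture<Long> future = new PlainActionFuture<>();
    recoveryTarget.indexTranslogOperations(operations, expectedTotalOps,
            maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);  // future receives the checkpoint
    targetLocalCheckpoint.set(future.actionGet());                        // blocks until the target answers or fails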
@@ -393,42 +393,48 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation that
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on it.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
public void indexTranslogOperations(List<Translog.Operation> operations,
int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary,
ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation that
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on it.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
});
}


@Override
public void receiveFileInfo(List<String> phase1FileNames,
List<Long> phase1FileSizes,
@@ -69,10 +69,12 @@
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
* @return the local checkpoint on the target shard
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
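
One practical consequence of the new contract for implementers and test doubles of RecoveryTargetHandler: the listener must be completed exactly once on every path, otherwise the recovery (or a blocking caller such as the PlainActionFuture above) waits forever. A minimal, purely illustrative stub:

    @Override
    public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
                                        long maxSeenAutoIdTimestampOnPrimary,
                                        long maxSeqNoOfUpdatesOrDeletesOnPrimary,
                                        ActionListener<Long> listener) {
        // even a no-op stub has to report some checkpoint so callers can proceed
        listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
    }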

/**
* Notifies the target of the files it is going to receive