Prepare to make send translog of recovery non-blocking #37458

Merged: 4 commits, Jan 15, 2019

Changes from all commits
@@ -2998,7 +2998,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
@@ -485,14 +485,12 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler<Recove
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
Task task) throws IOException {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
final ActionListener<RecoveryTranslogOperationsResponse> listener =
new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request);
final Consumer<Exception> retryOnMappingException = exception -> {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
// which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node.
logger.debug("delaying recovery due to missing mapping changes", exception);
@@ -504,31 +502,36 @@ public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel, task);
} catch (Exception e) {
onFailure(e);
}
}

protected void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
logger.warn("failed to send error back to recovery source", e1);
listener.onFailure(e);
}
}

@Override
public void onClusterServiceClose() {
onFailure(new ElasticsearchException("cluster service was closed while waiting for mapping updates"));
listener.onFailure(new ElasticsearchException(
"cluster service was closed while waiting for mapping updates"));
}

@Override
public void onTimeout(TimeValue timeout) {
// note that we do not use a timeout (see comment above)
onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates (timeout [" + timeout +
"])"));
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for mapping updates " +
"(timeout [" + timeout + "])"));
}
});
}
};
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
}
})
);
}
}
}
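To make the rewritten handler above easier to follow, here is a compact, hedged sketch of its new shape: the transport channel is wrapped in a single listener, the asynchronous target call completes that listener on success, and a mapping exception is redirected into a wait-then-retry path that re-dispatches the whole request. Every name below (Listener, TranslogOpsHandler, indexOps, waitForNewClusterState, MapperProblem) is an illustrative stand-in, not Elasticsearch API.

```java
// Illustrative sketch of the listener-based retry pattern above; all names are hypothetical.
import java.util.function.Consumer;

interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

class TranslogOpsHandler {
    void messageReceived(Object request, Listener<Long> channelListener) {
        // Retry hook: wait for a newer cluster state (a mapping update), then re-dispatch the request.
        Consumer<Exception> retryOnMappingException = e ->
            waitForNewClusterState(() -> messageReceived(request, channelListener), channelListener::onFailure);

        // The target call is asynchronous: success answers the channel,
        // a mapping problem triggers the retry, anything else fails the request.
        indexOps(request, new Listener<Long>() {
            @Override public void onResponse(Long checkpoint) { channelListener.onResponse(checkpoint); }
            @Override public void onFailure(Exception e) {
                if (e instanceof MapperProblem) { retryOnMappingException.accept(e); }
                else { channelListener.onFailure(e); }
            }
        });
    }

    // Placeholders standing in for the target call and the cluster-state wait.
    void indexOps(Object request, Listener<Long> listener) { /* ... */ }
    void waitForNewClusterState(Runnable retry, Consumer<Exception> onGiveUp) { /* ... */ }
    static class MapperProblem extends Exception {}
}
```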
@@ -33,6 +33,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.StopWatch;
@@ -226,25 +227,27 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
}
final SendSnapshotResult sendSnapshotResult;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock, () -> resources.remove(retentionLock));
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
sendSnapshotResult = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot,
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}

final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
resources.add(phase2Snapshot);
// we can release the retention lock here because the snapshot itself will retain the required operations.
IOUtils.close(retentionLock);
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> onFailure.accept(new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e)));
final StepListener<Void> finalizeStep = new StepListener<>();
finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);

finalizeStep.whenComplete(r -> {
assert resources.isEmpty() : "not every resource is released [" + resources + "]";
final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();
final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
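The change above replaces the blocking phase2 call with two chained StepListeners: sendSnapshotStep feeds finalizeStep, and the final response is assembled once finalization completes. As a rough sketch of the same shape using plain CompletableFuture (the step names mirror the diff; everything else is illustrative):

```java
// Step-chaining sketch with standard CompletableFuture instead of Elasticsearch's StepListener.
import java.util.concurrent.CompletableFuture;

class RecoveryStepsSketch {
    static CompletableFuture<String> recover() {
        CompletableFuture<Long> sendSnapshotStep = new CompletableFuture<>();
        CompletableFuture<Void> finalizeStep = new CompletableFuture<>();

        // Phase 2: replay translog ops; completes with the target's local checkpoint.
        phase2(sendSnapshotStep);

        // When phase 2 completes, kick off finalization with its result; failures propagate.
        sendSnapshotStep.whenComplete((checkpoint, e) -> {
            if (e != null) { finalizeStep.completeExceptionally(e); }
            else { finalizeRecovery(checkpoint, finalizeStep); }
        });

        // When finalization completes, assemble the response from the earlier step's result.
        return finalizeStep.thenApply(v -> "recovered up to checkpoint " + sendSnapshotStep.join());
    }

    static void phase2(CompletableFuture<Long> step) { step.complete(42L); }                              // stand-in
    static void finalizeRecovery(long checkpoint, CompletableFuture<Void> step) { step.complete(null); }  // stand-in

    public static void main(String[] args) {
        System.out.println(recover().join());
    }
}
```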
@@ -507,10 +510,17 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
* @return the send snapshot result
* @param listener a listener which will be notified with the local checkpoint on the target.
*/
SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws IOException {
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
}

private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
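With this change phase2 only adapts the still-blocking implementation to the listener contract via ActionListener.completeWith: the supplier runs, its return value becomes onResponse and any exception becomes onFailure. Roughly the following, shown with simplified, hypothetical types (SimpleListener, CheckedSupplier, Listeners):

```java
// Rough shape of the completeWith helper used above (simplified, illustrative types).
interface SimpleListener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

@FunctionalInterface
interface CheckedSupplier<T> {
    T get() throws Exception;
}

final class Listeners {
    static <T> void completeWith(SimpleListener<T> listener, CheckedSupplier<T> supplier) {
        final T result;
        try {
            result = supplier.get();   // run the (still blocking) body
        } catch (Exception e) {
            listener.onFailure(e);     // exceptions become onFailure
            return;
        }
        listener.onResponse(result);   // the return value becomes onResponse
    }
}
```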
@@ -538,9 +548,11 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
}

final CancellableThreads.IOInterruptible sendBatch = () -> {
final long targetCheckpoint = recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes);
targetLocalCheckpoint.set(targetCheckpoint);
// TODO: Make this non-blocking
final PlainActionFuture<Long> future = new PlainActionFuture<>();
recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
targetLocalCheckpoint.set(future.actionGet());
};

// send operations in batches
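The TODO-marked sendBatch above keeps the call site blocking for now: the listener-based target call writes its result into a PlainActionFuture and the sender waits on it. A hedged sketch of that async-to-blocking bridge using only JDK types (all names illustrative):

```java
// Bridging a listener-based call back to a blocking one, shown with CompletableFuture.
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class BlockingBridgeSketch {
    // Stand-in for the listener-based indexTranslogOperations call: completes asynchronously.
    static void indexTranslogOperationsAsync(BiConsumer<Long, Exception> listener) {
        new Thread(() -> listener.accept(7L, null)).start();
    }

    static long sendBatchBlocking() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        indexTranslogOperationsAsync((checkpoint, e) -> {
            if (e != null) { future.completeExceptionally(e); }
            else { future.complete(checkpoint); }
        });
        return future.join(); // block until the target acknowledges the batch
    }

    public static void main(String[] args) {
        System.out.println("target local checkpoint: " + sendBatchBlocking());
    }
}
```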
@@ -394,40 +394,42 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that operation was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
assert result.getFailure() == null: "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
/*
* The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation
* will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests
* (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we
* replay these operations first (without timestamp), then optimize append-only requests (with timestamp).
*/
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
/*
* Bootstrap the max_seq_no_of_updates from the primary to make sure that the max_seq_no_of_updates on this replica when
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
*/
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
assert result.getFailure() == null : "unexpected failure while replicating translog entry: " + result.getFailure();
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
}
// update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
translog.incrementRecoveredOperations(operations.size());
indexShard().sync();
// roll over / flush / trim if needed
indexShard().afterWriteOperation();
return indexShard().getLocalCheckpoint();
});
}

@Override
@@ -67,10 +67,11 @@ public interface RecoveryTargetHandler {
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @return the local checkpoint on the target shard
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) throws IOException;
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);

/**
* Notifies the target of the files it is going to receive
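For reference, a simplified before/after sketch of the changed contract; String stands in for Translog.Operation and AsyncListener for ActionListener, purely for illustration:

```java
// Before/after shape of the handler contract (simplified, illustrative types).
import java.util.List;

interface AsyncListener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

interface OldHandler {
    // Old: the caller blocks until the ops are indexed and gets the local checkpoint back.
    long indexTranslogOperations(List<String> operations, int totalOps,
                                 long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes) throws Exception;
}

interface NewHandler {
    // New: the caller supplies a listener; the checkpoint (or a failure) arrives asynchronously.
    void indexTranslogOperations(List<String> operations, int totalOps,
                                 long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes,
                                 AsyncListener<Long> listener);
}
```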