Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare Snapshot Shard State Update Logic For Clone Logic #62617

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ public void onFailure(Exception e) {
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
@Override
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) {
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
}

@Override
Expand Down
193 changes: 111 additions & 82 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());

private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;

private final TransportService transportService;
Expand Down Expand Up @@ -1909,101 +1908,130 @@ public boolean assertAllListenersResolved() {
return true;
}

private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {

@Override
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
// Tasks to check for updates for running snapshots.
final List<UpdateIndexShardSnapshotStatusRequest> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<UpdateIndexShardSnapshotStatusRequest> executedTasks = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
entries.add(entry);
private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
int changedCount = 0;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for review: there's no logical changes here whatsoever, just indent changes and the type change for the tasks.

int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
// Tasks to check for updates for running snapshots.
final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
entries.add(entry);
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
for (Iterator<UpdateIndexShardSnapshotStatusRequest> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot();
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
final ShardId finishedShardId = updateSnapshotState.shardId;
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
final ShardId finishedShardId = updateSnapshotState.shardId();
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.status().state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.status());
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.status();
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
startedCount++;
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.updatedState.state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.updatedState);
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
iterator.remove();
startedCount++;
}
}

if (shards == null) {
entries.add(entry);
} else {
entries.add(entry.withShardStates(shards.build()));
}
if (shards == null) {
entries.add(entry);
} else {
entries.add(entry.withShardStates(shards.build()));
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(entries)).build());
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
}
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
};

/**
* An update to the snapshot state of a shard.
*/
private static final class ShardSnapshotUpdate {

private final Snapshot snapshot;

private final ShardId shardId;

private final ShardSnapshotStatus updatedState;

private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = shardId;
this.updatedState = updatedState;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
if ((other instanceof ShardSnapshotUpdate) == false) {
return false;
}
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
}


@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, updatedState);
}
}

/**
* Updates the shard status on master node
* Updates the shard status in the cluster state
*
* @param request update shard status request
* @param update shard snapshot status update
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
private void innerUpdateSnapshotState(ShardSnapshotUpdate update, ActionListener<Void> listener) {
logger.trace("received updated snapshot restore state [{}]", update);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
SHARD_STATE_EXECUTOR,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
Expand All @@ -2013,13 +2041,13 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
listener.onResponse(null);
} finally {
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
// state update we check if its state is completed and end it if it is.
if (endingSnapshots.contains(request.snapshot()) == false) {
if (endingSnapshots.contains(update.snapshot) == false) {
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata(), null);
Expand Down Expand Up @@ -2047,13 +2075,14 @@ protected String executor() {

@Override
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
}

@Override
protected void masterOperation(Task task, UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
innerUpdateSnapshotState(request, listener);
innerUpdateSnapshotState(new ShardSnapshotUpdate(request.snapshot(), request.shardId(), request.status()),
ActionListener.delegateFailure(listener, (l, v) -> l.onResponse(UpdateIndexShardSnapshotStatusResponse.INSTANCE)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.elasticsearch.snapshots;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {

UpdateIndexShardSnapshotStatusResponse() {}
public static final UpdateIndexShardSnapshotStatusResponse INSTANCE = new UpdateIndexShardSnapshotStatusResponse();

UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}
private UpdateIndexShardSnapshotStatusResponse() {}

@Override
public void writeTo(StreamOutput out) throws IOException {}
Expand Down