diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f0ffedf514146..2ed1de343885d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -450,8 +450,8 @@ public void onFailure(Exception e) { SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req, new TransportResponseHandler() { @Override - public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - return new UpdateIndexShardSnapshotStatusResponse(in); + public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) { + return UpdateIndexShardSnapshotStatusResponse.INSTANCE; } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 049b1d4bcfd47..ee7f1ca447caf 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -100,6 +100,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -165,7 +166,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // Set of snapshots that are currently being ended by this node private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); - private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; private final TransportService transportService; @@ -2452,101 +2452,130 @@ public boolean assertAllListenersResolved() { return true; } - private static class SnapshotStateExecutor implements ClusterStateTaskExecutor { - - @Override - public ClusterTasksResult - execute(ClusterState currentState, List tasks) { - int changedCount = 0; - int startedCount = 0; - final List entries = new ArrayList<>(); - // Tasks to check for updates for running snapshots. - final List unconsumedTasks = new ArrayList<>(tasks); - // Tasks that were used to complete an existing in-progress shard snapshot - final Set executedTasks = new HashSet<>(); - for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { - if (entry.state().completed()) { - entries.add(entry); + private static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { + int changedCount = 0; + int startedCount = 0; + final List entries = new ArrayList<>(); + // Tasks to check for updates for running snapshots. + final List unconsumedTasks = new ArrayList<>(tasks); + // Tasks that were used to complete an existing in-progress shard snapshot + final Set executedTasks = new HashSet<>(); + for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { + if (entry.state().completed()) { + entries.add(entry); + continue; + } + ImmutableOpenMap.Builder shards = null; + for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) { + final ShardSnapshotUpdate updateSnapshotState = iterator.next(); + final Snapshot updatedSnapshot = updateSnapshotState.snapshot; + final String updatedRepository = updatedSnapshot.getRepository(); + if (entry.repository().equals(updatedRepository) == false) { continue; } - ImmutableOpenMap.Builder shards = null; - for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) { - final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next(); - final Snapshot updatedSnapshot = updateSnapshotState.snapshot(); - final String updatedRepository = updatedSnapshot.getRepository(); - if (entry.repository().equals(updatedRepository) == false) { + final ShardId finishedShardId = updateSnapshotState.shardId; + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); + if (existing == null) { + logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, entry); + assert false : "This should never happen, data nodes should only send updates for expected shards"; continue; } - final ShardId finishedShardId = updateSnapshotState.shardId(); - if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { - final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); - if (existing == null) { - logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", - updateSnapshotState, entry); - assert false : "This should never happen, data nodes should only send updates for expected shards"; - continue; - } - if (existing.state().completed()) { - // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. - iterator.remove(); - continue; - } - logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot, - finishedShardId, updateSnapshotState.status().state()); - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); - } - shards.put(finishedShardId, updateSnapshotState.status()); - executedTasks.add(updateSnapshotState); - changedCount++; - } else if (executedTasks.contains(updateSnapshotState)) { - // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot - final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { - continue; - } - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); - } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.status(); - logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, - finishedStatus.nodeId(), finishedStatus.generation()); - shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. iterator.remove(); - startedCount++; + continue; + } + logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot, + finishedShardId, updateSnapshotState.updatedState.state()); + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + shards.put(finishedShardId, updateSnapshotState.updatedState); + executedTasks.add(updateSnapshotState); + changedCount++; + } else if (executedTasks.contains(updateSnapshotState)) { + // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + iterator.remove(); + startedCount++; } + } - if (shards == null) { - entries.add(entry); - } else { - entries.add(entry.withShardStates(shards.build())); - } + if (shards == null) { + entries.add(entry); + } else { + entries.add(entry.withShardStates(shards.build())); } - if (changedCount > 0) { - logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + - "[{}] shard snapshots", changedCount, startedCount); - return ClusterTasksResult.builder().successes(tasks) - .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, - SnapshotsInProgress.of(entries)).build()); + } + if (changedCount > 0) { + logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + + "[{}] shard snapshots", changedCount, startedCount); + return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build()); + } + return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState); + }; + + /** + * An update to the snapshot state of a shard. + */ + private static final class ShardSnapshotUpdate { + + private final Snapshot snapshot; + + private final ShardId shardId; + + private final ShardSnapshotStatus updatedState; + + private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) { + this.snapshot = snapshot; + this.shardId = shardId; + this.updatedState = updatedState; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; } - return ClusterTasksResult.builder().successes(tasks).build(currentState); + if ((other instanceof ShardSnapshotUpdate) == false) { + return false; + } + final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other; + return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState; + } + + + @Override + public int hashCode() { + return Objects.hash(snapshot, shardId, updatedState); } } /** - * Updates the shard status on master node + * Updates the shard status in the cluster state * - * @param request update shard status request + * @param update shard snapshot status update */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, - ActionListener listener) { - logger.trace("received updated snapshot restore state [{}]", request); + private void innerUpdateSnapshotState(ShardSnapshotUpdate update, ActionListener listener) { + logger.trace("received updated snapshot restore state [{}]", update); clusterService.submitStateUpdateTask( "update snapshot state", - request, + update, ClusterStateTaskConfig.build(Priority.NORMAL), - snapshotStateExecutor, + SHARD_STATE_EXECUTOR, new ClusterStateTaskListener() { @Override public void onFailure(String source, Exception e) { @@ -2556,13 +2585,13 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { try { - listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); + listener.onResponse(null); } finally { // Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent // state update we check if its state is completed and end it if it is. - if (endingSnapshots.contains(request.snapshot()) == false) { + if (endingSnapshots.contains(update.snapshot) == false) { final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); - final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot()); + final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot); // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo if (updatedEntry != null && updatedEntry.state().completed()) { endSnapshot(updatedEntry, newState.metadata(), null); @@ -2590,13 +2619,14 @@ protected String executor() { @Override protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - return new UpdateIndexShardSnapshotStatusResponse(in); + return UpdateIndexShardSnapshotStatusResponse.INSTANCE; } @Override protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, ActionListener listener) throws Exception { - innerUpdateSnapshotState(request, listener); + innerUpdateSnapshotState(new ShardSnapshotUpdate(request.snapshot(), request.shardId(), request.status()), + ActionListener.delegateFailure(listener, (l, v) -> l.onResponse(UpdateIndexShardSnapshotStatusResponse.INSTANCE))); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java index 6ee95f87498f6..92f477074290d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java @@ -19,18 +19,15 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { - UpdateIndexShardSnapshotStatusResponse() {} + public static final UpdateIndexShardSnapshotStatusResponse INSTANCE = new UpdateIndexShardSnapshotStatusResponse(); - UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { - super(in); - } + private UpdateIndexShardSnapshotStatusResponse() {} @Override public void writeTo(StreamOutput out) throws IOException {}