Skip to content

Commit

Permalink
Prepare Snapshot Shard State Update Logic For Clone Logic (elastic#62617
Browse files Browse the repository at this point in the history
)

Small refactoring to shorten the diff with the clone logic in elastic#61839:

* Since clones will create a different kind of shard state update that
isn't the same request sent by the snapshot shards service (and cannot be
the same request because we have no `ShardId`) base the shard state updates
on a different class that can be extended to be general enough to accomodate
shard clones as well.
* Make the update executor a singleton (can't make it an inline lambda as that
would break CS update batching because the executor is used as a map key but
this change still makes it crystal clear that there's no internal state to the
executor)
* Make shard state update responses a singleton (can't use TransportResponse.Empty because
we need an action response but still it makes it clear that there's no actual
response with content here)
  • Loading branch information
original-brownbear committed Oct 5, 2020
1 parent 83fcaf4 commit 35a5158
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ public void onFailure(Exception e) {
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
@Override
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) {
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
}

@Override
Expand Down
194 changes: 112 additions & 82 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -165,7 +166,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());

private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;

private final TransportService transportService;
Expand Down Expand Up @@ -2452,101 +2452,130 @@ public boolean assertAllListenersResolved() {
return true;
}

private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {

@Override
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
// Tasks to check for updates for running snapshots.
final List<UpdateIndexShardSnapshotStatusRequest> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<UpdateIndexShardSnapshotStatusRequest> executedTasks = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
entries.add(entry);
private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
// Tasks to check for updates for running snapshots.
final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
entries.add(entry);
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
continue;
}
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
for (Iterator<UpdateIndexShardSnapshotStatusRequest> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot();
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
final ShardId finishedShardId = updateSnapshotState.shardId;
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
final ShardId finishedShardId = updateSnapshotState.shardId();
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.status().state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.status());
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.status();
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
iterator.remove();
startedCount++;
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.updatedState.state());
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.updatedState);
executedTasks.add(updateSnapshotState);
changedCount++;
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
iterator.remove();
startedCount++;
}
}

if (shards == null) {
entries.add(entry);
} else {
entries.add(entry.withShardStates(shards.build()));
}
if (shards == null) {
entries.add(entry);
} else {
entries.add(entry.withShardStates(shards.build()));
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(entries)).build());
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
}
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
};

/**
* An update to the snapshot state of a shard.
*/
private static final class ShardSnapshotUpdate {

private final Snapshot snapshot;

private final ShardId shardId;

private final ShardSnapshotStatus updatedState;

private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = shardId;
this.updatedState = updatedState;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
if ((other instanceof ShardSnapshotUpdate) == false) {
return false;
}
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
}


@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, updatedState);
}
}

/**
* Updates the shard status on master node
* Updates the shard status in the cluster state
*
* @param request update shard status request
* @param update shard snapshot status update
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
private void innerUpdateSnapshotState(ShardSnapshotUpdate update, ActionListener<Void> listener) {
logger.trace("received updated snapshot restore state [{}]", update);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
update,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
SHARD_STATE_EXECUTOR,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
Expand All @@ -2556,13 +2585,13 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
listener.onResponse(null);
} finally {
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
// state update we check if its state is completed and end it if it is.
if (endingSnapshots.contains(request.snapshot()) == false) {
if (endingSnapshots.contains(update.snapshot) == false) {
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata(), null);
Expand Down Expand Up @@ -2590,13 +2619,14 @@ protected String executor() {

@Override
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
}

@Override
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
innerUpdateSnapshotState(request, listener);
innerUpdateSnapshotState(new ShardSnapshotUpdate(request.snapshot(), request.shardId(), request.status()),
ActionListener.delegateFailure(listener, (l, v) -> l.onResponse(UpdateIndexShardSnapshotStatusResponse.INSTANCE)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@
package org.elasticsearch.snapshots;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {

UpdateIndexShardSnapshotStatusResponse() {}
public static final UpdateIndexShardSnapshotStatusResponse INSTANCE = new UpdateIndexShardSnapshotStatusResponse();

UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}
private UpdateIndexShardSnapshotStatusResponse() {}

@Override
public void writeTo(StreamOutput out) throws IOException {}
Expand Down

0 comments on commit 35a5158

Please sign in to comment.