Skip to content

Commit

Permalink
SNAPSHOTS: Allow Parallel Restore Operations (#36397) (#36659)
Browse files Browse the repository at this point in the history
* Enable parallel restore operations
* Add uuid to restore in progress entries to uniquely identify them
* Adjust restore in progress entries to be a map in cluster state
* Added tests for:
   * Parallel restore from two different snapshots
   * Parallel restore from a single snapshot to different indices to test uuid identifiers are correctly used by `RestoreService` and routing allocator
   * Parallel restore with waiting for completion to test transport actions correctly use uuid identifiers
  • Loading branch information
original-brownbear authored Dec 17, 2018
1 parent 436336e commit e832d87
Show file tree
Hide file tree
Showing 24 changed files with 376 additions and 141 deletions.
3 changes: 2 additions & 1 deletion docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Response:
"repository" : "my_repository",
"snapshot" : "my_snapshot",
"index" : "index1",
"version" : "{version}"
"version" : "{version}",
"restoreUUID": "PDh1ZAOaRbiGIVtCvZOMww"
},
"target" : {
"id" : "ryqJ5lO5S4-lSFbGntkEkg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
String uuid = restoreCompletionResponse.getUuid();

ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, () -> new RestoreInProgress.Builder().build());
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
for (ClusterPlugin plugin : clusterPlugins) {
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
Expand Down
113 changes: 81 additions & 32 deletions server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
Expand All @@ -32,36 +33,33 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/**
* Meta data about restore processes that are currently executing
*/
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom {
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom, Iterable<RestoreInProgress.Entry> {

/**
* Fallback UUID used for restore operations that were started before v7.0 and don't have a uuid in the cluster state.
*/
public static final String BWC_UUID = new UUID(0, 0).toString();

public static final String TYPE = "restore";

private final List<Entry> entries;
private final ImmutableOpenMap<String, Entry> entries;

/**
* Constructs new restore metadata
*
* @param entries list of currently running restore processes
* @param entries map of currently running restore processes keyed by their restore uuid
*/
public RestoreInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
}

/**
* Returns list of currently running restore processes
*
* @return list of currently running restore processes
*/
public List<Entry> entries() {
return this.entries;
private RestoreInProgress(ImmutableOpenMap<String, Entry> entries) {
this.entries = entries;
}

@Override
Expand All @@ -83,20 +81,48 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder builder = new StringBuilder("RestoreInProgress[");
for (int i = 0; i < entries.size(); i++) {
builder.append(entries.get(i).snapshot().getSnapshotId().getName());
if (i + 1 < entries.size()) {
builder.append(",");
}
return new StringBuilder("RestoreInProgress[").append(entries).append("]").toString();
}

public Entry get(String restoreUUID) {
return entries.get(restoreUUID);
}

public boolean isEmpty() {
return entries.isEmpty();
}

@Override
public Iterator<Entry> iterator() {
return entries.valuesIt();
}

public static final class Builder {

private final ImmutableOpenMap.Builder<String, Entry> entries = ImmutableOpenMap.builder();

public Builder() {
}

public Builder(RestoreInProgress restoreInProgress) {
entries.putAll(restoreInProgress.entries);
}

public Builder add(Entry entry) {
entries.put(entry.uuid, entry);
return this;
}

public RestoreInProgress build() {
return new RestoreInProgress(entries.build());
}
return builder.append("]").toString();
}

/**
* Restore metadata
*/
public static class Entry {
private final String uuid;
private final State state;
private final Snapshot snapshot;
private final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards;
Expand All @@ -105,12 +131,14 @@ public static class Entry {
/**
* Creates new restore metadata
*
* @param uuid uuid of the restore
* @param snapshot snapshot
* @param state current state of the restore process
* @param indices list of indices being restored
* @param shards map of shards being restored to their current restore status
*/
public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
public Entry(String uuid, Snapshot snapshot, State state, List<String> indices,
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.indices = Objects.requireNonNull(indices);
Expand All @@ -119,6 +147,15 @@ public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpen
} else {
this.shards = shards;
}
this.uuid = Objects.requireNonNull(uuid);
}

/**
* Returns restore uuid
* @return restore uuid
*/
public String uuid() {
return uuid;
}

/**
Expand Down Expand Up @@ -166,15 +203,16 @@ public boolean equals(Object o) {
return false;
}
@SuppressWarnings("unchecked") Entry entry = (Entry) o;
return snapshot.equals(entry.snapshot) &&
return uuid.equals(entry.uuid) &&
snapshot.equals(entry.snapshot) &&
state == entry.state &&
indices.equals(entry.indices) &&
shards.equals(entry.shards);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, state, indices, shards);
return Objects.hash(uuid, snapshot, state, indices, shards);
}
}

Expand Down Expand Up @@ -393,8 +431,15 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
}

public RestoreInProgress(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
int count = in.readVInt();
final ImmutableOpenMap.Builder<String, Entry> entriesBuilder = ImmutableOpenMap.builder(count);
for (int i = 0; i < count; i++) {
final String uuid;
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
uuid = in.readString();
} else {
uuid = BWC_UUID;
}
Snapshot snapshot = new Snapshot(in);
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
Expand All @@ -409,9 +454,9 @@ public RestoreInProgress(StreamInput in) throws IOException {
ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
builder.put(shardId, shardState);
}
entries[i] = new Entry(snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build());
entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()));
}
this.entries = Arrays.asList(entries);
this.entries = entriesBuilder.build();
}

/**
Expand All @@ -420,7 +465,11 @@ public RestoreInProgress(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(entries.size());
for (Entry entry : entries) {
for (ObjectCursor<Entry> v : entries.values()) {
Entry entry = v.value;
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeString(entry.uuid);
}
entry.snapshot().writeTo(out);
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
Expand All @@ -441,8 +490,8 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
for (Entry entry : entries) {
toXContent(entry, builder, params);
for (ObjectCursor<Entry> entry : entries.values()) {
toXContent(entry.value, builder, params);
}
builder.endArray();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -208,22 +209,33 @@ public String toString() {
* recovery from a snapshot
*/
public static class SnapshotRecoverySource extends RecoverySource {
private final String restoreUUID;
private final Snapshot snapshot;
private final String index;
private final Version version;

public SnapshotRecoverySource(Snapshot snapshot, Version version, String index) {
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(index);
}

SnapshotRecoverySource(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
restoreUUID = in.readString();
} else {
restoreUUID = RestoreInProgress.BWC_UUID;
}
snapshot = new Snapshot(in);
version = Version.readVersion(in);
index = in.readString();
}

public String restoreUUID() {
return restoreUUID;
}

public Snapshot snapshot() {
return snapshot;
}
Expand All @@ -238,6 +250,9 @@ public Version version() {

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeString(restoreUUID);
}
snapshot.writeTo(out);
Version.writeVersion(version, out);
out.writeString(index);
Expand All @@ -253,12 +268,13 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
builder.field("repository", snapshot.getRepository())
.field("snapshot", snapshot.getSnapshotId().getName())
.field("version", version.toString())
.field("index", index);
.field("index", index)
.field("restoreUUID", restoreUUID);
}

@Override
public String toString() {
return "snapshot recovery from " + snapshot.toString();
return "snapshot recovery [" + restoreUUID + "] from " + snapshot;
}

@Override
Expand All @@ -271,12 +287,13 @@ public boolean equals(Object o) {
}

@SuppressWarnings("unchecked") SnapshotRecoverySource that = (SnapshotRecoverySource) o;
return snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID) && snapshot.equals(that.snapshot)
&& index.equals(that.index) && version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, index, version);
return Objects.hash(restoreUUID, snapshot, index, version);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.snapshots.Snapshot;

/**
* This {@link AllocationDecider} prevents shards that have failed to be
Expand All @@ -46,25 +45,24 @@ public Decision canAllocate(final ShardRouting shardRouting, final RoutingAlloca
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
}

final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);

if (restoresInProgress != null) {
for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
if (restoreInProgress.snapshot().equals(snapshot)) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
break;
RestoreInProgress.Entry restoreInProgress = restoresInProgress.get(source.restoreUUID());
if (restoreInProgress != null) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
}
}
return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
"manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
"allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
"allocation of an empty primary shard",
source.snapshot(), shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
}

@Override
Expand Down
Loading

0 comments on commit e832d87

Please sign in to comment.