action,
+ String repository, String source, String target) {
+ this(client, action, new CloneSnapshotRequest(repository, source, target, Strings.EMPTY_ARRAY));
+ }
+
+ /**
+ * Sets a list of indices that should be cloned from the source to the target snapshot
+ *
+ * The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will clone all indices with
+ * prefix "test" except index "test42".
+ *
+ * @return this builder
+ */
+ public CloneSnapshotRequestBuilder setIndices(String... indices) {
+ request.indices(indices);
+ return this;
+ }
+
+ /**
+ * Specifies the indices options. Like what type of requested indices to ignore. For example indices that don't exist.
+ *
+ * @param indicesOptions the desired behaviour regarding indices options
+ * @return this request
+ */
+ public CloneSnapshotRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
+ request.indicesOptions(indicesOptions);
+ return this;
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java
new file mode 100644
index 0000000000000..a696dcba607a0
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/clone/TransportCloneSnapshotAction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.snapshots.clone;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.snapshots.SnapshotsService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+
+/**
+ * Transport action for the clone snapshot operation.
+ */
+public final class TransportCloneSnapshotAction extends TransportMasterNodeAction {
+
+ private final SnapshotsService snapshotsService;
+
+ @Inject
+ public TransportCloneSnapshotAction(TransportService transportService, ClusterService clusterService,
+ ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver) {
+ super(CloneSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CloneSnapshotRequest::new,
+ indexNameExpressionResolver);
+ this.snapshotsService = snapshotsService;
+ }
+
+ @Override
+ protected String executor() {
+ return ThreadPool.Names.SAME;
+ }
+
+ @Override
+ protected AcknowledgedResponse read(StreamInput in) throws IOException {
+ return new AcknowledgedResponse(in);
+ }
+
+ @Override
+ protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener listener) {
+ snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
+ }
+
+ @Override
+ protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
+ // Cluster is not affected but we look up repositories in metadata
+ return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
index cd9adabaf2759..4701c67987f9b 100644
--- a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
+++ b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
@@ -71,6 +71,8 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@@ -506,7 +508,22 @@ public interface ClusterAdminClient extends ElasticsearchClient {
CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, String name);
/**
- * Get snapshot.
+ * Clones a snapshot.
+ */
+ CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target);
+
+ /**
+ * Clones a snapshot.
+ */
+ ActionFuture cloneSnapshot(CloneSnapshotRequest request);
+
+ /**
+ * Clones a snapshot.
+ */
+ void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener);
+
+ /**
+ * Get snapshots.
*/
ActionFuture getSnapshots(GetSnapshotsRequest request);
diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
index 879c60d065166..12fae90ab950a 100644
--- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
+++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
@@ -94,6 +94,9 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
@@ -959,6 +962,21 @@ public CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, Str
return new CreateSnapshotRequestBuilder(this, CreateSnapshotAction.INSTANCE, repository, name);
}
+ @Override
+ public CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target) {
+ return new CloneSnapshotRequestBuilder(this, CloneSnapshotAction.INSTANCE, repository, source, target);
+ }
+
+ @Override
+ public ActionFuture cloneSnapshot(CloneSnapshotRequest request) {
+ return execute(CloneSnapshotAction.INSTANCE, request);
+ }
+
+ @Override
+ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) {
+ execute(CloneSnapshotAction.INSTANCE, request, listener);
+ }
+
@Override
public ActionFuture getSnapshots(GetSnapshotsRequest request) {
return execute(GetSnapshotsAction.INSTANCE, request);
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index 535630196934e..b0e6604b2e4df 100644
--- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -35,8 +35,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
@@ -101,11 +103,32 @@ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState,
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}
+ /**
+ * Creates the initial snapshot clone entry
+ *
+ * @param snapshot snapshot to clone into
+ * @param source snapshot to clone from
+ * @param indices indices to clone
+ * @param startTime start time
+ * @param repositoryStateId repository state id that this clone is based on
+ * @param version repository metadata version to write
+ * @return snapshot clone entry
+ */
+ public static Entry startClone(Snapshot snapshot, SnapshotId source, List indices, long startTime,
+ long repositoryStateId, Version version) {
+ return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
+ startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
+ ImmutableOpenMap.of());
+ }
+
public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean partial;
+ /**
+ * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
+ */
private final ImmutableOpenMap shards;
private final List indices;
private final List dataStreams;
@@ -113,6 +136,19 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation
private final long repositoryStateId;
// see #useShardGenerations
private final Version version;
+
+ /**
+ * Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
+ */
+ @Nullable
+ private final SnapshotId source;
+
+ /**
+ * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
+ * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
+ */
+ private final ImmutableOpenMap clones;
+
@Nullable private final Map userMetadata;
@Nullable private final String failure;
@@ -121,6 +157,15 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
List dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap shards, String failure, Map userMetadata,
Version version) {
+ this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
+ userMetadata, version, null, ImmutableOpenMap.of());
+ }
+
+ private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices,
+ List dataStreams, long startTime, long repositoryStateId,
+ ImmutableOpenMap shards, String failure, Map userMetadata,
+ Version version, @Nullable SnapshotId source,
+ @Nullable ImmutableOpenMap clones) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@@ -129,11 +174,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.dataStreams = dataStreams;
this.startTime = startTime;
this.shards = shards;
- assert assertShardsConsistent(state, indices, shards);
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.version = version;
+ this.source = source;
+ if (source == null) {
+ assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
+ this.clones = ImmutableOpenMap.of();
+ } else {
+ this.clones = clones;
+ }
+ assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}
private Entry(StreamInput in) throws IOException {
@@ -143,7 +195,7 @@ private Entry(StreamInput in) throws IOException {
state = State.fromValue(in.readByte());
indices = in.readList(IndexId::new);
startTime = in.readLong();
- shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new);
+ shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom);
repositoryStateId = in.readLong();
failure = in.readOptionalString();
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
@@ -166,21 +218,41 @@ private Entry(StreamInput in) throws IOException {
} else {
dataStreams = Collections.emptyList();
}
+ if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+ source = in.readOptionalWriteable(SnapshotId::new);
+ clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
+ } else {
+ source = null;
+ clones = ImmutableOpenMap.of();
+ }
}
- private static boolean assertShardsConsistent(State state, List indices,
- ImmutableOpenMap shards) {
+ private static boolean assertShardsConsistent(SnapshotId source, State state, List indices,
+ ImmutableOpenMap shards,
+ ImmutableOpenMap clones) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set indexNamesInShards = new HashSet<>();
- shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
- assert indexNames.equals(indexNamesInShards)
+ shards.iterator().forEachRemaining(s -> {
+ indexNamesInShards.add(s.key.getIndexName());
+ assert source == null || s.value.nodeId == null :
+ "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
+ });
+ assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
+ assert source != null || indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
- final boolean shardsCompleted = completed(shards.values());
- assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
- : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+ final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
+ // Check state consistency for normal snapshots and started clone operations
+ if (source == null || clones.isEmpty() == false) {
+ assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
+ : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
+ }
+ if (source != null && state.completed()) {
+ assert hasFailures(clones) == false || state == State.FAILED
+ : "Failed shard clones in [" + clones + "] but state was [" + state + "]";
+ }
return true;
}
@@ -201,7 +273,17 @@ public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
- userMetadata, version);
+ userMetadata, version, source, clones);
+ }
+
+ public Entry withClones(ImmutableOpenMap updatedClones) {
+ if (updatedClones.equals(clones)) {
+ return this;
+ }
+ return new Entry(snapshot, includeGlobalState, partial,
+ completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
+ state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
+ updatedClones);
}
/**
@@ -230,7 +312,7 @@ public Entry abort() {
public Entry fail(ImmutableOpenMap shards, State state, String failure) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
- failure, userMetadata, version);
+ failure, userMetadata, version, source, clones);
}
/**
@@ -318,6 +400,19 @@ public Version version() {
return version;
}
+ @Nullable
+ public SnapshotId source() {
+ return source;
+ }
+
+ public boolean isClone() {
+ return source != null;
+ }
+
+ public ImmutableOpenMap clones() {
+ return clones;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -334,6 +429,8 @@ public boolean equals(Object o) {
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (version.equals(entry.version) == false) return false;
+ if (Objects.equals(source, ((Entry) o).source) == false) return false;
+ if (clones.equals(((Entry) o).clones) == false) return false;
return true;
}
@@ -349,6 +446,8 @@ public int hashCode() {
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + version.hashCode();
+ result = 31 * result + (source == null ? 0 : source.hashCode());
+ result = 31 * result + clones.hashCode();
return result;
}
@@ -418,6 +517,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(dataStreams);
}
+ if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
+ out.writeOptionalWriteable(source);
+ out.writeMap(clones);
+ }
}
@Override
@@ -441,6 +544,15 @@ public static boolean completed(ObjectContainer shards) {
return true;
}
+ private static boolean hasFailures(ImmutableOpenMap clones) {
+ for (ObjectCursor value : clones.values()) {
+ if (value.value.state().failed()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public static class ShardSnapshotStatus implements Writeable {
/**
@@ -490,15 +602,20 @@ private boolean assertConsistent() {
return true;
}
- public ShardSnapshotStatus(StreamInput in) throws IOException {
- nodeId = in.readOptionalString();
- state = ShardState.fromValue(in.readByte());
+ public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException {
+ String nodeId = in.readOptionalString();
+ final ShardState state = ShardState.fromValue(in.readByte());
+ final String generation;
if (SnapshotsService.useShardGenerations(in.getVersion())) {
generation = in.readOptionalString();
} else {
generation = null;
}
- reason = in.readOptionalString();
+ final String reason = in.readOptionalString();
+ if (state == ShardState.QUEUED) {
+ return UNASSIGNED_QUEUED;
+ }
+ return new ShardSnapshotStatus(nodeId, state, reason, generation);
}
public ShardState state() {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java
new file mode 100644
index 0000000000000..f22f1d95aefbd
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCloneSnapshotAction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster;
+
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.rest.RestRequest.Method.PUT;
+
+/**
+ * Clones indices from one snapshot into another snapshot in the same repository
+ */
+public class RestCloneSnapshotAction extends BaseRestHandler {
+
+ @Override
+ public List routes() {
+ return Collections.singletonList(new Route(PUT, "/_snapshot/{repository}/{snapshot}/_clone/{target_snapshot}"));
+ }
+
+ @Override
+ public String getName() {
+ return "clone_snapshot_action";
+ }
+
+ @Override
+ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
+ final Map source = request.contentParser().map();
+ final CloneSnapshotRequest cloneSnapshotRequest = new CloneSnapshotRequest(
+ request.param("repository"), request.param("snapshot"), request.param("target_snapshot"),
+ XContentMapValues.nodeStringArrayValue(source.getOrDefault("indices", Collections.emptyList())));
+ cloneSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", cloneSnapshotRequest.masterNodeTimeout()));
+ cloneSnapshotRequest.indicesOptions(IndicesOptions.fromMap(source, cloneSnapshotRequest.indicesOptions()));
+ return channel -> client.admin().cluster().cloneSnapshot(cloneSnapshotRequest, new RestToXContentListener<>(channel));
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index baf8e1fc457a8..5d8d41b1ad1fc 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -924,7 +924,7 @@ private static void validateSnapshotRestorable(final String repository, final Sn
}
}
- private static boolean failed(SnapshotInfo snapshot, String index) {
+ public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 2ed1de343885d..810527de15230 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -197,6 +197,10 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
final String localNodeId = clusterService.localNode().getId();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final State entryState = entry.state();
+ if (entry.isClone()) {
+ // This is a snapshot clone, it will be executed on the current master
+ continue;
+ }
if (entryState == State.STARTED) {
Map startedShards = null;
final Snapshot snapshot = entry.snapshot();
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index eb02d069fbde5..6e667f0dab39c 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -28,9 +28,12 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.ActionRunnable;
+import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -44,6 +47,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
@@ -105,6 +109,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -128,6 +134,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public static final Version FULL_CONCURRENCY_VERSION = Version.V_7_9_0;
+ public static final Version CLONE_SNAPSHOT_VERSION = Version.V_7_10_0;
+
public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
@@ -166,6 +174,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>());
+ // Set of currently initializing clone operations
+ private final Set initializingClones = Collections.synchronizedSet(new HashSet<>());
+
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
private final TransportService transportService;
@@ -363,20 +374,10 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
@Override
public ClusterState execute(ClusterState currentState) {
- // check if the snapshot name already exists in the repository
- if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
- throw new InvalidSnapshotNameException(
- repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
- }
+ ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final List runningSnapshots = snapshots.entries();
- if (runningSnapshots.stream().anyMatch(s -> {
- final Snapshot running = s.snapshot();
- return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
- })) {
- throw new InvalidSnapshotNameException(
- repository.getMetadata().name(), snapshotName, "snapshot with the same name is already in-progress");
- }
+ ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
validate(repositoryName, snapshotName, currentState);
final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION);
final SnapshotDeletionsInProgress deletionsInProgress =
@@ -397,6 +398,7 @@ public ClusterState execute(ClusterState currentState) {
if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
+ ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
// Store newSnapshot here to be processed in clusterStateProcessed
List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
@@ -407,9 +409,7 @@ public ClusterState execute(ClusterState currentState) {
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
final List indexIds = repositoryData.resolveNewIndices(
- indices, runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
- .flatMap(entry -> entry.indices().stream()).distinct()
- .collect(Collectors.toMap(IndexId::getName, Function.identity())));
+ indices, getInFlightIndexIds(runningSnapshots, repositoryName));
final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null);
ImmutableOpenMap shards = shards(snapshots, deletionsInProgress, currentState.metadata(),
currentState.routingTable(), indexIds, useShardGenerations(version), repositoryData, repositoryName);
@@ -459,6 +459,278 @@ public TimeValue timeout() {
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}
+ private static void ensureSnapshotNameNotRunning(List runningSnapshots, String repositoryName,
+ String snapshotName) {
+ if (runningSnapshots.stream().anyMatch(s -> {
+ final Snapshot running = s.snapshot();
+ return running.getRepository().equals(repositoryName) && running.getSnapshotId().getName().equals(snapshotName);
+ })) {
+ throw new InvalidSnapshotNameException(repositoryName, snapshotName, "snapshot with the same name is already in-progress");
+ }
+ }
+
+ private static Map getInFlightIndexIds(List runningSnapshots, String repositoryName) {
+ return runningSnapshots.stream().filter(entry -> entry.repository().equals(repositoryName))
+ .flatMap(entry -> entry.indices().stream()).distinct()
+ .collect(Collectors.toMap(IndexId::getName, Function.identity()));
+ }
+
+ // TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
+ // for repository metadata and loading it has predictable performance
+ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener listener) {
+ final String repositoryName = request.repository();
+ Repository repository = repositoriesService.repository(repositoryName);
+ if (repository.isReadOnly()) {
+ listener.onFailure(new RepositoryException(repositoryName, "cannot create snapshot in a readonly repository"));
+ return;
+ }
+ final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.target());
+ validate(repositoryName, snapshotName);
+ final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
+ final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
+ initializingClones.add(snapshot);
+ repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {
+
+ private SnapshotsInProgress.Entry newEntry;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
+ ensureNoCleanupInProgress(currentState, repositoryName, snapshotName);
+ final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+ final List runningSnapshots = snapshots.entries();
+ ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName);
+ validate(repositoryName, snapshotName, currentState);
+
+ final SnapshotId sourceSnapshotId = repositoryData.getSnapshotIds()
+ .stream()
+ .filter(src -> src.getName().equals(request.source()))
+ .findAny()
+ .orElseThrow(() -> new SnapshotMissingException(repositoryName, request.source()));
+ final SnapshotDeletionsInProgress deletionsInProgress =
+ currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
+ if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) {
+ throw new ConcurrentSnapshotExecutionException(repositoryName, sourceSnapshotId.getName(),
+ "cannot clone from snapshot that is being deleted");
+ }
+ ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress);
+ final List indicesForSnapshot = new ArrayList<>();
+ for (IndexId indexId : repositoryData.getIndices().values()) {
+ if (repositoryData.getSnapshots(indexId).contains(sourceSnapshotId)) {
+ indicesForSnapshot.add(indexId.getName());
+ }
+ }
+ final List matchingIndices =
+ SnapshotUtils.filterIndices(indicesForSnapshot, request.indices(), request.indicesOptions());
+ if (matchingIndices.isEmpty()) {
+ throw new SnapshotException(new Snapshot(repositoryName, sourceSnapshotId),
+ "No indices in the source snapshot [" + sourceSnapshotId + "] matched requested pattern ["
+ + Strings.arrayToCommaDelimitedString(request.indices()) + "]");
+ }
+ newEntry = SnapshotsInProgress.startClone(
+ snapshot, sourceSnapshotId,
+ repositoryData.resolveIndices(matchingIndices),
+ threadPool.absoluteTimeInMillis(), repositoryData.getGenId(),
+ minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null));
+ final List newEntries = new ArrayList<>(runningSnapshots);
+ newEntries.add(newEntry);
+ return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+ SnapshotsInProgress.of(newEntries)).build();
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ initializingClones.remove(snapshot);
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to clone snapshot", repositoryName, snapshotName), e);
+ listener.onFailure(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
+ logger.info("snapshot clone [{}] started", snapshot);
+ addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
+ startCloning(repository, newEntry);
+ }
+
+ @Override
+ public TimeValue timeout() {
+ initializingClones.remove(snapshot);
+ return request.masterNodeTimeout();
+ }
+ }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
+ }
+
+ private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
+ final RepositoryCleanupInProgress repositoryCleanupInProgress =
+ currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
+ if (repositoryCleanupInProgress.hasCleanupInProgress()) {
+ throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
+ "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
+ }
+ }
+
+ private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryData, String snapshotName, Repository repository) {
+ // check if the snapshot name already exists in the repository
+ if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+ throw new InvalidSnapshotNameException(
+ repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
+ }
+ }
+
+ /**
+ * Determine the number of shards in each index of a clone operation and update the cluster state accordingly.
+ *
+ * @param repository repository to run operation on
+ * @param cloneEntry clone operation in the cluster state
+ */
+ private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
+ final List indices = cloneEntry.indices();
+ final SnapshotId sourceSnapshot = cloneEntry.source();
+ final Snapshot targetSnapshot = cloneEntry.snapshot();
+
+ final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+ // Exception handler for IO exceptions with loading index and repo metadata
+ final Consumer onFailure = e -> {
+ initializingClones.remove(targetSnapshot);
+ logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+ removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null);
+ };
+
+ // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
+ // TODO: we could skip this step for snapshots with state SUCCESS
+ final StepListener snapshotInfoListener = new StepListener<>();
+ executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot)));
+
+ final StepListener>> allShardCountsListener = new StepListener<>();
+ final GroupedActionListener> shardCountListener =
+ new GroupedActionListener<>(allShardCountsListener, indices.size());
+ snapshotInfoListener.whenComplete(snapshotInfo -> {
+ for (IndexId indexId : indices) {
+ if (RestoreService.failed(snapshotInfo, indexId.getName())) {
+ throw new SnapshotException(targetSnapshot, "Can't clone index [" + indexId +
+ "] because its snapshot was not successful.");
+ }
+ }
+ // 2. step, load the number of shards we have in each index to be cloned from the index metadata.
+ repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
+ for (IndexId index : indices) {
+ executor.execute(ActionRunnable.supply(shardCountListener, () -> {
+ final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repositoryData, sourceSnapshot, index);
+ return Tuple.tuple(index, metadata.getNumberOfShards());
+ }));
+ }
+ }, onFailure));
+ }, onFailure);
+
+ // 3. step, we have all the shard counts, now update the cluster state to have clone jobs in the snap entry
+ allShardCountsListener.whenComplete(counts -> repository.executeConsistentStateUpdate(repoData -> new ClusterStateUpdateTask() {
+
+ private SnapshotsInProgress.Entry updatedEntry;
+
+ @Override
+ public ClusterState execute(ClusterState currentState) {
+ final SnapshotsInProgress snapshotsInProgress =
+ currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+ final List updatedEntries = new ArrayList<>(snapshotsInProgress.entries());
+ boolean changed = false;
+ final String localNodeId = currentState.nodes().getLocalNodeId();
+ final String repoName = cloneEntry.repository();
+ final Map indexIds = getInFlightIndexIds(updatedEntries, repoName);
+ final ShardGenerations shardGenerations = repoData.shardGenerations();
+ for (int i = 0; i < updatedEntries.size(); i++) {
+ if (cloneEntry.equals(updatedEntries.get(i))) {
+ final ImmutableOpenMap.Builder clonesBuilder =
+ ImmutableOpenMap.builder();
+ // TODO: could be optimized by just dealing with repo shard id directly
+ final Set busyShardsInRepo =
+ busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata())
+ .stream()
+ .map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId()))
+ .collect(Collectors.toSet());
+ for (Tuple count : counts) {
+ for (int shardId = 0; shardId < count.v2(); shardId++) {
+ final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
+ if (busyShardsInRepo.contains(repoShardId)) {
+ clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
+ } else {
+ clonesBuilder.put(repoShardId,
+ new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId)));
+ }
+ }
+ }
+ updatedEntry = cloneEntry.withClones(clonesBuilder.build());
+ updatedEntries.set(i, updatedEntry);
+ changed = true;
+ break;
+ }
+ }
+ return updateWithSnapshots(currentState, changed ? SnapshotsInProgress.of(updatedEntries) : null, null);
+ }
+
+ @Override
+ public void onFailure(String source, Exception e) {
+ initializingClones.remove(targetSnapshot);
+ logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
+ failAllListenersOnMasterFailOver(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ initializingClones.remove(targetSnapshot);
+ if (updatedEntry != null) {
+ final Snapshot target = updatedEntry.snapshot();
+ final SnapshotId sourceSnapshot = updatedEntry.source();
+ for (ObjectObjectCursor indexClone : updatedEntry.clones()) {
+ final ShardSnapshotStatus shardStatusBefore = indexClone.value;
+ if (shardStatusBefore.state() != ShardState.INIT) {
+ continue;
+ }
+ final RepositoryShardId repoShardId = indexClone.key;
+ runReadyClone(target, sourceSnapshot, shardStatusBefore, repoShardId, repository);
+ }
+ } else {
+ // Extremely unlikely corner case of master failing over between between starting the clone and
+ // starting shard clones.
+ logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
+ }
+ }
+ }, "start snapshot clone", onFailure), onFailure);
+ }
+
+ private final Set currentlyCloning = Collections.synchronizedSet(new HashSet<>());
+
+ private void runReadyClone(Snapshot target, SnapshotId sourceSnapshot, ShardSnapshotStatus shardStatusBefore,
+ RepositoryShardId repoShardId, Repository repository) {
+ final SnapshotId targetSnapshot = target.getSnapshotId();
+ final String localNodeId = clusterService.localNode().getId();
+ if (currentlyCloning.add(repoShardId)) {
+ repository.cloneShardSnapshot(sourceSnapshot, targetSnapshot, repoShardId, shardStatusBefore.generation(), ActionListener.wrap(
+ generation -> innerUpdateSnapshotState(
+ new ShardSnapshotUpdate(target, repoShardId,
+ new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
+ ActionListener.runBefore(
+ ActionListener.wrap(
+ v -> logger.trace("Marked [{}] as successfully cloned from [{}] to [{}]", repoShardId,
+ sourceSnapshot, targetSnapshot),
+ e -> {
+ logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
+ failAllListenersOnMasterFailOver(e);
+ }
+ ), () -> currentlyCloning.remove(repoShardId))
+ ), e -> innerUpdateSnapshotState(
+ new ShardSnapshotUpdate(target, repoShardId,
+ new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)),
+ ActionListener.runBefore(ActionListener.wrap(
+ v -> logger.trace("Marked [{}] as failed clone from [{}] to [{}]", repoShardId,
+ sourceSnapshot, targetSnapshot),
+ ex -> {
+ logger.warn("Cluster state update after failed shard clone [{}] failed", repoShardId);
+ failAllListenersOnMasterFailOver(ex);
+ }
+ ), () -> currentlyCloning.remove(repoShardId)))));
+ }
+ }
+
private void ensureBelowConcurrencyLimit(String repository, String name, SnapshotsInProgress snapshotsInProgress,
SnapshotDeletionsInProgress deletionsInProgress) {
final int inProgressOperations = snapshotsInProgress.entries().size() + deletionsInProgress.getEntries().size();
@@ -695,17 +967,24 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
- snapshot.shards().forEach(c -> {
- if (metadata.index(c.key.getIndex()) == null) {
- assert snapshot.partial() :
- "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
- return;
- }
- final IndexId indexId = indexLookup.get(c.key.getIndexName());
- if (indexId != null) {
- builder.put(indexId, c.key.id(), c.value.generation());
- }
- });
+ if (snapshot.isClone()) {
+ snapshot.clones().forEach(c -> {
+ final IndexId indexId = indexLookup.get(c.key.indexName());
+ builder.put(indexId, c.key.shardId(), c.value.generation());
+ });
+ } else {
+ snapshot.shards().forEach(c -> {
+ if (metadata.index(c.key.getIndex()) == null) {
+ assert snapshot.partial() :
+ "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+ return;
+ }
+ final IndexId indexId = indexLookup.get(c.key.getIndexName());
+ if (indexId != null) {
+ builder.put(indexId, c.key.id(), c.value.generation());
+ }
+ });
+ }
return builder.build();
}
@@ -934,17 +1213,27 @@ public ClusterState execute(ClusterState currentState) {
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (statesToUpdate.contains(snapshot.state())) {
- ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
- routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
- if (shards != null) {
- final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
- changed = true;
- if (updatedSnapshot.state().completed()) {
- finishedSnapshots.add(updatedSnapshot);
+ // Currently initializing clone
+ if (snapshot.isClone() && snapshot.clones().isEmpty()) {
+ if (initializingClones.contains(snapshot.snapshot())) {
+ updatedSnapshotEntries.add(snapshot);
+ } else {
+ logger.debug("removing not yet start clone operation [{}]", snapshot);
+ changed = true;
}
- updatedSnapshotEntries.add(updatedSnapshot);
} else {
- updatedSnapshotEntries.add(snapshot);
+ ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
+ routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
+ if (shards != null) {
+ final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
+ changed = true;
+ if (updatedSnapshot.state().completed()) {
+ finishedSnapshots.add(updatedSnapshot);
+ }
+ updatedSnapshotEntries.add(updatedSnapshot);
+ } else {
+ updatedSnapshotEntries.add(snapshot);
+ }
}
} else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
// BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet
@@ -996,6 +1285,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}
}
+ startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null);
// run newly ready deletes
for (SnapshotDeletionsInProgress.Entry entry : deletionsToExecute) {
if (tryEnterRepoLoop(entry.repository())) {
@@ -1165,6 +1455,11 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), repositoryData, null);
return;
}
+ if (entry.isClone() && entry.state() == State.FAILED) {
+ logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
+ removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null);
+ return;
+ }
final String repoName = entry.repository();
if (tryEnterRepoLoop(repoName)) {
if (repositoryData == null) {
@@ -1238,10 +1533,24 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
entry.includeGlobalState(), entry.userMetadata());
- repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
+ final StepListener metadataListener = new StepListener<>();
+ final Repository repo = repositoriesService.repository(snapshot.getRepository());
+ if (entry.isClone()) {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(
+ ActionRunnable.supply(metadataListener, () -> {
+ final Metadata.Builder metaBuilder = Metadata.builder(repo.getSnapshotGlobalMetadata(entry.source()));
+ for (IndexId index : entry.indices()) {
+ metaBuilder.put(repo.getSnapshotIndexMetaData(repositoryData, entry.source(), index), false);
+ }
+ return metaBuilder.build();
+ }));
+ } else {
+ metadataListener.onResponse(metadata);
+ }
+ metadataListener.whenComplete(meta -> repo.finalizeSnapshot(
shardGenerations,
- repositoryData.getGenId(),
- metadataForSnapshot(entry, metadata),
+ repositoryData.getGenId(),
+ metadataForSnapshot(entry, meta),
snapshotInfo,
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
@@ -1251,7 +1560,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met
snapshotCompletionListeners.remove(snapshot), Tuple.tuple(newRepoData, snapshotInfo));
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
runNextQueuedOperation(newRepoData, repository, true);
- }, e -> handleFinalizationFailure(e, entry, repositoryData)));
+ }, e -> handleFinalizationFailure(e, entry, repositoryData))),
+ e -> handleFinalizationFailure(e, entry, repositoryData));
} catch (Exception e) {
assert false : new AssertionError(e);
handleFinalizationFailure(e, entry, repositoryData);
@@ -1428,8 +1738,6 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn
private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData,
@Nullable CleanupAfterErrorListener listener) {
assert failure != null : "Failure must be supplied";
- assert (listener == null || repositoryData == null) && (listener == null && repositoryData == null) == false :
- "Either repository data or a listener but not both must be null but saw [" + listener + "] and [" + repositoryData + "]";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
@@ -1466,7 +1774,9 @@ public void onNoLongerMaster(String source) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
failSnapshotCompletionListeners(snapshot, failure);
if (listener == null) {
- runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+ if (repositoryData != null) {
+ runNextQueuedOperation(repositoryData, snapshot.getRepository(), true);
+ }
} else {
listener.onFailure(null);
}
@@ -1519,11 +1829,11 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) {
final String[] snapshotNames = request.snapshots();
- final String repositoryName = request.repository();
+ final String repoName = request.repository();
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
- Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
+ Strings.arrayToCommaDelimitedString(snapshotNames), repoName));
- final Repository repository = repositoriesService.repository(repositoryName);
+ final Repository repository = repositoriesService.repository(repoName);
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) {
private Snapshot runningSnapshot;
@@ -1543,12 +1853,12 @@ public ClusterState execute(ClusterState currentState) throws Exception {
+ "]");
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
- final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repositoryName);
+ final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName);
final List snapshotIds = matchingSnapshotIds(
snapshotEntries.stream().map(e -> e.snapshot().getSnapshotId()).collect(Collectors.toList()), repositoryData,
- snapshotNames, repositoryName);
+ snapshotNames, repoName);
if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
- deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData, Priority.NORMAL, listener);
+ deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener);
return deleteFromRepoTask.execute(currentState);
}
assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries;
@@ -1629,7 +1939,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
listener.onResponse(null);
} else {
clusterService.submitStateUpdateTask("delete snapshot",
- createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData, Priority.IMMEDIATE, listener));
+ createDeleteStateUpdate(outstandingDeletes, repoName, repositoryData, Priority.IMMEDIATE, listener));
}
return;
}
@@ -1638,7 +1948,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
result -> {
logger.debug("deleted snapshot completed - deleting files");
clusterService.submitStateUpdateTask("delete snapshot",
- createDeleteStateUpdate(outstandingDeletes, repositoryName, result.v1(), Priority.IMMEDIATE, listener));
+ createDeleteStateUpdate(outstandingDeletes, repoName, result.v1(), Priority.IMMEDIATE, listener));
},
e -> {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
@@ -1758,6 +2068,17 @@ public ClusterState execute(ClusterState currentState) {
}
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
+ final Set activeCloneSources = snapshots.entries()
+ .stream()
+ .filter(SnapshotsInProgress.Entry::isClone)
+ .map(SnapshotsInProgress.Entry::source)
+ .collect(Collectors.toSet());
+ for (SnapshotId snapshotId : snapshotIds) {
+ if (activeCloneSources.contains(snapshotId)) {
+ throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotId),
+ "cannot delete snapshot while it is being cloned");
+ }
+ }
final SnapshotsInProgress updatedSnapshots;
if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) {
updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream()
@@ -2266,7 +2587,7 @@ private static ImmutableOpenMap builder = ImmutableOpenMap.builder();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
- final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress);
+ final Set inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata);
final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
.noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
for (IndexId index : indices) {
@@ -2330,16 +2651,32 @@ private static ImmutableOpenMap busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots) {
+ private static Set busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) {
final List runningSnapshots = snapshots == null ? Collections.emptyList() : snapshots.entries();
final Set inProgressShards = new HashSet<>();
for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
if (runningSnapshot.repository().equals(repoName) == false) {
continue;
}
- for (ObjectObjectCursor shard : runningSnapshot.shards()) {
- if (shard.value.isActive()) {
- inProgressShards.add(shard.key);
+ if (runningSnapshot.isClone()) {
+ for (ObjectObjectCursor clone : runningSnapshot.clones()) {
+ final ShardSnapshotStatus shardState = clone.value;
+ if (shardState.isActive()) {
+ IndexMetadata indexMeta = metadata.index(clone.key.indexName());
+ final Index index;
+ if (indexMeta == null) {
+ index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE);
+ } else {
+ index = indexMeta.getIndex();
+ }
+ inProgressShards.add(new ShardId(index, clone.key.shardId()));
+ }
+ }
+ } else {
+ for (ObjectObjectCursor shard : runningSnapshot.shards()) {
+ if (shard.value.isActive()) {
+ inProgressShards.add(shard.key);
+ }
}
}
}
@@ -2431,97 +2768,282 @@ public boolean assertAllListenersResolved() {
return true;
}
- private static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
+ /**
+ * Executor that applies {@link ShardSnapshotUpdate}s to the current cluster state. The algorithm implemented below works as described
+ * below:
+ * Every shard snapshot or clone state update can result in multiple snapshots being updated. In order to determine whether or not a
+ * shard update has an effect we use an outer loop over all current executing snapshot operations that iterates over them in the order
+ * they were started in and an inner loop over the list of shard update tasks.
+ *
+ * If the inner loop finds that a shard update task applies to a given snapshot and either a shard-snapshot or shard-clone operation in
+ * it then it will update the state of the snapshot entry accordingly. If that update was a noop, then the task is removed from the
+ * iteration as it was already applied before and likely just arrived on the master node again due to retries upstream.
+ * If the update was not a noop, then it means that the shard it applied to is now available for another snapshot or clone operation
+ * to be re-assigned if there is another snapshot operation that is waiting for the shard to become available. We therefore record the
+ * fact that a task was executed by adding it to a collection of executed tasks. If a subsequent execution of the outer loop finds that
+ * a task in the executed tasks collection applied to a shard it was waiting for to become available, then the shard snapshot operation
+ * will be started for that snapshot entry and the task removed from the collection of tasks that need to be applied to snapshot
+ * entries since it can not have any further effects.
+ *
+ * Package private to allow for tests.
+ */
+ static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
int changedCount = 0;
int startedCount = 0;
final List entries = new ArrayList<>();
+ final String localNodeId = currentState.nodes().getLocalNodeId();
// Tasks to check for updates for running snapshots.
final List unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set executedTasks = new HashSet<>();
+ // Outer loop over all snapshot entries in the order they were created in
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.state().completed()) {
+ // completed snapshots do not require any updates so we just add them to the new list and keep going
entries.add(entry);
continue;
}
ImmutableOpenMap.Builder shards = null;
+ ImmutableOpenMap.Builder clones = null;
+ Map indicesLookup = null;
+ // inner loop over all the shard updates that are potentially applicable to the current snapshot entry
for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
+ // the update applies to a different repository so it is irrelevant here
continue;
}
- final ShardId finishedShardId = updateSnapshotState.shardId;
- if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
- final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
- if (existing == null) {
- logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
- updateSnapshotState, entry);
- assert false : "This should never happen, data nodes should only send updates for expected shards";
- continue;
- }
- if (existing.state().completed()) {
- // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
- iterator.remove();
- continue;
- }
- logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
- finishedShardId, updateSnapshotState.updatedState.state());
- if (shards == null) {
- shards = ImmutableOpenMap.builder(entry.shards());
- }
- shards.put(finishedShardId, updateSnapshotState.updatedState);
- executedTasks.add(updateSnapshotState);
- changedCount++;
- } else if (executedTasks.contains(updateSnapshotState)) {
- // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
- final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
- if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
- continue;
+ if (updateSnapshotState.isClone()) {
+ // The update applied to a shard clone operation
+ final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId;
+ if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+ assert entry.isClone() : "Non-clone snapshot [" + entry + "] received update for clone ["
+ + updateSnapshotState + "]";
+ final ShardSnapshotStatus existing = entry.clones().get(finishedShardId);
+ if (existing == null) {
+ logger.warn("Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]",
+ updateSnapshotState, entry);
+ assert false : "This should never happen, master will not submit a state update for a non-existing clone";
+ continue;
+ }
+ if (existing.state().completed()) {
+ // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
+ iterator.remove();
+ continue;
+ }
+ logger.trace("[{}] Updating shard clone [{}] with status [{}]", updatedSnapshot,
+ finishedShardId, updateSnapshotState.updatedState.state());
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ changedCount++;
+ clones.put(finishedShardId, updateSnapshotState.updatedState);
+ executedTasks.add(updateSnapshotState);
+ } else if (executedTasks.contains(updateSnapshotState)) {
+ // the update was already executed on the clone operation it applied to, now we check if it may be possible to
+ // start a shard snapshot or clone operation on the current entry
+ if (entry.isClone()) {
+ // current entry is a clone operation
+ final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + finishedStatus.nodeId() +
+ "] but local node id is [" + localNodeId + "]";
+ clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+ iterator.remove();
+ } else {
+ // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id
+ final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName());
+ if (indexMeta == null) {
+ // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a
+ // normal snapshot
+ continue;
+ }
+ final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId());
+ final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ // A clone was updated, so we must use the correct data node id for the reassignment as actual shard
+ // snapshot
+ final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState,
+ updateSnapshotState.updatedState.generation(), finishedRoutingShardId);
+ shards.put(finishedRoutingShardId, shardSnapshotStatus);
+ if (shardSnapshotStatus.isActive()) {
+ // only remove the update from the list of tasks that might hold a reusable shard if we actually
+ // started a snapshot and didn't just fail
+ iterator.remove();
+ }
+ }
}
- if (shards == null) {
- shards = ImmutableOpenMap.builder(entry.shards());
+ } else {
+ // a (non-clone) shard snapshot operation was updated
+ final ShardId finishedShardId = updateSnapshotState.shardId;
+ if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
+ final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
+ if (existing == null) {
+ logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
+ updateSnapshotState, entry);
+ assert false : "This should never happen, data nodes should only send updates for expected shards";
+ continue;
+ }
+ if (existing.state().completed()) {
+ // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
+ iterator.remove();
+ continue;
+ }
+ logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
+ finishedShardId, updateSnapshotState.updatedState.state());
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ shards.put(finishedShardId, updateSnapshotState.updatedState);
+ executedTasks.add(updateSnapshotState);
+ changedCount++;
+ } else if (executedTasks.contains(updateSnapshotState)) {
+ // We applied the update for a shard snapshot state to its snapshot entry, now check if we can update
+ // either a clone or a snapshot
+ if (entry.isClone()) {
+ // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires
+ // a lookup for the index ids
+ if (indicesLookup == null) {
+ indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
+ }
+ // shard snapshot was completed, we check if we can start a clone operation for the same repo shard
+ final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName());
+ // If the lookup finds the index id then at least the entry is concerned with the index id just updated
+ // so we check on a shard level
+ if (indexId != null) {
+ final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId());
+ final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (clones == null) {
+ clones = ImmutableOpenMap.builder(entry.clones());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting clone [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation()));
+ iterator.remove();
+ startedCount++;
+ }
+ } else {
+ // shard snapshot was completed, we check if we can start another snapshot
+ final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
+ if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
+ continue;
+ }
+ if (shards == null) {
+ shards = ImmutableOpenMap.builder(entry.shards());
+ }
+ final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
+ logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
+ finishedStatus.nodeId(), finishedStatus.generation());
+ shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
+ iterator.remove();
+ }
}
- final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
- logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
- finishedStatus.nodeId(), finishedStatus.generation());
- shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
- iterator.remove();
- startedCount++;
}
}
- if (shards == null) {
- entries.add(entry);
+ final SnapshotsInProgress.Entry updatedEntry;
+ if (shards != null) {
+ assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + clones +
+ " as well as " + shards;
+ updatedEntry = entry.withShardStates(shards.build());
+ } else if (clones != null) {
+ updatedEntry = entry.withClones(clones.build());
} else {
- entries.add(entry.withShardStates(shards.build()));
+ updatedEntry = entry;
}
+ entries.add(updatedEntry);
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
- return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(
- ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
+ return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks)
+ .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
+ SnapshotsInProgress.of(entries)).build());
}
return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState);
};
+ /**
+ * Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result
+ * of a snapshot clone completing.
+ *
+ * @param currentState current cluster state
+ * @param shardGeneration shard generation of the shard in the repository
+ * @param shardId shard id of the shard that just finished cloning
+ * @return shard snapshot status
+ */
+ private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) {
+ final ShardRouting primary = currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard();
+ final ShardSnapshotStatus shardSnapshotStatus;
+ if (primary == null || !primary.assignedToNode()) {
+ shardSnapshotStatus = new ShardSnapshotStatus(
+ null, ShardState.MISSING, "primary shard is not allocated", shardGeneration);
+ } else if (primary.relocating() || primary.initializing()) {
+ shardSnapshotStatus =
+ new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardGeneration);
+ } else if (primary.started() == false) {
+ shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
+ "primary shard hasn't been started yet", shardGeneration);
+ } else {
+ shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardGeneration);
+ }
+ return shardSnapshotStatus;
+ }
+
/**
* An update to the snapshot state of a shard.
+ *
+ * Package private for testing
*/
- private static final class ShardSnapshotUpdate {
+ static final class ShardSnapshotUpdate {
private final Snapshot snapshot;
private final ShardId shardId;
+ private final RepositoryShardId repoShardId;
+
private final ShardSnapshotStatus updatedState;
- private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
+ ShardSnapshotUpdate(Snapshot snapshot, RepositoryShardId repositoryShardId, ShardSnapshotStatus updatedState) {
+ this.snapshot = snapshot;
+ this.shardId = null;
+ this.updatedState = updatedState;
+ this.repoShardId = repositoryShardId;
+ }
+
+ ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
this.snapshot = snapshot;
this.shardId = shardId;
this.updatedState = updatedState;
+ repoShardId = null;
+ }
+
+
+ public boolean isClone() {
+ return repoShardId != null;
}
@Override
@@ -2533,13 +3055,14 @@ public boolean equals(Object other) {
return false;
}
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
- return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
+ return this.snapshot.equals(that.snapshot) && Objects.equals(this.shardId, that.shardId)
+ && Objects.equals(this.repoShardId, that.repoShardId) && this.updatedState == that.updatedState;
}
@Override
public int hashCode() {
- return Objects.hash(snapshot, shardId, updatedState);
+ return Objects.hash(snapshot, shardId, updatedState, repoShardId);
}
}
@@ -2568,19 +3091,35 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
} finally {
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
// state update we check if its state is completed and end it if it is.
+ final SnapshotsInProgress snapshotsInProgress =
+ newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (endingSnapshots.contains(update.snapshot) == false) {
- final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata(), null);
}
}
+ startExecutableClones(snapshotsInProgress, update.snapshot.getRepository());
}
}
});
}
+ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nullable String repoName) {
+ for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
+ if (entry.isClone() && entry.state() == State.STARTED && (repoName == null || entry.repository().equals(repoName))) {
+ // this is a clone, see if new work is ready
+ for (ObjectObjectCursor clone : entry.clones()) {
+ if (clone.value.state() == ShardState.INIT) {
+ runReadyClone(entry.snapshot(), entry.source(), clone.value, clone.key,
+ repositoriesService.repository(entry.repository()));
+ }
+ }
+ }
+ }
+ }
+
private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java
index 1d20d2d109b7d..cd90a2866bbc1 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java
@@ -41,7 +41,7 @@ public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException
super(in);
snapshot = new Snapshot(in);
shardId = new ShardId(in);
- status = new SnapshotsInProgress.ShardSnapshotStatus(in);
+ status = SnapshotsInProgress.ShardSnapshotStatus.readFrom(in);
}
public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java
index 5c08aadece0b7..eb50b9821da93 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java
@@ -99,6 +99,41 @@
* update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot.
*
*
+ * Cloning a Snapshot
+ *
+ * Cloning part of a snapshot is a process executed entirely on the master node. On a high level, the process of cloning a snapshot is
+ * analogous to that of creating a snapshot from data in the cluster except that the source of data files is the snapshot repository
+ * instead of the data nodes. It begins with cloning all shards and then finalizes the cloned snapshot the same way a normal snapshot would
+ * be finalized. Concretely, it is executed as follows:
+ *
+ *
+ * - First, {@link org.elasticsearch.snapshots.SnapshotsService#cloneSnapshot} is invoked which will place a placeholder entry into
+ * {@code SnapshotsInProgress} that does not yet contain any shard clone assignments. Note that unlike in the case of snapshot
+ * creation, the shard level clone tasks in {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#clones} are not created in the
+ * initial cluster state update as is done for shard snapshot assignments in
+ * {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#shards}. This is due to the fact that shard snapshot assignments are
+ * computed purely from information in the current cluster state while shard clone assignments require information to be read from the
+ * repository, which is too slow of a process to be done inside a cluster state update. Loading this information ahead of creating a
+ * task in the cluster state, runs the risk of race conditions where the source snapshot is being deleted before the clone task is
+ * enqueued in the cluster state.
+ * - Once a placeholder task for the clone operation is put into the cluster state, we must determine the number of shards in each
+ * index that is to be cloned as well as ensure the health of the index snapshots in the source snapshot. In order to determine the
+ * shard count for each index that is to be cloned, we load the index metadata for each such index using the repository's
+ * {@link org.elasticsearch.repositories.Repository#getSnapshotIndexMetaData} method. In order to ensure the health of the source index
+ * snapshots, we load the {@link org.elasticsearch.snapshots.SnapshotInfo} for the source snapshot and check for shard snapshot
+ * failures of the relevant indices.
+ * - Once all shard counts are known and the health of all source indices data has been verified, we populate the
+ * {@code SnapshotsInProgress.Entry#clones} map for the clone operation with the the relevant shard clone tasks.
+ * - After the clone tasks have been added to the {@code SnapshotsInProgress.Entry}, master executes them on its snapshot thread-pool
+ * by invoking {@link org.elasticsearch.repositories.Repository#cloneShardSnapshot} for each shard that is to be cloned. Each completed
+ * shard snapshot triggers a call to the {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_STATE_EXECUTOR} which updates the
+ * clone's {@code SnapshotsInProgress.Entry} to mark the shard clone operation completed.
+ * - Once all the entries in {@code SnapshotsInProgress.Entry#clones} have completed, the clone is finalized just like any other
+ * snapshot through {@link org.elasticsearch.snapshots.SnapshotsService#endSnapshot}. The only difference being that the metadata that
+ * is written out for indices and the global metadata are read from the source snapshot in the repository instead of the cluster state.
+ *
+ *
+ *
* Concurrent Snapshot Operations
*
* Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
new file mode 100644
index 0000000000000..8e15d6d5a1e48
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.snapshots;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.RepositoryShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
+import static org.hamcrest.Matchers.is;
+
+public class SnapshotsServiceTests extends ESTestCase {
+
+ public void testNoopShardStateUpdates() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot snapshot = snapshot(repoName, "snapshot-1");
+ final SnapshotsInProgress.Entry snapshotNoShards = snapshotEntry(snapshot, Collections.emptyList(), ImmutableOpenMap.of());
+
+ final String indexName1 = "index-1";
+ final ShardId shardId1 = new ShardId(index(indexName1), 0);
+ {
+ final ClusterState state = stateWithSnapshots(snapshotNoShards);
+ final SnapshotsService.ShardSnapshotUpdate shardCompletion =
+ new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId1, successfulShardStatus(uuid()));
+ assertIsNoop(state, shardCompletion);
+ }
+ {
+ final ClusterState state = stateWithSnapshots(
+ snapshotEntry(
+ snapshot, Collections.singletonList(indexId(indexName1)), shardsMap(shardId1, initShardStatus(uuid()))));
+ final SnapshotsService.ShardSnapshotUpdate shardCompletion = new SnapshotsService.ShardSnapshotUpdate(
+ snapshot("other-repo", snapshot.getSnapshotId().getName()), shardId1, successfulShardStatus(uuid()));
+ assertIsNoop(state, shardCompletion);
+ }
+ }
+
+ public void testUpdateSnapshotToSuccess() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final ShardId shardId1 = new ShardId(index(indexName1), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard =
+ snapshotEntry(sn1, Collections.singletonList(indexId1), shardsMap(shardId1, initShardStatus(dataNodeId)));
+
+ assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateSnapshotMultipleShards() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sn1 = snapshot(repoName, "snapshot-1");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final Index routingIndex1 = index(indexName1);
+ final ShardId shardId1 = new ShardId(routingIndex1, 0);
+ final ShardId shardId2 = new ShardId(routingIndex1, 1);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(sn1, Collections.singletonList(indexId1),
+ ImmutableOpenMap.builder(shardsMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+ assertThat(snapshotSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(sn1, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateCloneToSuccess() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final SnapshotsInProgress.Entry cloneSingleShard =
+ cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(), clonesMap(shardId1, initShardStatus(dataNodeId)));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testUpdateCloneMultipleShards() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final RepositoryShardId shardId2 = new RepositoryShardId(indexId1, 1);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry cloneMultipleShards = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ ImmutableOpenMap.builder(clonesMap(shardId1, shardInitStatus)).fPut(shardId2, shardInitStatus).build());
+
+ assertThat(cloneMultipleShards.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(targetSnapshot, shardId1, dataNodeId);
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(cloneMultipleShards), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry updatedSnapshot1 = snapshotsInProgress.entries().get(0);
+ assertThat(updatedSnapshot1.state(), is(SnapshotsInProgress.State.STARTED));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testCompletedCloneStartsSnapshot() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final SnapshotsInProgress.ShardSnapshotStatus shardInitStatus = initShardStatus(dataNodeId);
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, shardInitStatus));
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName1);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+ final ShardId routingShardId1 = new ShardId(stateWithIndex.metadata().index(indexName1).getIndex(), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ // 1. case: shard that just finished cloning is unassigned -> shard snapshot should go to MISSING state
+ final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(stateWithIndex, cloneSingleShard, snapshotSingleShard);
+ final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, uuid());
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+ assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.MISSING));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ // 2. case: shard that just finished cloning is assigned correctly -> shard snapshot should go to INIT state
+ final ClusterState stateWithAssignedRoutingShard =
+ ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+ RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+ IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+ new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+ TestShardRouting.newShardRouting(
+ routingShardId1, dataNodeId, true, ShardRoutingState.STARTED)
+ ).build())).build()).build();
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithAssignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId1);
+ assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ // 3. case: shard that just finished cloning is currently initializing -> shard snapshot should go to WAITING state
+ final ClusterState stateWithInitializingRoutingShard =
+ ClusterState.builder(stateWithUnassignedRoutingShard).routingTable(
+ RoutingTable.builder(stateWithUnassignedRoutingShard.routingTable()).add(
+ IndexRoutingTable.builder(routingShardId1.getIndex()).addIndexShard(
+ new IndexShardRoutingTable.Builder(routingShardId1).addShard(
+ TestShardRouting.newShardRouting(
+ routingShardId1, dataNodeId, true, ShardRoutingState.INITIALIZING)
+ ).build())).build()).build();
+ {
+ final ClusterState updatedClusterState = applyUpdates(stateWithInitializingRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ assertThat(startedSnapshot.shards().get(routingShardId1).state(), is(SnapshotsInProgress.ShardState.WAITING));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+ }
+
+ public void testCompletedSnapshotStartsClone() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName);
+ final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId1, 0);
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(repositoryShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot");
+ final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+ final ClusterState updatedClusterState = applyUpdates(stateWithSnapshots(snapshotSingleShard, cloneSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.clones().get(repositoryShardId);
+ assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId()));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testCompletedSnapshotStartsNextSnapshot() throws Exception {
+ final String repoName = "test-repo";
+ final String indexName = "index-1";
+ final String dataNodeId = uuid();
+ final IndexId indexId1 = indexId(indexName);
+
+ final ClusterState stateWithIndex = stateWithUnassignedIndices(indexName);
+ final Snapshot plainSnapshot = snapshot(repoName, "test-snapshot-1");
+ final ShardId routingShardId = new ShardId(stateWithIndex.metadata().index(indexName).getIndex(), 0);
+ final SnapshotsInProgress.Entry snapshotSingleShard = snapshotEntry(plainSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, initShardStatus(dataNodeId)));
+
+ final Snapshot queuedSnapshot = snapshot(repoName, "test-snapshot-2");
+ final SnapshotsInProgress.Entry queuedSnapshotSingleShard = snapshotEntry(queuedSnapshot, Collections.singletonList(indexId1),
+ shardsMap(routingShardId, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ final SnapshotsService.ShardSnapshotUpdate completeShard = successUpdate(plainSnapshot, routingShardId, dataNodeId);
+
+ final ClusterState updatedClusterState =
+ applyUpdates(stateWithSnapshots(snapshotSingleShard, queuedSnapshotSingleShard), completeShard);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedSnapshot = snapshotsInProgress.entries().get(0);
+ assertThat(completedSnapshot.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ final SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = startedSnapshot.shards().get(routingShardId);
+ assertThat(shardSnapshotStatus.state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertThat(shardSnapshotStatus.nodeId(), is(dataNodeId));
+ assertIsNoop(updatedClusterState, completeShard);
+ }
+
+ public void testCompletedCloneStartsNextClone() throws Exception {
+ final String repoName = "test-repo";
+ final Snapshot sourceSnapshot = snapshot(repoName, "source-snapshot");
+ final Snapshot targetSnapshot = snapshot(repoName, "target-snapshot");
+ final String indexName1 = "index-1";
+ final IndexId indexId1 = indexId(indexName1);
+ final RepositoryShardId shardId1 = new RepositoryShardId(indexId1, 0);
+ final String masterNodeId = uuid();
+ final SnapshotsInProgress.Entry cloneSingleShard = cloneEntry(targetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, initShardStatus(masterNodeId)));
+
+ final Snapshot queuedTargetSnapshot = snapshot(repoName, "test-snapshot");
+ final SnapshotsInProgress.Entry queuedClone = cloneEntry(queuedTargetSnapshot, sourceSnapshot.getSnapshotId(),
+ clonesMap(shardId1, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED));
+
+ assertThat(cloneSingleShard.state(), is(SnapshotsInProgress.State.STARTED));
+
+ final ClusterState stateWithUnassignedRoutingShard = stateWithSnapshots(
+ ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(masterNodeId)).build(), cloneSingleShard, queuedClone);
+ final SnapshotsService.ShardSnapshotUpdate completeShardClone = successUpdate(targetSnapshot, shardId1, masterNodeId);
+
+ final ClusterState updatedClusterState = applyUpdates(stateWithUnassignedRoutingShard, completeShardClone);
+ final SnapshotsInProgress snapshotsInProgress = updatedClusterState.custom(SnapshotsInProgress.TYPE);
+ final SnapshotsInProgress.Entry completedClone = snapshotsInProgress.entries().get(0);
+ assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS));
+ final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1);
+ assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED));
+ assertThat(startedSnapshot.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT));
+ assertIsNoop(updatedClusterState, completeShardClone);
+ }
+
+ private static DiscoveryNodes discoveryNodes(String localNodeId) {
+ return DiscoveryNodes.builder().localNodeId(localNodeId).build();
+ }
+
+ private static ImmutableOpenMap shardsMap(
+ ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+ return ImmutableOpenMap.builder().fPut(shardId, shardStatus).build();
+ }
+
+ private static ImmutableOpenMap clonesMap(
+ RepositoryShardId shardId, SnapshotsInProgress.ShardSnapshotStatus shardStatus) {
+ return ImmutableOpenMap.builder().fPut(shardId, shardStatus).build();
+ }
+
+ private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, ShardId shardId, String nodeId) {
+ return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+ }
+
+ private static SnapshotsService.ShardSnapshotUpdate successUpdate(Snapshot snapshot, RepositoryShardId shardId, String nodeId) {
+ return new SnapshotsService.ShardSnapshotUpdate(snapshot, shardId, successfulShardStatus(nodeId));
+ }
+
+ private static ClusterState stateWithUnassignedIndices(String... indexNames) {
+ final Metadata.Builder metaBuilder = Metadata.builder(Metadata.EMPTY_METADATA);
+ for (String index : indexNames) {
+ metaBuilder.put(IndexMetadata.builder(index)
+ .settings(Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT.id))
+ .numberOfShards(1).numberOfReplicas(0)
+ .build(), false);
+ }
+ final RoutingTable.Builder routingTable = RoutingTable.builder();
+ for (String index : indexNames) {
+ final Index idx = metaBuilder.get(index).getIndex();
+ routingTable.add(IndexRoutingTable.builder(idx).addIndexShard(
+ new IndexShardRoutingTable.Builder(new ShardId(idx, 0)).build()));
+ }
+ return ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metaBuilder).routingTable(routingTable.build()).build();
+ }
+
+ private static ClusterState stateWithSnapshots(ClusterState state, SnapshotsInProgress.Entry... entries) {
+ return ClusterState.builder(state).version(state.version() + 1L)
+ .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Arrays.asList(entries))).build();
+ }
+
+ private static ClusterState stateWithSnapshots(SnapshotsInProgress.Entry... entries) {
+ return stateWithSnapshots(ClusterState.builder(ClusterState.EMPTY_STATE).nodes(discoveryNodes(uuid())).build(), entries);
+ }
+
+ private static void assertIsNoop(ClusterState state, SnapshotsService.ShardSnapshotUpdate shardCompletion) throws Exception {
+ assertSame(applyUpdates(state, shardCompletion), state);
+ }
+
+ private static ClusterState applyUpdates(ClusterState state, SnapshotsService.ShardSnapshotUpdate... updates) throws Exception {
+ return SnapshotsService.SHARD_STATE_EXECUTOR.execute(state, Arrays.asList(updates)).resultingState;
+ }
+
+ private static SnapshotsInProgress.Entry snapshotEntry(Snapshot snapshot, List indexIds,
+ ImmutableOpenMap shards) {
+ return SnapshotsInProgress.startedEntry(snapshot, randomBoolean(), randomBoolean(), indexIds, Collections.emptyList(),
+ 1L, randomNonNegativeLong(), shards, Collections.emptyMap(), Version.CURRENT);
+ }
+
+ private static SnapshotsInProgress.Entry cloneEntry(
+ Snapshot snapshot, SnapshotId source, ImmutableOpenMap clones) {
+ final List indexIds = StreamSupport.stream(clones.keys().spliterator(), false)
+ .map(k -> k.value.index()).distinct().collect(Collectors.toList());
+ return SnapshotsInProgress.startClone(snapshot, source, indexIds, 1L, randomNonNegativeLong(), Version.CURRENT).withClones(clones);
+ }
+
+ private static SnapshotsInProgress.ShardSnapshotStatus initShardStatus(String nodeId) {
+ return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, uuid());
+ }
+
+ private static SnapshotsInProgress.ShardSnapshotStatus successfulShardStatus(String nodeId) {
+ return new SnapshotsInProgress.ShardSnapshotStatus(nodeId, SnapshotsInProgress.ShardState.SUCCESS, uuid());
+ }
+
+ private static Snapshot snapshot(String repoName, String name) {
+ return new Snapshot(repoName, new SnapshotId(name, uuid()));
+ }
+
+ private static Index index(String name) {
+ return new Index(name, uuid());
+ }
+
+ private static IndexId indexId(String name) {
+ return new IndexId(name, uuid());
+ }
+
+ private static String uuid() {
+ return UUIDs.randomBase64UUID(random());
+ }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index a91e95c836cf3..ac17d9b944238 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -118,6 +118,10 @@ public long getFailureCount() {
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
private volatile boolean blockAndFailOnWriteSnapFile;
+ private volatile boolean blockOnWriteShardLevelMeta;
+
+ private volatile boolean blockOnReadIndexMeta;
+
/**
* Writes to the blob {@code index.latest} at the repository root will fail with an {@link IOException} if {@code true}.
*/
@@ -189,6 +193,8 @@ public synchronized void unblock() {
blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false;
blockOnDeleteIndexN = false;
+ blockOnWriteShardLevelMeta = false;
+ blockOnReadIndexMeta = false;
this.notifyAll();
}
@@ -212,6 +218,14 @@ public void setBlockOnDeleteIndexFile() {
blockOnDeleteIndexN = true;
}
+ public void setBlockOnWriteShardLevelMeta() {
+ blockOnWriteShardLevelMeta = true;
+ }
+
+ public void setBlockOnReadIndexMeta() {
+ blockOnReadIndexMeta = true;
+ }
+
public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
this.failReadsAfterUnblock = failReadsAfterUnblock;
}
@@ -229,7 +243,7 @@ private synchronized boolean blockExecution() {
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
- blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) {
+ blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
blocked = true;
this.wait();
wasBlocked = true;
@@ -365,8 +379,12 @@ protected BlobContainer wrapChild(BlobContainer child) {
@Override
public InputStream readBlob(String name) throws IOException {
- maybeReadErrorAfterBlock(name);
- maybeIOExceptionOrBlock(name);
+ if (blockOnReadIndexMeta && name.startsWith(BlobStoreRepository.METADATA_PREFIX) && path().equals(basePath()) == false) {
+ blockExecutionAndMaybeWait(name);
+ } else {
+ maybeReadErrorAfterBlock(name);
+ maybeIOExceptionOrBlock(name);
+ }
return super.readBlob(name);
}
@@ -430,6 +448,10 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
maybeIOExceptionOrBlock(blobName);
+ if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
+ && path().equals(basePath()) == false) {
+ blockExecutionAndMaybeWait(blobName);
+ }
super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
if (RandomizedContext.current().getRandom().nextBoolean()) {
// for network based repositories, the blob may have been written but we may still