Skip to content

Commit

Permalink
Clone Snapshot API (elastic#61839)
Browse files Browse the repository at this point in the history
Adds clone snapshot API to clone part of a snapshot into a new snapshot.
  • Loading branch information
original-brownbear committed Oct 5, 2020
1 parent 40de7de commit be8e1d8
Show file tree
Hide file tree
Showing 13 changed files with 1,712 additions and 128 deletions.
52 changes: 52 additions & 0 deletions docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[[clone-snapshot-api]]
=== Clone snapshot API
++++
<titleabbrev>Clone snapshot</titleabbrev>
++++

Clones part or all of a snapshot into a new snapshot.

[source,console]
----
PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot
{
"indices": "index_a,index_b"
}
----
// TEST[skip:TODO]

[[clone-snapshot-api-request]]
==== {api-request-title}

`PUT /_snapshot/<repository>/<source_snapshot>/_clone/<target_snapshot>`

[[clone-snapshot-api-desc]]
==== {api-description-title}

The clone snapshot API allows creating a copy of all or part of an existing snapshot
within the same repository.

[[clone-snapshot-api-params]]
==== {api-path-parms-title}

`<repository>`::
(Required, string)
Name of the snapshot repository that both source and target snapshot belong to.

[[clone-snapshot-api-query-params]]
==== {api-query-parms-title}

`master_timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.

`timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a response. If no response is received before the timeout expires, the request
fails and returns an error. Defaults to `30s`.

`indices`::
(Required, string)
A comma-separated list of indices to include in the snapshot.
<<multi-index,Multi-index syntax>> is supported.
1 change: 1 addition & 0 deletions docs/reference/snapshot-restore/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ understand the time requirements before proceeding.
--

include::register-repository.asciidoc[]
include::apis/clone-snapshot-api.asciidoc[]
include::take-snapshot.asciidoc[]
include::restore-snapshot.asciidoc[]
include::monitor-snapshot-restore.asciidoc[]
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/snapshot-restore/take-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/<snapshot-{now/d}>
PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E
-----------------------------------
// TEST[continued]

NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <<clone-snapshot-api,clone snapshot API>>.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {

@Override
protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("not implemented yet");
snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
}

@Override
Expand Down
157 changes: 145 additions & 12 deletions server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;

import java.io.IOException;
Expand Down Expand Up @@ -88,25 +90,81 @@ public String toString() {
return builder.append("]").toString();
}

/**
* Creates the initial {@link Entry} when starting a snapshot, if no shard-level snapshot work is to be done the resulting entry
* will be in state {@link State#SUCCESS} right away otherwise it will be in state {@link State#STARTED}.
*/
public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState, boolean partial, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Map<String, Object> userMetadata,
Version version) {
return new SnapshotsInProgress.Entry(snapshot, includeGlobalState, partial,
completed(shards.values()) ? State.SUCCESS : State.STARTED,
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}

/**
* Creates the initial snapshot clone entry
*
* @param snapshot snapshot to clone into
* @param source snapshot to clone from
* @param indices indices to clone
* @param startTime start time
* @param repositoryStateId repository state id that this clone is based on
* @param version repository metadata version to write
* @return snapshot clone entry
*/
public static Entry startClone(Snapshot snapshot, SnapshotId source, List<IndexId> indices, long startTime,
long repositoryStateId, Version version) {
return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
ImmutableOpenMap.of());
}

public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
*/
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<IndexId> indices;
private final List<String> dataStreams;
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final Version version;

/**
* Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
*/
@Nullable
private final SnapshotId source;

/**
* Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
* the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
*/
private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones;

@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version) {
this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
userMetadata, version, null, ImmutableOpenMap.of());
}

private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version, @Nullable SnapshotId source,
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -115,11 +173,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.dataStreams = dataStreams;
this.startTime = startTime;
this.shards = shards;
assert assertShardsConsistent(state, indices, shards);
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.version = version;
this.source = source;
if (source == null) {
assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
this.clones = ImmutableOpenMap.of();
} else {
this.clones = clones;
}
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

private Entry(StreamInput in) throws IOException {
Expand All @@ -129,7 +194,7 @@ private Entry(StreamInput in) throws IOException {
state = State.fromValue(in.readByte());
indices = in.readList(IndexId::new);
startTime = in.readLong();
shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new);
shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom);
repositoryStateId = in.readLong();
failure = in.readOptionalString();
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
Expand All @@ -152,18 +217,41 @@ private Entry(StreamInput in) throws IOException {
} else {
dataStreams = Collections.emptyList();
}
if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
source = in.readOptionalWriteable(SnapshotId::new);
clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
} else {
source = null;
clones = ImmutableOpenMap.of();
}
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private static boolean assertShardsConsistent(SnapshotId source, State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set<String> indexNamesInShards = new HashSet<>();
shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
assert indexNames.equals(indexNamesInShards)
shards.iterator().forEachRemaining(s -> {
indexNamesInShards.add(s.key.getIndexName());
assert source == null || s.value.nodeId == null :
"Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
});
assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
assert source != null || indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
// Check state consistency for normal snapshots and started clone operations
if (source == null || clones.isEmpty() == false) {
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
}
if (source != null && state.completed()) {
assert hasFailures(clones) == false || state == State.FAILED
: "Failed shard clones in [" + clones + "] but state was [" + state + "]";
}
return true;
}

Expand Down Expand Up @@ -205,12 +293,22 @@ public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
userMetadata, version);
userMetadata, version, source, clones);
}

public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
if (updatedClones.equals(clones)) {
return this;
}
return new Entry(snapshot, includeGlobalState, partial,
completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
updatedClones);
}

public Entry withShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
failure, userMetadata, version);
failure, userMetadata, version, source, clones);
}

/**
Expand Down Expand Up @@ -298,6 +396,19 @@ public Version version() {
return version;
}

@Nullable
public SnapshotId source() {
return source;
}

public boolean isClone() {
return source != null;
}

public ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones() {
return clones;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -314,6 +425,8 @@ public boolean equals(Object o) {
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

return true;
}
Expand All @@ -329,6 +442,8 @@ public int hashCode() {
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
return result;
}

Expand Down Expand Up @@ -398,6 +513,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(dataStreams);
}
if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
out.writeOptionalWriteable(source);
out.writeMap(clones);
}
}

@Override
Expand All @@ -421,6 +540,15 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
return true;
}

private static boolean hasFailures(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
for (ObjectCursor<ShardSnapshotStatus> value : clones.values()) {
if (value.value.state().failed()) {
return true;
}
}
return false;
}

public static class ShardSnapshotStatus implements Writeable {

/**
Expand Down Expand Up @@ -470,15 +598,20 @@ private boolean assertConsistent() {
return true;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException {
String nodeId = in.readOptionalString();
final ShardState state = ShardState.fromValue(in.readByte());
final String generation;
if (SnapshotsService.useShardGenerations(in.getVersion())) {
generation = in.readOptionalString();
} else {
generation = null;
}
reason = in.readOptionalString();
final String reason = in.readOptionalString();
if (state == ShardState.QUEUED) {
return UNASSIGNED_QUEUED;
}
return new ShardSnapshotStatus(nodeId, state, reason, generation);
}

public ShardState state() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ private static void validateSnapshotRestorable(final String repository, final Sn
}
}

private static boolean failed(SnapshotInfo snapshot, String index) {
public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
final String localNodeId = clusterService.localNode().getId();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final State entryState = entry.state();
if (entry.isClone()) {
// This is a snapshot clone, it will be executed on the current master
continue;
}
if (entryState == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = null;
final Snapshot snapshot = entry.snapshot();
Expand Down
Loading

0 comments on commit be8e1d8

Please sign in to comment.