Skip to content

Commit

Permalink
Clone Snapshot API (#61839)
Browse files Browse the repository at this point in the history
Adds clone snapshot API to clone part of a snapshot into a new snapshot.
  • Loading branch information
original-brownbear authored Oct 2, 2020
1 parent 13a073d commit f7f239d
Show file tree
Hide file tree
Showing 14 changed files with 1,684 additions and 126 deletions.
52 changes: 52 additions & 0 deletions docs/reference/snapshot-restore/apis/clone-snapshot-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
[[clone-snapshot-api]]
=== Clone snapshot API
++++
<titleabbrev>Clone snapshot</titleabbrev>
++++

Clones part or all of a snapshot into a new snapshot.

[source,console]
----
PUT /_snapshot/my_repository/source_snapshot/_clone/target_snapshot
{
"indices": "index_a,index_b"
}
----
// TEST[skip:TODO]

[[clone-snapshot-api-request]]
==== {api-request-title}

`PUT /_snapshot/<repository>/<source_snapshot>/_clone/<target_snapshot>`

[[clone-snapshot-api-desc]]
==== {api-description-title}

The clone snapshot API allows creating a copy of all or part of an existing snapshot
within the same repository.

[[clone-snapshot-api-params]]
==== {api-path-parms-title}

`<repository>`::
(Required, string)
Name of the snapshot repository that both source and target snapshot belong to.

[[clone-snapshot-api-query-params]]
==== {api-query-parms-title}

`master_timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a connection to the master node. If no response is received before the timeout
expires, the request fails and returns an error. Defaults to `30s`.

`timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
a response. If no response is received before the timeout expires, the request
fails and returns an error. Defaults to `30s`.

`indices`::
(Required, string)
A comma-separated list of indices to include in the snapshot.
<<multi-index,Multi-index syntax>> is supported.
1 change: 1 addition & 0 deletions docs/reference/snapshot-restore/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ understand the time requirements before proceeding.
--

include::register-repository.asciidoc[]
include::apis/clone-snapshot-api.asciidoc[]
include::take-snapshot.asciidoc[]
include::restore-snapshot.asciidoc[]
include::monitor-snapshot-restore.asciidoc[]
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/snapshot-restore/take-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,5 @@ PUT /_snapshot/my_backup/<snapshot-{now/d}>
PUT /_snapshot/my_backup/%3Csnapshot-%7Bnow%2Fd%7D%3E
-----------------------------------
// TEST[continued]

NOTE: You can also create snapshots that are copies of part of an existing snapshot using the <<clone-snapshot-api,clone snapshot API>>.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1293,11 +1293,6 @@ private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromMasterClient(S
.setWaitForCompletion(true).execute();
}

// Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
// threads so that blocking some threads on one repository doesn't block other repositories from doing work
private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
.put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();

private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) {
createIndexWithContent(indexName, indexSettingsNoReplicas(1)
.put("index.routing.allocation.include._name", nodeInclude)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ protected ClusterBlockException checkBlock(CloneSnapshotRequest request, Cluster
@Override
protected void masterOperation(Task task, final CloneSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("not implemented yet");
snapshotsService.cloneSnapshot(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
}
}
133 changes: 123 additions & 10 deletions server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -96,18 +99,52 @@ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState,
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}

/**
* Creates the initial snapshot clone entry
*
* @param snapshot snapshot to clone into
* @param source snapshot to clone from
* @param indices indices to clone
* @param startTime start time
* @param repositoryStateId repository state id that this clone is based on
* @param version repository metadata version to write
* @return snapshot clone entry
*/
public static Entry startClone(Snapshot snapshot, SnapshotId source, List<IndexId> indices, long startTime,
long repositoryStateId, Version version) {
return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(),
startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source,
ImmutableOpenMap.of());
}

public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
private final boolean partial;
/**
* Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation.
*/
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<IndexId> indices;
private final List<String> dataStreams;
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final Version version;

/**
* Source snapshot if this is a clone operation or {@code null} if this is a snapshot.
*/
@Nullable
private final SnapshotId source;

/**
* Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry
* the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries.
*/
private final ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones;

@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;

Expand All @@ -116,6 +153,15 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version) {
this(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards, failure,
userMetadata, version, null, ImmutableOpenMap.of());
}

private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
Version version, @Nullable SnapshotId source,
@Nullable ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -124,11 +170,18 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.dataStreams = dataStreams;
this.startTime = startTime;
this.shards = shards;
assert assertShardsConsistent(state, indices, shards);
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.version = version;
this.source = source;
if (source == null) {
assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source";
this.clones = ImmutableOpenMap.of();
} else {
this.clones = clones;
}
assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones);
}

private Entry(StreamInput in) throws IOException {
Expand All @@ -144,29 +197,59 @@ private Entry(StreamInput in) throws IOException {
userMetadata = in.readMap();
version = Version.readVersion(in);
dataStreams = in.readStringList();
if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
source = in.readOptionalWriteable(SnapshotId::new);
clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom);
} else {
source = null;
clones = ImmutableOpenMap.of();
}
}

private static boolean assertShardsConsistent(State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private static boolean assertShardsConsistent(SnapshotId source, State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
return true;
}
final Set<String> indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet());
final Set<String> indexNamesInShards = new HashSet<>();
shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
assert indexNames.equals(indexNamesInShards)
shards.iterator().forEachRemaining(s -> {
indexNamesInShards.add(s.key.getIndexName());
assert source == null || s.value.nodeId == null :
"Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]";
});
assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed";
assert source != null || indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
final boolean shardsCompleted = completed(shards.values());
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
final boolean shardsCompleted = completed(shards.values()) && completed(clones.values());
// Check state consistency for normal snapshots and started clone operations
if (source == null || clones.isEmpty() == false) {
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
}
if (source != null && state.completed()) {
assert hasFailures(clones) == false || state == State.FAILED
: "Failed shard clones in [" + clones + "] but state was [" + state + "]";
}
return true;
}

public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
userMetadata, version);
userMetadata, version, source, clones);
}

public Entry withClones(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> updatedClones) {
if (updatedClones.equals(clones)) {
return this;
}
return new Entry(snapshot, includeGlobalState, partial,
completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) :
state, indices, dataStreams, startTime, repositoryStateId, shards, failure, userMetadata, version, source,
updatedClones);
}

/**
Expand Down Expand Up @@ -203,7 +286,7 @@ public Entry abort() {

public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State state, String failure) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
failure, userMetadata, version);
failure, userMetadata, version, source, clones);
}

/**
Expand Down Expand Up @@ -291,6 +374,19 @@ public Version version() {
return version;
}

@Nullable
public SnapshotId source() {
return source;
}

public boolean isClone() {
return source != null;
}

public ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones() {
return clones;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -307,6 +403,8 @@ public boolean equals(Object o) {
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (version.equals(entry.version) == false) return false;
if (Objects.equals(source, ((Entry) o).source) == false) return false;
if (clones.equals(((Entry) o).clones) == false) return false;

return true;
}
Expand All @@ -322,6 +420,8 @@ public int hashCode() {
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + version.hashCode();
result = 31 * result + (source == null ? 0 : source.hashCode());
result = 31 * result + clones.hashCode();
return result;
}

Expand Down Expand Up @@ -383,6 +483,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(userMetadata);
Version.writeVersion(version, out);
out.writeStringCollection(dataStreams);
if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) {
out.writeOptionalWriteable(source);
out.writeMap(clones);
}
}

@Override
Expand All @@ -406,6 +510,15 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
return true;
}

private static boolean hasFailures(ImmutableOpenMap<RepositoryShardId, ShardSnapshotStatus> clones) {
for (ObjectCursor<ShardSnapshotStatus> value : clones.values()) {
if (value.value.state().failed()) {
return true;
}
}
return false;
}

public static class ShardSnapshotStatus implements Writeable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ private static void validateSnapshotRestorable(final String repository, final Sn
}
}

private static boolean failed(SnapshotInfo snapshot, String index) {
public static boolean failed(SnapshotInfo snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
final String localNodeId = clusterService.localNode().getId();
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final State entryState = entry.state();
if (entry.isClone()) {
// This is a snapshot clone, it will be executed on the current master
continue;
}
if (entryState == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = null;
final Snapshot snapshot = entry.snapshot();
Expand Down
Loading

0 comments on commit f7f239d

Please sign in to comment.