Skip to content

Commit

Permalink
Add Clone Snapshot Request Handling Scaffolding (elastic#63037)
Browse files Browse the repository at this point in the history
Adds all the scaffolding for snapshot clone request handling
and index-to-clone resolution to reduce the diff in elastic#61839 to
the bare essentials of the state machine changes for snapshot
cloning and relevant tests and give us the opportunity to
review the API in isolation.
  • Loading branch information
original-brownbear committed Oct 5, 2020
1 parent 8607912 commit 40de7de
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.clone.TransportCloneSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
Expand Down Expand Up @@ -267,6 +269,7 @@
import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.elasticsearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterHealthAction;
Expand Down Expand Up @@ -522,6 +525,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

Expand Down Expand Up @@ -665,6 +669,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCleanupRepositoryAction());
registerHandler.accept(new RestGetSnapshotsAction());
registerHandler.accept(new RestCreateSnapshotAction());
registerHandler.accept(new RestCloneSnapshotAction());
registerHandler.accept(new RestRestoreSnapshotAction());
registerHandler.accept(new RestDeleteSnapshotAction());
registerHandler.accept(new RestSnapshotsStatusAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.clone;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;

public final class CloneSnapshotAction extends ActionType<AcknowledgedResponse> {

public static final CloneSnapshotAction INSTANCE = new CloneSnapshotAction();
public static final String NAME = "cluster:admin/snapshot/clone";

private CloneSnapshotAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.clone;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

import static org.elasticsearch.action.ValidateActions.addValidationError;

public class CloneSnapshotRequest extends MasterNodeRequest<CloneSnapshotRequest> implements IndicesRequest.Replaceable{

private final String repository;

private final String source;

private final String target;

private String[] indices;

private IndicesOptions indicesOptions = IndicesOptions.strictExpandHidden();

public CloneSnapshotRequest(StreamInput in) throws IOException {
super(in);
repository = in.readString();
source = in.readString();
target = in.readString();
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
}

/**
* Creates a clone snapshot request for cloning the given source snapshot's indices into the given target snapshot on the given
* repository.
*
* @param repository repository that source snapshot belongs to and that the target snapshot will be created in
* @param source source snapshot name
* @param target target snapshot name
* @param indices indices to clone from source to target
*/
public CloneSnapshotRequest(String repository, String source, String target, String[] indices) {
this.repository = repository;
this.source = source;
this.target = target;
this.indices = indices;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
out.writeString(source);
out.writeString(target);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (source == null) {
validationException = addValidationError("source snapshot name is missing", null);
}
if (target == null) {
validationException = addValidationError("target snapshot name is missing", null);
}
if (repository == null) {
validationException = addValidationError("repository is missing", validationException);
}
if (indices == null) {
validationException = addValidationError("indices is null", validationException);
} else if (indices.length == 0) {
validationException = addValidationError("indices patterns are empty", validationException);
} else {
for (String index : indices) {
if (index == null) {
validationException = addValidationError("index is null", validationException);
break;
}
}
}
return validationException;
}

@Override
public String[] indices() {
return this.indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public CloneSnapshotRequest indices(String... indices) {
this.indices = indices;
return this;
}

/**
* @see CloneSnapshotRequestBuilder#setIndicesOptions
*/
public CloneSnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}

public String repository() {
return this.repository;
}

public String target() {
return this.target;
}

public String source() {
return this.source;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.clone;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Strings;

public class CloneSnapshotRequestBuilder extends MasterNodeOperationRequestBuilder<CloneSnapshotRequest, AcknowledgedResponse,
CloneSnapshotRequestBuilder> {

protected CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType<AcknowledgedResponse> action,
CloneSnapshotRequest request) {
super(client, action, request);
}

public CloneSnapshotRequestBuilder(ElasticsearchClient client, ActionType<AcknowledgedResponse> action,
String repository, String source, String target) {
this(client, action, new CloneSnapshotRequest(repository, source, target, Strings.EMPTY_ARRAY));
}

/**
* Sets a list of indices that should be cloned from the source to the target snapshot
* <p>
* The list of indices supports multi-index syntax. For example: "+test*" ,"-test42" will clone all indices with
* prefix "test" except index "test42".
*
* @return this builder
*/
public CloneSnapshotRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}

/**
* Specifies the indices options. Like what type of requested indices to ignore. For example indices that don't exist.
*
* @param indicesOptions the desired behaviour regarding indices options
* @return this request
*/
public CloneSnapshotRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.snapshots.clone;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action for the clone snapshot operation.
*/
public final class TransportCloneSnapshotAction extends TransportMasterNodeAction<CloneSnapshotRequest, AcknowledgedResponse> {

private final SnapshotsService snapshotsService;

@Inject
public TransportCloneSnapshotAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, SnapshotsService snapshotsService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(CloneSnapshotAction.NAME, transportService, clusterService, threadPool, actionFilters, CloneSnapshotRequest::new,
indexNameExpressionResolver);
this.snapshotsService = snapshotsService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}

@Override
protected void masterOperation(CloneSnapshotRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("not implemented yet");
}

@Override
protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.clone.CloneSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
Expand Down Expand Up @@ -506,7 +508,22 @@ public interface ClusterAdminClient extends ElasticsearchClient {
CreateSnapshotRequestBuilder prepareCreateSnapshot(String repository, String name);

/**
* Get snapshot.
* Clones a snapshot.
*/
CloneSnapshotRequestBuilder prepareCloneSnapshot(String repository, String source, String target);

/**
* Clones a snapshot.
*/
ActionFuture<AcknowledgedResponse> cloneSnapshot(CloneSnapshotRequest request);

/**
* Clones a snapshot.
*/
void cloneSnapshot(CloneSnapshotRequest request, ActionListener<AcknowledgedResponse> listener);

/**
* Get snapshots.
*/
ActionFuture<GetSnapshotsResponse> getSnapshots(GetSnapshotsRequest request);

Expand Down
Loading

0 comments on commit 40de7de

Please sign in to comment.