Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add rest endpoint for remote store restore #3576

Merged
merged 5 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.hot_threads",
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards", };
"search_shards",
"remote_store.restore", };
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New API is added as high level rest client is not added for _remotestore APIs. I am not sure about guideline on writing high level rest client for new endpoints. If it is required, will add a tracking issue to add the same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing that will be needed since it's a new API. And also https://github.com/opensearch-project/opensearch-java and https://github.com/opensearch-project/opensearch-api-specification. I'd appreciate a write up on "adding new RESTful APIs" somewhere in the developer guide, and all the subsequent changes that are required today for the next person.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"remote_store.restore":{
"documentation":{
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/remote-store#restore",
"description":"Restores from remote store."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_remotestore/_restore",
"methods":[
"POST"
]
}
]
},
"params":{
"cluster_manager_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to cluster-manager node"
},
"wait_for_completion":{
"type":"boolean",
"description":"Should this request wait until the operation has completed before returning",
"default":false
}
},
"body":{
"description":"A comma separated list of index IDs",
"required":true
}
}
}
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.opensearch.action.admin.cluster.remote.RemoteInfoAction;
import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
Expand Down Expand Up @@ -267,6 +269,7 @@
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -314,6 +317,7 @@
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction;
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
Expand Down Expand Up @@ -668,6 +672,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down Expand Up @@ -853,6 +860,11 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
}
}
registerHandler.accept(new RestCatAction(catActions));

// Remote Store APIs
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrross Added check for feature flag before registering the rest endpoint.

If feature flag is disabled, we get following output:

$ curl -X POST "localhost:9200/_remotestore/_restore" -H 'Content-Type: application/json' -d'{"indices": "my-index-1"}'

{"error":"no handler found for uri [/_remotestore/_restore] and method [POST]"}%

registerHandler.accept(new RestRestoreRemoteStoreAction());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionType;

/**
* Restore remote store action
*
* @opensearch.internal
*/
public final class RestoreRemoteStoreAction extends ActionType<RestoreRemoteStoreResponse> {

public static final RestoreRemoteStoreAction INSTANCE = new RestoreRemoteStoreAction();
public static final String NAME = "cluster:admin/remotestore/restore";

private RestoreRemoteStoreAction() {
super(NAME, RestoreRemoteStoreResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.RestoreInfo;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Contains information about remote store restores
*
* @opensearch.internal
*/
public final class RestoreRemoteStoreResponse extends ActionResponse implements ToXContentObject {

@Nullable
private final RestoreInfo restoreInfo;

public RestoreRemoteStoreResponse(@Nullable RestoreInfo restoreInfo) {
this.restoreInfo = restoreInfo;
}

public RestoreRemoteStoreResponse(StreamInput in) throws IOException {
super(in);
restoreInfo = RestoreInfo.readOptionalRestoreInfo(in);
}

/**
* Returns restore information if remote store restore was completed before this method returned, null otherwise
*
* @return restore information or null
*/
public RestoreInfo getRestoreInfo() {
return restoreInfo;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(restoreInfo);
}

public RestStatus status() {
if (restoreInfo == null) {
return RestStatus.ACCEPTED;
}
return restoreInfo.status();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (restoreInfo != null) {
builder.field("remote_store");
restoreInfo.toXContent(builder, params);
} else {
builder.field("accepted", true);
}
builder.endObject();
return builder;
}

public static final ConstructingObjectParser<RestoreRemoteStoreResponse, Void> PARSER = new ConstructingObjectParser<>(
"restore_remote_store",
true,
v -> {
RestoreInfo restoreInfo = (RestoreInfo) v[0];
Boolean accepted = (Boolean) v[1];
assert (accepted == null && restoreInfo != null) || (accepted != null && accepted && restoreInfo == null) : "accepted: ["
+ accepted
+ "], restoreInfo: ["
+ restoreInfo
+ "]";
return new RestoreRemoteStoreResponse(restoreInfo);
}
);

static {
PARSER.declareObject(
optionalConstructorArg(),
(parser, context) -> RestoreInfo.fromXContent(parser),
new ParseField("remote_store")
);
PARSER.declareBoolean(optionalConstructorArg(), new ParseField("accepted"));
}

public static RestoreRemoteStoreResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreRemoteStoreResponse that = (RestoreRemoteStoreResponse) o;
return Objects.equals(restoreInfo, that.restoreInfo);
}

@Override
public int hashCode() {
return Objects.hash(restoreInfo);
}

@Override
public String toString() {
return "RestoreRemoteStoreResponse{" + "restoreInfo=" + restoreInfo + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action for restore remote store operation
*
* @opensearch.internal
*/
public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction<
RestoreRemoteStoreRequest,
RestoreRemoteStoreResponse> {
private final RestoreService restoreService;

@Inject
public TransportRestoreRemoteStoreAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RestoreService restoreService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
RestoreRemoteStoreAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
RestoreRemoteStoreRequest::new,
indexNameExpressionResolver
);
this.restoreService = restoreService;
}

@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
Comment on lines +59 to +62
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to decide on the threadpool. I'd prefer having a dedicated threadpool

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think a bit more on this. Will get back.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is on the same lines as snapshot restore (https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java#L82).

Currently, we don't have a dedicated threadpool for remote store as the segment upload happens in the refresh flow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bukhtawar Does it answer your question?

Copy link
Collaborator

@Bukhtawar Bukhtawar Jun 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about what happens to segments when remote store isn't available. Do we block flushes eventually?. Also need to specify if this can run concurrently with a snapshot segment restore

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If remote store is not available, we will not block flush directly. If segment upload is not working, we will not purge translog. If translog grows beyond some threshold, we will block write operations. This will indirectly block flush.

Also need to specify if this can run concurrently with a snapshot segment restore

At a time, only one restore operation will happen. In the implementation of the API (#3642), we change the state of the index to INITIALIZING, if index state is already INITIALIZING, we fail the operation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @Bukhtawar , we need to use SNAPSHOT (why TransportRestoreSnapshotAction is not using it?). thread pool or introduce a dedicated one ...


@Override
protected RestoreRemoteStoreResponse read(StreamInput in) throws IOException {
return new RestoreRemoteStoreResponse(in);
}

@Override
protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, ClusterState state) {
// Restoring a remote store might change the global state and create/change an index,
// so we need to check for METADATA_WRITE and WRITE blocks
ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
if (blockException != null) {
return blockException;
}
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);

}

@Override
protected void clusterManagerOperation(
final RestoreRemoteStoreRequest request,
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
) {
restoreService.restoreFromRemoteStore(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
* compatible open source license.
*/

/** Restore Snapshot transport handler. */
/** Restore remote store transport handler. */
package org.opensearch.action.admin.cluster.remotestore.restore;
Loading