Skip to content

Commit

Permalink
Refactor code to handle review comments and introduce new exception
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 29, 2024
1 parent 66c962b commit ab80b7f
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 243 deletions.
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ActionPlugin.ActionHandler;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -529,7 +529,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ExtensionsManager extensionsManager;
private final RequestLimitSettings requestLimitSettings;
private final ResponseLimitSettings responseLimitSettings;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -582,7 +582,7 @@ public ActionModule(
);

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
requestLimitSettings = new RequestLimitSettings(clusterSettings, settings);
responseLimitSettings = new ResponseLimitSettings(clusterSettings, settings);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -963,8 +963,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClusterManagerAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction(requestLimitSettings));
registerHandler.accept(new RestSegmentsAction(requestLimitSettings));
registerHandler.accept(new RestIndicesAction(responseLimitSettings));
registerHandler.accept(new RestSegmentsAction(responseLimitSettings));
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
// Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction
Expand Down Expand Up @@ -1052,7 +1052,7 @@ protected void configure() {
// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);

bind(RequestLimitSettings.class).toInstance(requestLimitSettings);
bind(ResponseLimitSettings.class).toInstance(responseLimitSettings);
}

public ActionFilters getActionFilters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.rest.ResponseLimitBreachedException;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.rest.RequestLimitSettings.BlockAction.CAT_SHARDS;
import java.util.Objects;

import static org.opensearch.rest.ResponseLimitSettings.LimitEntity.SHARDS;

/**
* Perform cat shards action
Expand All @@ -36,18 +37,18 @@
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {

private final NodeClient client;
private final RequestLimitSettings requestLimitSettings;
private final ResponseLimitSettings responseLimitSettings;

@Inject
public TransportCatShardsAction(
NodeClient client,
TransportService transportService,
ActionFilters actionFilters,
RequestLimitSettings requestLimitSettings
ResponseLimitSettings responseLimitSettings
) {
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
this.client = client;
this.requestLimitSettings = requestLimitSettings;
this.responseLimitSettings = responseLimitSettings;
}

@Override
Expand Down Expand Up @@ -85,10 +86,7 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (shardsRequest.isRequestLimitCheckSupported()
&& requestLimitSettings.isCircuitLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
}
validateRequestLimit(shardsRequest, clusterStateResponse, cancellableListener);
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down Expand Up @@ -123,4 +121,21 @@ public void onFailure(Exception e) {
}

}

private void validateRequestLimit(
final CatShardsRequest shardsRequest,
final ClusterStateResponse clusterStateResponse,
final ActionListener<CatShardsResponse> listener
) {
if (shardsRequest.isRequestLimitCheckSupported()
&& Objects.nonNull(clusterStateResponse)
&& Objects.nonNull(clusterStateResponse.getState())) {
int limit = responseLimitSettings.getCatShardsResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getRoutingTable(), SHARDS, limit)) {
listener.onFailure(
new ResponseLimitBreachedException("Too many shards requested. Can not request shards beyond {" + limit + "}")
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RequestLimitSettings;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
Expand Down Expand Up @@ -799,9 +799,10 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,

RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING
)
)
);
Expand Down
144 changes: 0 additions & 144 deletions server/src/main/java/org/opensearch/rest/RequestLimitSettings.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.rest.RestStatus;

import java.io.IOException;

/**
* Thrown when api response limit threshold exceeds
*
* @opensearch.internal
*/
public class ResponseLimitBreachedException extends OpenSearchException {

public ResponseLimitBreachedException(String msg) {
super(msg);
}

public ResponseLimitBreachedException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}
}
Loading

0 comments on commit ab80b7f

Please sign in to comment.