Skip to content

Commit

Permalink
Add changes to block non-paginated calls in cat shards, indices and s…
Browse files Browse the repository at this point in the history
…egments

Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 18, 2024
1 parent b2a7136 commit 59b896a
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 35 deletions.
7 changes: 6 additions & 1 deletion server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@
import org.opensearch.rest.action.admin.indices.RestValidateQueryAction;
import org.opensearch.rest.action.admin.indices.RestViewAction;
import org.opensearch.rest.action.cat.AbstractCatAction;
import org.opensearch.rest.action.cat.RequestLimitSettings;
import org.opensearch.rest.action.cat.RestAliasAction;
import org.opensearch.rest.action.cat.RestAllocationAction;
import org.opensearch.rest.action.cat.RestCatAction;
Expand Down Expand Up @@ -528,6 +529,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ExtensionsManager extensionsManager;
private final RequestLimitSettings requestLimitSettings;

public ActionModule(
Settings settings,
Expand Down Expand Up @@ -580,6 +582,7 @@ public ActionModule(
);

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, identityService);
requestLimitSettings = new RequestLimitSettings(clusterSettings, settings);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -960,7 +963,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClusterManagerAction());
registerHandler.accept(new RestNodesAction());
registerHandler.accept(new RestTasksAction(nodesInCluster));
registerHandler.accept(new RestIndicesAction());
registerHandler.accept(new RestIndicesAction(requestLimitSettings));
registerHandler.accept(new RestSegmentsAction());
// Fully qualified to prevent interference with rest.action.count.RestCountAction
registerHandler.accept(new org.opensearch.rest.action.cat.RestCountAction());
Expand Down Expand Up @@ -1048,6 +1051,8 @@ protected void configure() {

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);

bind(RequestLimitSettings.class).toInstance(requestLimitSettings);
}

public ActionFilters getActionFilters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.rest.action.cat.RequestLimitSettings;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_SHARDS;

/**
* Perform cat shards action
*
Expand All @@ -31,11 +36,18 @@
public class TransportCatShardsAction extends HandledTransportAction<CatShardsRequest, CatShardsResponse> {

private final NodeClient client;
private final RequestLimitSettings requestLimitSettings;

@Inject
public TransportCatShardsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) {
public TransportCatShardsAction(
NodeClient client,
TransportService transportService,
ActionFilters actionFilters,
RequestLimitSettings requestLimitSettings
) {
super(CatShardsAction.NAME, transportService, actionFilters, CatShardsRequest::new);
this.client = client;
this.requestLimitSettings = requestLimitSettings;
}

@Override
Expand Down Expand Up @@ -73,6 +85,9 @@ protected void innerOnFailure(Exception e) {
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_SHARDS)) {
listener.onFailure(new CircuitBreakingException("Too many shards requested.", CircuitBreaker.Durability.TRANSIENT));
}
catShardsResponse.setClusterStateResponse(clusterStateResponse);
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setShouldCancelOnTimeout(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.action.cat.RequestLimitSettings;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
Expand Down Expand Up @@ -793,7 +794,11 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,

RequestLimitSettings.CAT_INDICES_LIMIT_SETTING,
RequestLimitSettings.CAT_SHARDS_LIMIT_SETTING,
RequestLimitSettings.CAT_SEGMENTS_LIMIT_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.rest.action.cat;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Class to define dynamic settings for putting circuit breakers on the actions and functions to evaluate if block is required.
*/
public class RequestLimitSettings {

public enum BlockAction {
CAT_INDICES,
CAT_SHARDS,
CAT_SEGMENTS
}

private volatile int catIndicesLimit;
private volatile int catShardsLimit;
private volatile int catSegmentsLimit;

public static final Setting<Integer> CAT_INDICES_LIMIT_SETTING = Setting.intSetting(
"cat.indices.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> CAT_SHARDS_LIMIT_SETTING = Setting.intSetting(
"cat.shards.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<Integer> CAT_SEGMENTS_LIMIT_SETTING = Setting.intSetting(
"cat.segments.limit",
-1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public RequestLimitSettings(ClusterSettings clusterSettings, Settings settings) {
setCatShardsLimitSetting(CAT_SHARDS_LIMIT_SETTING.get(settings));
setCatIndicesLimitSetting(CAT_INDICES_LIMIT_SETTING.get(settings));
setCatSegmentsLimitSetting(CAT_SEGMENTS_LIMIT_SETTING.get(settings));

clusterSettings.addSettingsUpdateConsumer(CAT_SHARDS_LIMIT_SETTING, this::setCatShardsLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_INDICES_LIMIT_SETTING, this::setCatIndicesLimitSetting);
clusterSettings.addSettingsUpdateConsumer(CAT_SEGMENTS_LIMIT_SETTING, this::setCatSegmentsLimitSetting);
}

/**
* Method to check if the circuit breaker limit has reached for an action.
* The limits are controlled via dynamic settings.
*
* @param clusterState {@link ClusterState}
* @param actionToCheck {@link BlockAction}
* @return True/False
*/
public boolean isCircuitBreakerLimitBreached(final ClusterState clusterState, BlockAction actionToCheck) {
if (Objects.isNull(clusterState)) return false;
switch (actionToCheck) {
case CAT_INDICES:
if (catIndicesLimit <= 0) return false;
int indicesCount = getTotalIndices(clusterState);
if (indicesCount > catIndicesLimit) return true;
break;
case CAT_SHARDS:
if (catShardsLimit <= 0) return false;
int totalShards = getTotalShards(clusterState);
if (totalShards > catShardsLimit) return true;
break;
case CAT_SEGMENTS:
if (catSegmentsLimit <= 0) return false;
if (getTotalIndices(clusterState) > catSegmentsLimit) return true;
break;
}
return false;
}

private void setCatShardsLimitSetting(final int catShardsLimit) {
this.catShardsLimit = catShardsLimit;
}

private void setCatIndicesLimitSetting(final int catIndicesLimit) {
this.catIndicesLimit = catIndicesLimit;
}

private void setCatSegmentsLimitSetting(final int catSegmentsLimit) {
this.catSegmentsLimit = catSegmentsLimit;
}

private static int getTotalIndices(final ClusterState clusterState) {
return chainWalk(() -> clusterState.getMetadata().getIndices().size(), 0);
}

private static int getTotalShards(final ClusterState clusterState) {
final RoutingTable routingTable = clusterState.getRoutingTable();
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
int totalShards = 0;
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) {
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
}
}
return totalShards;
}

// TODO: Evaluate if we can move this to common util.
private static <T> T chainWalk(Supplier<T> supplier, T defaultValue) {
try {
return supplier.get();
} catch (NullPointerException e) {
return defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.IndexSettings;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
Expand All @@ -81,6 +83,7 @@
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.action.cat.RequestLimitSettings.BlockAction.CAT_INDICES;

/**
* _cat API action to list indices
Expand All @@ -96,6 +99,12 @@ public class RestIndicesAction extends AbstractCatAction {
private static final String DUPLICATE_PARAMETER_ERROR_MESSAGE =
"Please only use one of the request parameters [master_timeout, cluster_manager_timeout].";

private final RequestLimitSettings requestLimitSettings;

public RestIndicesAction(RequestLimitSettings requestLimitSettings) {
this.requestLimitSettings = requestLimitSettings;
}

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_cat/indices"), new Route(GET, "/_cat/indices/{index}")));
Expand Down Expand Up @@ -151,48 +160,66 @@ public RestResponse buildResponse(final Table table) throws Exception {
new ActionListener<GetSettingsResponse>() {
@Override
public void onResponse(final GetSettingsResponse getSettingsResponse) {
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(request, 4, listener);
groupedListener.onResponse(getSettingsResponse);

// The list of indices that will be returned is determined by the indices returned from the Get Settings call.
// All the other requests just provide additional detail, and wildcards may be resolved differently depending on the
// type of request in the presence of security plugins (looking at you, ClusterHealthRequest), so
// force the IndicesOptions for all the sub-requests to be as inclusive as possible.
final IndicesOptions subRequestIndicesOptions = IndicesOptions.lenientExpandHidden();

// Indices that were successfully resolved during the get settings request might be deleted when the subsequent
// cluster
// state, cluster health and indices stats requests execute. We have to distinguish two cases:
// 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we want the
// subsequent requests to fail.
// 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent requests
// not to
// fail on the deleted index (as we want to ignore wildcards that cannot be resolved).
// This behavior can be ensured by letting the cluster state, cluster health and indices stats requests re-resolve
// the
// index names with the same indices options that we used for the initial cluster state request (strictExpand).
sendIndicesStatsRequest(
indices,
subRequestIndicesOptions,
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
// Indices that were successfully resolved during the get settings request might be deleted when the
// subsequent cluster state, cluster health and indices stats requests execute. We have to distinguish two cases:
// 1) the deleted index was explicitly passed as parameter to the /_cat/indices request. In this case we
// want the subsequent requests to fail.
// 2) the deleted index was resolved as part of a wildcard or _all. In this case, we want the subsequent
// requests not to fail on the deleted index (as we want to ignore wildcards that cannot be resolved).
// This behavior can be ensured by letting the cluster state, cluster health and indices stats requests
// re-resolve the index names with the same indices options that we used for the initial cluster state
// request (strictExpand).
sendClusterStateRequest(
indices,
subRequestIndicesOptions,
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
sendClusterHealthRequest(
indices,
subRequestIndicesOptions,
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
if (requestLimitSettings.isCircuitBreakerLimitBreached(clusterStateResponse.getState(), CAT_INDICES)) {
listener.onFailure(
new CircuitBreakingException("Too many indices requested.", CircuitBreaker.Durability.TRANSIENT)
);
}
final GroupedActionListener<ActionResponse> groupedListener = createGroupedListener(
request,
4,
listener
);
groupedListener.onResponse(getSettingsResponse);
groupedListener.onResponse(clusterStateResponse);

sendIndicesStatsRequest(
indices,
subRequestIndicesOptions,
includeUnloadedSegments,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);

sendClusterHealthRequest(
indices,
subRequestIndicesOptions,
local,
clusterManagerNodeTimeout,
client,
ActionListener.wrap(groupedListener::onResponse, groupedListener::onFailure)
);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
/*
if (requestLimitSettings.isRequestLimitBreached(clusterStateResponse, CAT_SEGMENTS)) {
listener.onFailure(new CircuitBreakingException("Segments from too many indices requested.", CircuitBreaker.Durability.TRANSIENT));
}
*/
final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest();
indicesSegmentsRequest.indices(indices);
client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) {
Expand Down
Loading

0 comments on commit 59b896a

Please sign in to comment.