Skip to content

Commit

Permalink
Refactor code, rebase with pagination PR merge and add relevant changes
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Sep 30, 2024
1 parent 23b9c64 commit ed9d8c6
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,8 @@ public static void registerExceptions() {
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.rest.ResponseLimitBreachedException.class,
org.opensearch.rest.ResponseLimitBreachedException::new,
org.opensearch.common.breaker.ResponseLimitBreachedException.class,
org.opensearch.common.breaker.ResponseLimitBreachedException::new,
175,
V_2_18_0
)
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.NamedRegistry;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.AbstractModule;
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
Expand All @@ -325,7 +326,6 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ActionPlugin.ActionHandler;
import org.opensearch.rest.NamedRoute;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -992,7 +992,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestTemplatesAction());

// LIST API
registerHandler.accept(new RestIndicesListAction());
registerHandler.accept(new RestIndicesListAction(responseLimitSettings));

// Point in time API
registerHandler.accept(new RestCreatePitAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.rest.ResponseLimitBreachedException;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.Objects;

import static org.opensearch.rest.ResponseLimitSettings.LimitEntity.SHARDS;
import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.SHARDS;

/**
* Perform cat shards action
Expand Down Expand Up @@ -133,7 +133,11 @@ private void validateRequestLimit(
int limit = responseLimitSettings.getCatShardsResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getRoutingTable(), SHARDS, limit)) {
listener.onFailure(
new ResponseLimitBreachedException("Too many shards requested. Can not request shards beyond {" + limit + "}")
new ResponseLimitBreachedException(
"Too many shards requested. Can not request shards beyond {" + limit + "}",
limit,
SHARDS
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.breaker;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Thrown when api response breaches threshold limit.
*
* @opensearch.internal
*/
public class ResponseLimitBreachedException extends OpenSearchException {

private final int responseLimit;
private final ResponseLimitSettings.LimitEntity limitEntity;

public ResponseLimitBreachedException(StreamInput in) throws IOException {
super(in);
responseLimit = in.readVInt();
limitEntity = in.readEnum(ResponseLimitSettings.LimitEntity.class);
}

public ResponseLimitBreachedException(String msg, int responseLimit, ResponseLimitSettings.LimitEntity limitEntity) {
super(msg);
this.responseLimit = responseLimit;
this.limitEntity = limitEntity;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(responseLimit);
out.writeEnum(limitEntity);
}

public int getResponseLimit() {
return responseLimit;
}

public ResponseLimitSettings.LimitEntity getLimitEntity() {
return limitEntity;
}

@Override
public RestStatus status() {
return RestStatus.TOO_MANY_REQUESTS;
}

@Override
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("response_limit", responseLimit);
builder.field("limit_entity", limitEntity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
* compatible open source license.
*/

package org.opensearch.rest;
package org.opensearch.common.breaker;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -89,13 +90,13 @@ public ResponseLimitSettings(ClusterSettings clusterSettings, Settings settings)
* @return True/False
*/
public static boolean isResponseLimitBreached(final Metadata metadata, final LimitEntity limitEntity, final int limit) {
if (Objects.isNull(metadata)) return false;
if (limit <= 0) return false;
if (Objects.requireNonNull(limitEntity) == LimitEntity.INDICES) {
if (Objects.isNull(metadata) || limit <= 0) return false;
if (limitEntity == LimitEntity.INDICES) {
int indicesCount = getTotalIndicesFromMetadata.apply(metadata);
return indicesCount > limit;
} else {
throw new IllegalArgumentException("Unsupported limit entity [" + limitEntity + "]");
}
return false;
}

/**
Expand All @@ -108,25 +109,33 @@ public static boolean isResponseLimitBreached(final Metadata metadata, final Lim
* @return True/False
*/
public static boolean isResponseLimitBreached(final RoutingTable routingTable, final LimitEntity limitEntity, final int limit) {
if (Objects.isNull(routingTable)) return false;
if (limit <= 0) return false;
if (Objects.isNull(routingTable) || limit <= 0) return false;
if (Objects.isNull(limitEntity)) {
throw new IllegalArgumentException("Limit entity cannot be null");
}
switch (limitEntity) {
case INDICES:
int indicesCount = getTotalIndicesFromRoutingTable.apply(routingTable);
if (indicesCount > limit) return true;
break;
case SHARDS:
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
int totalShards = 0;
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
// In case routing table is corrupted. We will not block actions.
if (Objects.isNull(entry.getValue()) || Objects.isNull(entry.getValue().getShards())) {
return false;
}
totalShards += entry.getValue().getShards().size();
if (totalShards > limit) return true;
}
if (isShardsLimitBreached(routingTable, limit)) return true;
break;
default:
throw new IllegalArgumentException("Unsupported limit entity [" + limitEntity + "]");
}
return false;
}

private static boolean isShardsLimitBreached(final RoutingTable routingTable, final int limit) {
final Map<String, IndexRoutingTable> indexRoutingTableMap = routingTable.getIndicesRouting();
int totalShards = 0;
for (final Map.Entry<String, IndexRoutingTable> entry : indexRoutingTableMap.entrySet()) {
for (final Map.Entry<Integer, IndexShardRoutingTable> indexShardRoutingTableEntry : entry.getValue().getShards().entrySet()) {
totalShards += indexShardRoutingTableEntry.getValue().getShards().size();
// Fail fast if limit value is breached and avoid unnecessary computation.
if (totalShards > limit) return true;
}
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
Expand Down Expand Up @@ -155,7 +156,6 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@

import org.opensearch.client.node.NodeClient;
import org.opensearch.common.Table;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.UTF8StreamWriter;
import org.opensearch.core.common.io.stream.BytesStream;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Table;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
Expand All @@ -59,8 +61,6 @@
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexSettings;
import org.opensearch.rest.ResponseLimitBreachedException;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
Expand Down Expand Up @@ -88,7 +88,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest.DEFAULT_CLUSTER_MANAGER_NODE_TIMEOUT;
import static org.opensearch.rest.ResponseLimitSettings.LimitEntity.INDICES;
import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.INDICES;
import static org.opensearch.rest.RestRequest.Method.GET;

/**
Expand Down Expand Up @@ -253,7 +253,11 @@ private void validateRequestLimit(final ClusterStateResponse clusterStateRespons
int limit = responseLimitSettings.getCatIndicesResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getMetadata(), INDICES, limit)) {
listener.onFailure(
new ResponseLimitBreachedException("Too many indices requested. Can not request indices beyond {" + limit + "}")
new ResponseLimitBreachedException(
"Too many indices requested. Can not request indices beyond {" + limit + "}",
limit,
INDICES
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Table;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.core.common.Strings;
import org.opensearch.index.engine.Segment;
import org.opensearch.rest.ResponseLimitBreachedException;
import org.opensearch.rest.ResponseLimitSettings;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestActionListener;
Expand All @@ -58,7 +58,7 @@

import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.rest.ResponseLimitSettings.LimitEntity.INDICES;
import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.INDICES;
import static org.opensearch.rest.RestRequest.Method.GET;

/**
Expand Down Expand Up @@ -131,7 +131,9 @@ private void validateRequestLimit(final ClusterStateResponse clusterStateRespons
int limit = responseLimitSettings.getCatSegmentsResponseLimit();
if (ResponseLimitSettings.isResponseLimitBreached(clusterStateResponse.getState().getRoutingTable(), INDICES, limit)) {
throw new ResponseLimitBreachedException(
"Segments from too many indices requested. Can not request indices beyond {" + limit + "}"
"Segments from too many indices requested. Can not request indices beyond {" + limit + "}",
limit,
INDICES
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.rest.action.list;

import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestRequest;
Expand All @@ -35,6 +36,10 @@ public class RestIndicesListAction extends RestIndicesAction {
private static final int MAX_SUPPORTED_LIST_INDICES_PAGE_SIZE = 5000;
private static final int DEFAULT_LIST_INDICES_PAGE_SIZE = 500;

public RestIndicesListAction(final ResponseLimitSettings responseLimitSettings) {
super(responseLimitSettings);
}

@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_list/indices"), new Route(GET, "/_list/indices/{index}")));
Expand Down Expand Up @@ -70,6 +75,11 @@ protected PageParams validateAndGetPageParams(RestRequest restRequest) {
return pageParams;
}

@Override
public boolean isRequestLimitCheckSupported() {
return false;
}

protected int defaultPageSize() {
return DEFAULT_LIST_INDICES_PAGE_SIZE;
}
Expand Down
Loading

0 comments on commit ed9d8c6

Please sign in to comment.