From 0bd5ba1562b4214acff795c026678baa9c27a1d9 Mon Sep 17 00:00:00 2001 From: Movva Ajaykumar Date: Thu, 30 Nov 2023 00:08:52 +0530 Subject: [PATCH] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats (#10887) * Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats Signed-off-by: Ajay Kumar Movva Co-authored-by: Bharathwaj G Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../AdmissionControlMultiNodeIT.java | 292 ++++++++++++++++++ .../admin/cluster/node/stats/NodeStats.java | 26 +- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 1 + .../action/bulk/TransportShardBulkAction.java | 4 +- .../action/search/SearchTransportService.java | 11 + .../TransportReplicationAction.java | 77 ++++- .../replication/TransportWriteAction.java | 44 ++- .../common/network/NetworkModule.java | 32 +- .../common/settings/ClusterSettings.java | 8 +- .../main/java/org/opensearch/node/Node.java | 29 +- .../java/org/opensearch/node/NodeService.java | 13 +- .../AdmissionControlService.java | 70 ++++- .../controllers/AdmissionController.java | 65 +++- .../CPUBasedAdmissionController.java | 55 ---- .../CpuBasedAdmissionController.java | 125 ++++++++ ...e.java => AdmissionControlActionType.java} | 6 +- ... CpuBasedAdmissionControllerSettings.java} | 24 +- .../stats/AdmissionControlStats.java | 67 ++++ .../stats/AdmissionControllerStats.java | 74 +++++ .../admissioncontrol/stats/package-info.java | 12 + .../AdmissionControlTransportHandler.java | 25 +- .../AdmissionControlTransportInterceptor.java | 14 +- .../transport/TransportInterceptor.java | 14 + .../transport/TransportService.java | 35 +++ .../cluster/node/stats/NodeStatsTests.java | 61 +++- .../cluster/stats/ClusterStatsNodesTests.java | 9 +- .../opensearch/cluster/DiskUsageTests.java | 6 + .../common/network/NetworkModuleTests.java | 20 +- .../AdmissionControlServiceTests.java | 58 ++-- .../AdmissionControlSingleNodeTests.java | 203 ++++++++++++ .../CPUBasedAdmissionControllerTests.java | 109 ------- .../CpuBasedAdmissionControllerTests.java | 143 +++++++++ ...a => AdmissionControlActionTypeTests.java} | 12 +- ...CPUBasedAdmissionControlSettingsTests.java | 33 +- .../stats/AdmissionControlStatsTests.java | 92 ++++++ .../stats/AdmissionControllerStatsTests.java | 82 +++++ ...AdmissionControlTransportHandlerTests.java | 22 +- .../MockInternalClusterInfoService.java | 3 +- .../opensearch/test/InternalTestCluster.java | 1 + 42 files changed, 1655 insertions(+), 329 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java delete mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java rename server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/{TransportActionType.java => AdmissionControlActionType.java} (85%) rename server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/{CPUBasedAdmissionControllerSettings.java => CpuBasedAdmissionControllerSettings.java} (82%) create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java delete mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java rename server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/{TransportActionTypeTests.java => AdmissionControlActionTypeTests.java} (53%) create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 44824d207c409..87cf272fd4570 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024)) - Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329)) +- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java new file mode 100644 index 0000000000000..0af3d31f9e846 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java @@ -0,0 +1,292 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.UUIDs; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) +public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase { + + public static final Settings settings = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + + private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class); + + public static final String INDEX_NAME = "test_index"; + + @Before + public void init() { + assertAcked( + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ); + ensureGreen(INDEX_NAME); + } + + @After + public void cleanup() { + client().admin().indices().prepareDelete(INDEX_NAME).get(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build(); + } + + public void testAdmissionControlRejectionOnEnforced() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1); + Arrays.stream(res.getItems()).forEach(bulkItemResponse -> { + assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException")); + }); + SearchResponse searchResponse; + try { + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + } catch (Exception exception) { + assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1); + } + + public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException { + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.clear() + .indices(true) + .addMetrics( + NodesStatsRequest.Metric.JVM.metricName(), + NodesStatsRequest.Metric.OS.metricName(), + NodesStatsRequest.Metric.FS.metricName(), + NodesStatsRequest.Metric.PROCESS.metricName(), + NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName() + ); + NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet(); + assertEquals(200, clusterHealthResponse.status().getStatus()); + assertFalse(nodesStatsResponse.hasFailures()); + } + + public void testAdmissionControlRejectionOnMonitor() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 1); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); + assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1); + } + + public void testAdmissionControlRejectionOnDisabled() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.DISABLED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 0); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0); + } + + private Tuple getPrimaryReplicaNodeNames(String indexName) { + IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + return new Tuple<>(primaryName, replicaName); + } + + private String getCoordinatingOnlyNode() { + return client().admin() + .cluster() + .prepareState() + .get() + .getState() + .nodes() + .getCoordinatingOnlyNodes() + .values() + .iterator() + .next() + .getName(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 874713b51d627..8293a5bb27612 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -58,6 +58,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; @@ -154,6 +155,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private RepositoriesStats repositoriesStats; + @Nullable + private AdmissionControlStats admissionControlStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -225,6 +229,12 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); + } else { + admissionControlStats = null; + } } public NodeStats( @@ -254,7 +264,8 @@ public NodeStats( @Nullable TaskCancellationStats taskCancellationStats, @Nullable SearchPipelineStats searchPipelineStats, @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats, - @Nullable RepositoriesStats repositoriesStats + @Nullable RepositoriesStats repositoriesStats, + @Nullable AdmissionControlStats admissionControlStats ) { super(node); this.timestamp = timestamp; @@ -283,6 +294,7 @@ public NodeStats( this.searchPipelineStats = searchPipelineStats; this.segmentReplicationRejectionStats = segmentReplicationRejectionStats; this.repositoriesStats = repositoriesStats; + this.admissionControlStats = admissionControlStats; } public long getTimestamp() { @@ -435,6 +447,11 @@ public RepositoriesStats getRepositoriesStats() { return repositoriesStats; } + @Nullable + public AdmissionControlStats getAdmissionControlStats() { + return admissionControlStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -487,6 +504,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(repositoriesStats); } + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(admissionControlStats); + } } @Override @@ -587,6 +608,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getRepositoriesStats() != null) { getRepositoriesStats().toXContent(builder, params); } + if (getAdmissionControlStats() != null) { + getAdmissionControlStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 22a667e7e8f6f..1af56f10b95ee 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -218,7 +218,8 @@ public enum Metric { SEARCH_PIPELINE("search_pipeline"), RESOURCE_USAGE_STATS("resource_usage_stats"), SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), - REPOSITORIES("repositories"); + REPOSITORIES("repositories"), + ADMISSION_CONTROL("admission_control"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 99cf42cfdc4d0..1df73d3b4394d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -127,7 +127,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), - NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics) + NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), + NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5efec8b876435..9c5dcc9e9de3f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 268a6ed6f85b8..b8517f53ff294 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -98,6 +98,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; @@ -180,7 +181,8 @@ public TransportShardBulkAction( false, indexingPressureService, systemIndices, - tracer + tracer, + AdmissionControlActionType.INDEXING ); this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index a723937afd2ed..64c738f633f2e 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; import org.opensearch.search.dfs.DfsSearchResult; @@ -542,6 +543,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( DFS_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase( request, @@ -556,6 +560,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -575,6 +582,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, QuerySearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -633,6 +643,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ThreadPool.Names.SAME, true, true, + AdmissionControlActionType.SEARCH, ShardFetchSearchRequest::new, (request, channel, task) -> { searchService.executeFetchPhase( diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index ddebdc5530e70..95f998e2d89c2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -82,6 +82,7 @@ import org.opensearch.indices.IndexClosedException; import org.opensearch.indices.IndicesService; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -201,6 +202,40 @@ protected TransportReplicationAction( String executor, boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + requestReader, + replicaRequestReader, + executor, + syncGlobalCheckpointAfterOperation, + forceExecutionOnPrimary, + null + ); + } + + protected TransportReplicationAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader requestReader, + Writeable.Reader replicaRequestReader, + String executor, + boolean syncGlobalCheckpointAfterOperation, + boolean forceExecutionOnPrimary, + AdmissionControlActionType admissionControlActionType ) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; @@ -219,14 +254,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - transportService.registerRequestHandler( - transportPrimaryAction, - executor, - forceExecutionOnPrimary, - true, - in -> new ConcreteShardRequest<>(requestReader, in), - this::handlePrimaryRequest - ); + // This method will register Primary Request Handler Based on AdmissionControlActionType + registerPrimaryRequestHandler(requestReader, admissionControlActionType); // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler( @@ -247,6 +276,38 @@ protected TransportReplicationAction( clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v); } + /** + * This method will register handler as based on admissionControlActionType and AdmissionControlHandler will be + * invoked for registered action + * @param requestReader instance of the request reader + * @param admissionControlActionType type of AdmissionControlActionType + */ + private void registerPrimaryRequestHandler( + Writeable.Reader requestReader, + AdmissionControlActionType admissionControlActionType + ) { + if (admissionControlActionType != null) { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + admissionControlActionType, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } else { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } + } + @Override protected void doExecute(Task task, Request request, ActionListener listener) { assert request.shardId() != null : "request shardId must be set"; diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 9ebfa8cfd0df8..27f9e6dee83de 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -59,6 +59,7 @@ import org.opensearch.index.translog.Translog.Location; import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanBuilder; import org.opensearch.telemetry.tracing.SpanScope; @@ -104,7 +105,8 @@ protected TransportWriteAction( boolean forceExecutionOnPrimary, IndexingPressureService indexingPressureService, SystemIndices systemIndices, - Tracer tracer + Tracer tracer, + AdmissionControlActionType admissionControlActionType ) { // We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the // ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class. @@ -121,7 +123,8 @@ protected TransportWriteAction( replicaRequest, ThreadPool.Names.SAME, true, - forceExecutionOnPrimary + forceExecutionOnPrimary, + admissionControlActionType ); this.executorFunction = executorFunction; this.indexingPressureService = indexingPressureService; @@ -129,6 +132,43 @@ protected TransportWriteAction( this.tracer = tracer; } + protected TransportWriteAction( + Settings settings, + String actionName, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + Writeable.Reader request, + Writeable.Reader replicaRequest, + Function executorFunction, + boolean forceExecutionOnPrimary, + IndexingPressureService indexingPressureService, + SystemIndices systemIndices, + Tracer tracer + ) { + this( + settings, + actionName, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + request, + replicaRequest, + executorFunction, + forceExecutionOnPrimary, + indexingPressureService, + systemIndices, + tracer, + null + ); + } + protected String executor(IndexShard shard) { return executorFunction.apply(shard); } diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 821d48fccf48c..2edf3967c61b0 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -55,6 +55,7 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.RawTaskStatus; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; @@ -153,9 +154,6 @@ public NetworkModule( List transportInterceptors ) { this.settings = settings; - if (transportInterceptors != null) { - transportInterceptors.forEach(this::registerTransportInterceptor); - } for (NetworkPlugin plugin : plugins) { Map> httpTransportFactory = plugin.getHttpTransports( settings, @@ -192,6 +190,10 @@ public NetworkModule( registerTransportInterceptor(interceptor); } } + // Adding last because interceptors are triggered from last to first order from the list + if (transportInterceptors != null) { + transportInterceptors.forEach(this::registerTransportInterceptor); + } } /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */ @@ -299,6 +301,30 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + /** + * Intercept the transport action and perform admission control if applicable + * @param action The action the request handler is associated with + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param actualHandler The handler itself that implements the request handling + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @return returns the actual TransportRequestHandler after intercepting all previous handlers + * @param + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType + ) { + for (TransportInterceptor interceptor : this.transportInterceptors) { + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType); + } + return actualHandler; + } + @Override public AsyncSender interceptSender(AsyncSender sender) { for (TransportInterceptor interceptor : this.transportInterceptors) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 770096190ae2c..15b93f5ab7ac2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -136,7 +136,7 @@ import org.opensearch.persistent.decider.EnableAssignmentDecider; import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; -import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; import org.opensearch.script.ScriptService; @@ -701,9 +701,9 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, - CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, - CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3cbaeb6408f40..e21422e2834dc 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -899,12 +899,24 @@ protected Node( final RestController restController = actionModule.getRestController(); - final AdmissionControlService admissionControlService = new AdmissionControlService( + final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( + threadPool, settings, - clusterService.getClusterSettings(), + clusterService.getClusterSettings() + ); + final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( + nodeResourceUsageTracker, + clusterService, threadPool ); + final AdmissionControlService admissionControlService = new AdmissionControlService( + settings, + clusterService, + threadPool, + resourceUsageCollectorService + ); + AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor( admissionControlService ); @@ -1106,16 +1118,6 @@ protected Node( transportService.getTaskManager(), taskCancellationMonitoringSettings ); - final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( - threadPool, - settings, - clusterService.getClusterSettings() - ); - final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( - nodeResourceUsageTracker, - clusterService, - threadPool - ); this.nodeService = new NodeService( settings, threadPool, @@ -1140,7 +1142,8 @@ protected Node( taskCancellationMonitoringService, resourceUsageCollectorService, segmentReplicationStatsTracker, - repositoryService + repositoryService, + admissionControlService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 49dde0b81cac7..15cc8f3d20bb3 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,6 +54,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.plugins.PluginsService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.AggregationUsageService; @@ -96,7 +97,7 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; private final RepositoriesService repositoriesService; - + private final AdmissionControlService admissionControlService; private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; NodeService( @@ -123,7 +124,8 @@ public class NodeService implements Closeable { TaskCancellationMonitoringService taskCancellationMonitoringService, ResourceUsageCollectorService resourceUsageCollectorService, SegmentReplicationStatsTracker segmentReplicationStatsTracker, - RepositoriesService repositoriesService + RepositoriesService repositoriesService, + AdmissionControlService admissionControlService ) { this.settings = settings; this.threadPool = threadPool; @@ -148,6 +150,7 @@ public class NodeService implements Closeable { this.taskCancellationMonitoringService = taskCancellationMonitoringService; this.resourceUsageCollectorService = resourceUsageCollectorService; this.repositoriesService = repositoriesService; + this.admissionControlService = admissionControlService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; @@ -232,7 +235,8 @@ public NodeStats stats( boolean searchPipelineStats, boolean resourceUsageStats, boolean segmentReplicationTrackerStats, - boolean repositoriesStats + boolean repositoriesStats, + boolean admissionControl ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -263,7 +267,8 @@ public NodeStats stats( taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, - repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null + repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, + admissionControl ? this.admissionControlService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index 2cc409b0e4465..adca6992833bd 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -10,10 +10,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -21,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; +import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER; /** * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. @@ -29,23 +33,31 @@ public class AdmissionControlService { private final ThreadPool threadPool; public final AdmissionControlSettings admissionControlSettings; - private final ConcurrentMap ADMISSION_CONTROLLERS; + private final ConcurrentMap admissionControllers; private static final Logger logger = LogManager.getLogger(AdmissionControlService.class); - private final ClusterSettings clusterSettings; + private final ClusterService clusterService; private final Settings settings; + private final ResourceUsageCollectorService resourceUsageCollectorService; /** * * @param settings Immutable settings instance - * @param clusterSettings ClusterSettings Instance + * @param clusterService ClusterService Instance * @param threadPool ThreadPool Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats */ - public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public AdmissionControlService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + ResourceUsageCollectorService resourceUsageCollectorService + ) { this.threadPool = threadPool; - this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings); - this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); - this.clusterSettings = clusterSettings; + this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); + this.admissionControllers = new ConcurrentHashMap<>(); + this.clusterService = clusterService; this.settings = settings; + this.resourceUsageCollectorService = resourceUsageCollectorService; this.initialise(); } @@ -58,10 +70,14 @@ private void initialise() { } /** - * Handler to trigger registered admissionController + * + * @param action Transport action name + * @param admissionControlActionType admissionControllerActionType value */ - public void applyTransportAdmissionControl(String action) { - this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); }); + public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { + this.admissionControllers.forEach( + (name, admissionController) -> { admissionController.apply(action, admissionControlActionType); } + ); } /** @@ -70,7 +86,7 @@ public void applyTransportAdmissionControl(String action) { */ public void registerAdmissionController(String admissionControllerName) { AdmissionController admissionController = this.controllerFactory(admissionControllerName); - this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController); + this.admissionControllers.put(admissionControllerName, admissionController); } /** @@ -79,7 +95,12 @@ public void registerAdmissionController(String admissionControllerName) { private AdmissionController controllerFactory(String admissionControllerName) { switch (admissionControllerName) { case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings); + return new CpuBasedAdmissionController( + admissionControllerName, + this.resourceUsageCollectorService, + this.clusterService, + this.settings + ); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -90,7 +111,7 @@ private AdmissionController controllerFactory(String admissionControllerName) { * @return list of the registered admissionControllers */ public List getAdmissionControllers() { - return new ArrayList<>(this.ADMISSION_CONTROLLERS.values()); + return new ArrayList<>(this.admissionControllers.values()); } /** @@ -99,6 +120,21 @@ public List getAdmissionControllers() { * @return instance of the AdmissionController Instance */ public AdmissionController getAdmissionController(String controllerName) { - return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); + return this.admissionControllers.getOrDefault(controllerName, null); + } + + /** + * Return admission control stats + */ + public AdmissionControlStats stats() { + List statsList = new ArrayList<>(); + if (this.admissionControllers.size() > 0) { + this.admissionControllers.forEach((controllerName, admissionController) -> { + AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController); + statsList.add(admissionControllerStats); + }); + return new AdmissionControlStats(statsList); + } + return null; } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 00564a9967f31..2246ce34dd399 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -8,8 +8,14 @@ package org.opensearch.ratelimitting.admissioncontrol.controllers; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** @@ -19,17 +25,25 @@ */ public abstract class AdmissionController { - private final AtomicLong rejectionCount; private final String admissionControllerName; + final ResourceUsageCollectorService resourceUsageCollectorService; + public final Map rejectionCountMap; + public final ClusterService clusterService; /** - * - * @param rejectionCount initialised rejectionCount value for AdmissionController - * @param admissionControllerName name of the admissionController + * @param admissionControllerName name of the admissionController + * @param resourceUsageCollectorService instance used to get resource usage stats of the node + * @param clusterService instance of the clusterService */ - public AdmissionController(AtomicLong rejectionCount, String admissionControllerName) { - this.rejectionCount = rejectionCount; + public AdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService + ) { this.admissionControllerName = admissionControllerName; + this.resourceUsageCollectorService = resourceUsageCollectorService; + this.clusterService = clusterService; + this.rejectionCountMap = ConcurrentCollections.newConcurrentMap(); } /** @@ -41,10 +55,17 @@ public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlM } /** - * Increment the tracking-objects and apply the admission control if threshold is breached. - * Mostly applicable while applying admission controller + * + * @return true if admissionController is Enforced Mode else false + */ + public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionControlMode) { + return admissionControlMode == AdmissionControlMode.ENFORCED; + } + + /** + * Apply admission control based on the resource usage for an action */ - public abstract void apply(String action); + public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); /** * @return name of the admission-controller @@ -54,17 +75,31 @@ public String getName() { } /** - * Adds the rejection count for the controller. Primarily used when copying controller states. - * @param count To add the value of the tracking resource object as the provided count + * Add rejection count to the rejection count metric tracked by the admission controller */ - public void addRejectionCount(long count) { - this.rejectionCount.addAndGet(count); + public void addRejectionCount(String admissionControlActionType, long count) { + if (!this.rejectionCountMap.containsKey(admissionControlActionType)) { + this.rejectionCountMap.put(admissionControlActionType, new AtomicLong(0)); + } + this.rejectionCountMap.get(admissionControlActionType).getAndAdd(count); } /** * @return current value of the rejection count metric tracked by the admission-controller. */ - public long getRejectionCount() { - return this.rejectionCount.get(); + public long getRejectionCount(String admissionControlActionType) { + if (this.rejectionCountMap.containsKey(admissionControlActionType)) { + return this.rejectionCountMap.get(admissionControlActionType).get(); + } + return 0; + } + + /** + * Get rejection stats of the admission controller + */ + public Map getRejectionStats() { + Map rejectionStats = new HashMap<>(); + rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); + return rejectionStats; } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java deleted file mode 100644 index 3a8956b2cce87..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.controllers; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. - * It provides methods to apply admission control if configured limit has been reached - */ -public class CPUBasedAdmissionController extends AdmissionController { - private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class); - public CPUBasedAdmissionControllerSettings settings; - - /** - * - * @param admissionControllerName State of the admission controller - */ - public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) { - super(new AtomicLong(0), admissionControllerName); - this.settings = new CPUBasedAdmissionControllerSettings(clusterSettings, settings); - } - - /** - * This function will take of applying admission controller based on CPU usage - * @param action is the transport action - */ - @Override - public void apply(String action) { - // TODO Will extend this logic further currently just incrementing rejectionCount - if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { - this.applyForTransportLayer(action); - } - } - - private void applyForTransportLayer(String actionName) { - // currently incrementing counts to evaluate the controller triggering as expected and using in testing so limiting to 10 - // TODO will update rejection logic further in next PR's - if (this.getRejectionCount() < 10) { - this.addRejectionCount(1); - } - } -} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java new file mode 100644 index 0000000000000..5c180346c05e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.node.NodeResourceUsageStats; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; + +import java.util.Locale; +import java.util.Optional; + +/** + * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class CpuBasedAdmissionController extends AdmissionController { + public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; + private static final Logger LOGGER = LogManager.getLogger(CpuBasedAdmissionController.class); + public CpuBasedAdmissionControllerSettings settings; + + /** + * @param admissionControllerName Name of the admission controller + * @param resourceUsageCollectorService Instance used to get node resource usage stats + * @param clusterService ClusterService Instance + * @param settings Immutable settings instance + */ + public CpuBasedAdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService, + Settings settings + ) { + super(admissionControllerName, resourceUsageCollectorService, clusterService); + this.settings = new CpuBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); + } + + /** + * Apply admission control based on process CPU usage + * @param action is the transport action + */ + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { + this.applyForTransportLayer(action, admissionControlActionType); + } + } + + /** + * Apply transport layer admission control if configured limit has been reached + */ + private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { + if (isLimitsBreached(actionName, admissionControlActionType)) { + this.addRejectionCount(admissionControlActionType.getType(), 1); + if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { + throw new OpenSearchRejectedExecutionException( + String.format( + Locale.ROOT, + "CPU usage admission controller rejected the request for action [%s] as CPU limit reached", + admissionControlActionType.name() + ) + ); + } + } + } + + /** + * Check if the configured resource usage limits are breached for the action + */ + private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { + // check if cluster state is ready + if (clusterService.state() != null && clusterService.state().nodes() != null) { + long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( + this.clusterService.state().nodes().getLocalNodeId() + ); + if (nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit) { + LOGGER.warn( + "CpuBasedAdmissionController limit reached as the current CPU " + + "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]", + cpuUsage, + maxCpuLimit, + actionName, + this.settings.getTransportLayerAdmissionControllerMode() + ); + return true; + } + } + } + return false; + } + + /** + * Get CPU rejection threshold based on action type + */ + private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) { + switch (admissionControlActionType) { + case SEARCH: + return this.settings.getSearchCPULimit(); + case INDEXING: + return this.settings.getIndexingCPULimit(); + default: + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Admission control not Supported for AdmissionControlActionType: %s", + admissionControlActionType.getType() + ) + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java similarity index 85% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index f2fdca0cfe49b..8cf6e973ceb64 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -13,13 +13,13 @@ /** * Enums that defines the type of the transport requests */ -public enum TransportActionType { +public enum AdmissionControlActionType { INDEXING("indexing"), SEARCH("search"); private final String type; - TransportActionType(String uriType) { + AdmissionControlActionType(String uriType) { this.type = uriType; } @@ -31,7 +31,7 @@ public String getType() { return type; } - public static TransportActionType fromName(String name) { + public static AdmissionControlActionType fromName(String name) { name = name.toLowerCase(Locale.ROOT); switch (name) { case "indexing": diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java similarity index 82% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java index 141e9b68db145..1bddd1446a4c4 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java @@ -14,29 +14,22 @@ import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import java.util.Arrays; -import java.util.List; - /** * Settings related to cpu based admission controller. * @opensearch.internal */ -public class CPUBasedAdmissionControllerSettings { - public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; +public class CpuBasedAdmissionControllerSettings { /** - * Default parameters for the CPUBasedAdmissionControllerSettings + * Default parameters for the CpuBasedAdmissionControllerSettings */ public static class Defaults { - public static final long CPU_USAGE = 95; - public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + public static final long CPU_USAGE_LIMIT = 95; } private AdmissionControlMode transportLayerMode; private Long searchCPULimit; private Long indexingCPULimit; - - private final List transportActionsList; /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set * rejection will be performed, otherwise only rejection metrics will be populated. @@ -54,7 +47,7 @@ public static class Defaults { */ public static final Setting SEARCH_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.search.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -64,18 +57,17 @@ public static class Defaults { */ public static final Setting INDEXING_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.indexing.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); // currently limited to one setting will add further more settings in follow-up PR's - public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { + public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); - this.transportActionsList = Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE; clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); } @@ -103,8 +95,4 @@ public void setIndexingCPULimit(Long indexingCPULimit) { public void setSearchCPULimit(Long searchCPULimit) { this.searchCPULimit = searchCPULimit; } - - public List getTransportActionsList() { - return transportActionsList; - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java new file mode 100644 index 0000000000000..39909c571c63e --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Class for admission control stats used as part of node stats + * @opensearch.internal + */ +public class AdmissionControlStats implements ToXContentFragment, Writeable { + + private final List admissionControllerStatsList; + + /** + * + * @param admissionControllerStatsList list of admissionControllerStats + */ + public AdmissionControlStats(List admissionControllerStatsList) { + this.admissionControllerStatsList = admissionControllerStatsList; + } + + /** + * + * @param in the stream to read from + * @throws IOException if an I/O error occurs + */ + public AdmissionControlStats(StreamInput in) throws IOException { + this.admissionControllerStatsList = in.readList(AdmissionControllerStats::new); + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out the output stream to write entity content to + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(this.admissionControllerStatsList); + } + + public List getAdmissionControllerStatsList() { + return admissionControllerStatsList; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("admission_control"); + for (AdmissionControllerStats admissionControllerStats : this.admissionControllerStatsList) { + builder.field(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); + } + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java new file mode 100644 index 0000000000000..3895cac3eaa07 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; + +import java.io.IOException; +import java.util.Map; + +/** + * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type + * @opensearch.internal + */ +public class AdmissionControllerStats implements Writeable, ToXContentFragment { + public Map rejectionCount; + public String admissionControllerName; + + public AdmissionControllerStats(AdmissionController admissionController) { + this.rejectionCount = admissionController.getRejectionStats(); + this.admissionControllerName = admissionController.getName(); + } + + public AdmissionControllerStats(StreamInput in) throws IOException { + this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + this.admissionControllerName = in.readString(); + } + + public String getAdmissionControllerName() { + return admissionControllerName; + } + + public Map getRejectionCount() { + return rejectionCount; + } + + /** + * Writes this instance into a {@link StreamOutput} + * @param out the {@link StreamOutput} to write to + * @throws IOException if an error occurs while writing to the StreamOutput + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + out.writeString(this.admissionControllerName); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("transport"); + { + builder.startObject("rejection_count"); + { + for (Map.Entry rejectionCountEntry : this.rejectionCount.entrySet()) { + builder.field(rejectionCountEntry.getKey(), rejectionCountEntry.getValue()); + } + } + builder.endObject(); + } + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java new file mode 100644 index 0000000000000..7c96dcd569d64 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains stats related classes for the admissionController Feature + */ +package org.opensearch.ratelimitting.admissioncontrol.stats; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index 7d0f5fbc17a51..1e8f309234f90 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequest; @@ -28,18 +29,21 @@ public class AdmissionControlTransportHandler implem protected final Logger log = LogManager.getLogger(this.getClass()); AdmissionControlService admissionControlService; boolean forceExecution; + AdmissionControlActionType admissionControlActionType; public AdmissionControlTransportHandler( String action, TransportRequestHandler actualHandler, AdmissionControlService admissionControlService, - boolean forceExecution + boolean forceExecution, + AdmissionControlActionType admissionControlActionType ) { super(); this.action = action; this.actualHandler = actualHandler; this.admissionControlService = admissionControlService; this.forceExecution = forceExecution; + this.admissionControlActionType = admissionControlActionType; } /** @@ -50,15 +54,16 @@ public AdmissionControlTransportHandler( */ @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { - // intercept all the transport requests here and apply admission control - try { - // TODO Need to evaluate if we need to apply admission control or not if force Execution is true will update in next PR. - this.admissionControlService.applyTransportAdmissionControl(this.action); - } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { - log.warn(openSearchRejectedExecutionException.getMessage()); - channel.sendResponse(openSearchRejectedExecutionException); - } catch (final Exception e) { - throw e; + // skip admission control if force execution is true + if (!this.forceExecution) { + // intercept the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + return; + } } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index 01cfcbd780006..ae1520bca769d 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -9,12 +9,13 @@ package org.opensearch.ratelimitting.admissioncontrol.transport; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; /** - * This class allows throttling to intercept requests on both the sender and the receiver side. + * This class allows throttling by intercepting requests on both the sender and the receiver side. */ public class AdmissionControlTransportInterceptor implements TransportInterceptor { @@ -33,8 +34,15 @@ public TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, - TransportRequestHandler actualHandler + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType ) { - return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution); + return new AdmissionControlTransportHandler<>( + action, + actualHandler, + this.admissionControlService, + forceExecution, + admissionControlActionType + ); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 9ee2db6d39893..e8efbeb7de3f9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.Writeable.Reader; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; /** * This interface allows plugins to intercept requests on both the sender and the receiver side. @@ -57,6 +58,19 @@ default TransportRequestHandler interceptHandler return actualHandler; } + /** + * This is called for handlers that needs admission control support + */ + default TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType + ) { + return interceptHandler(action, executor, forceExecution, actualHandler); + } + /** * This is called up-front providing the actual low level {@link AsyncSender} that performs the low level send request. * The returned sender is used to send all requests that come in via diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 5aeed72f306db..a1697b1898eeb 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -64,6 +64,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.telemetry.tracing.Span; @@ -1200,6 +1201,40 @@ public void registerRequestHandler( transport.registerRequestHandler(reg); } + /** + * Registers a new request handler with admission control support + * + * @param action The action the request handler is associated with + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @param requestReader The request class that will be used to construct new instances for streaming + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler( + String action, + String executor, + boolean forceExecution, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + Writeable.Reader requestReader, + TransportRequestHandler handler + ) { + validateActionName(action); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, + requestReader, + taskManager, + handler, + executor, + forceExecution, + canTripCircuitBreaker + ); + transport.registerRequestHandler(reg); + } + /** * called by the {@link Transport} implementation when an incoming request arrives but before * any parsing of it has happened (with the exception of the requestId and action) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 80f4ebf5d737a..b8ab5c935fa34 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -66,6 +66,11 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.test.OpenSearchTestCase; @@ -540,15 +545,44 @@ public void testSerialization() throws IOException { assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind()); assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag()); } + AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats(); + AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats(); + if (admissionControlStats == null) { + assertNull(deserializedAdmissionControlStats); + } else { + assertEquals( + admissionControlStats.getAdmissionControllerStatsList().size(), + deserializedAdmissionControlStats.getAdmissionControllerStatsList().size() + ); + AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); + AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats + .getAdmissionControllerStatsList() + .get(0); + assertEquals( + admissionControllerStats.getAdmissionControllerName(), + deserializedAdmissionControllerStats.getAdmissionControllerName() + ); + assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()) + ); + + assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()) + ); + } } } } - public static NodeStats createNodeStats() { + public static NodeStats createNodeStats() throws IOException { return createNodeStats(false); } - public static NodeStats createNodeStats(boolean remoteStoreStats) { + public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOException { DiscoveryNode node = new DiscoveryNode( "test_node", buildNewFakeTransportAddress(), @@ -862,6 +896,26 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { clusterManagerThrottlingStats = new ClusterManagerThrottlingStats(); clusterManagerThrottlingStats.onThrottle("test-task", randomInt()); } + + AdmissionControlStats admissionControlStats = null; + if (frequently()) { + AdmissionController admissionController = new AdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + null + ) { + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + return; + } + }; + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + AdmissionControllerStats stats = new AdmissionControllerStats(admissionController); + List statsList = new ArrayList(); + statsList.add(stats); + admissionControlStats = new AdmissionControlStats(statsList); + } ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null; WeightedRoutingStats weightedRoutingStats = null; @@ -899,7 +953,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { null, null, segmentReplicationRejectionStats, - null + null, + admissionControlStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index a0c45f95ef7c0..40a30342b86b9 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -87,7 +88,13 @@ public void testNetworkTypesToXContent() throws Exception { } public void testIngestStats() throws Exception { - NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); + NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, () -> { + try { + return NodeStatsTests.createNodeStats(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); SortedMap processorStats = new TreeMap<>(); nodeStats.getIngestStats().getProcessorStats().values().forEach(stats -> { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index f037b75dc16a3..ff47ec3015697 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -193,6 +193,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -222,6 +223,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -251,6 +253,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -311,6 +314,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -340,6 +344,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -369,6 +374,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index ab51cafb039c2..de4bdcac6c2b2 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -474,13 +474,29 @@ public List getTransportInterceptors( try { transportInterceptor.interceptHandler("foo/bar/boom", null, true, null); } catch (Exception e) { - assertEquals(0, called.get()); + assertEquals(1, called.get()); assertEquals(1, called1.get()); } + + coreTransportInterceptors = new ArrayList<>(); + coreTransportInterceptors.add(interceptor); + module = newNetworkModule(settings, coreTransportInterceptors, new NetworkPlugin() { + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + assertNotNull(threadContext); + return Collections.singletonList(interceptor1); + } + }); + + transportInterceptor = module.getTransportInterceptor(); + try { transportInterceptor.interceptHandler("foo/baz/boom", null, false, null); } catch (Exception e) { - assertEquals(0, called.get()); + assertEquals(1, called.get()); assertEquals(2, called1.get()); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index bac4eaf3fd677..7a67ffc8c7c5d 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -12,9 +12,10 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -46,13 +47,13 @@ public void tearDown() throws Exception { } public void testWhenAdmissionControllerRegistered() { - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); assertEquals(admissionControlService.getAdmissionControllers().size(), 1); } public void testRegisterInvalidAdmissionController() { String test = "TEST"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); assertEquals(admissionControlService.getAdmissionControllers().size(), 1); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -62,12 +63,12 @@ public void testRegisterInvalidAdmissionController() { } public void testAdmissionControllerSettings() { - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings; List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); - CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService - .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService + .getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals( admissionControlSettings.isTransportLayerAdmissionControlEnabled(), cpuBasedAdmissionController.isEnabledForTransportLayer( @@ -90,7 +91,7 @@ public void testAdmissionControllerSettings() { Settings newSettings = Settings.builder() .put(settings) .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) .build(); @@ -105,36 +106,53 @@ public void testAdmissionControllerSettings() { public void testApplyAdmissionControllerDisabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); - admissionControlService.applyTransportAdmissionControl(this.action); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); - admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); }); + admissionControllerList.forEach(admissionController -> { + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + }); } public void testApplyAdmissionControllerEnabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); - admissionControlService.applyTransportAdmissionControl(this.action); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount(), + admissionControlService.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0 ); Settings settings = Settings.builder() .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode() ) .build(); clusterService.getClusterSettings().applySettings(settings); - admissionControlService.applyTransportAdmissionControl(this.action); List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); + } + + public void testApplyAdmissionControllerEnforced() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount(), - 1 + admissionControlService.getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), + 0 ); + + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java new file mode 100644 index 0000000000000..a1694b2c3cee2 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.After; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +/** + * Single node integration tests for admission control + */ +public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @After + public void cleanup() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + } + + public void testAdmissionControlRejectionEnforcedMode() throws Exception { + ensureGreen(); + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Thread.sleep(700); + client().admin().indices().prepareCreate("index").execute().actionGet(); + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + // verify bulk request hits 429 + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request hits 429 + SearchRequest searchRequest = new SearchRequest("index"); + try { + client().search(searchRequest).actionGet(); + } catch (Exception e) { + assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success but admission control having rejections stats + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success but admission control having rejections stats + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionDisabledMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder().put(super.nodeSettings()).put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + + } + + public void testAdmissionControlWithinLimits() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java deleted file mode 100644 index af6ec0749e709..0000000000000 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.controllers; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; - -public class CPUBasedAdmissionControllerTests extends OpenSearchTestCase { - private ClusterService clusterService; - private ThreadPool threadPool; - CPUBasedAdmissionController admissionController = null; - - String action = "TEST_ACTION"; - - @Override - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool - ); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - threadPool.shutdownNow(); - } - - public void testCheckDefaultParameters() { - admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, - clusterService.getClusterSettings() - ); - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); - assertFalse( - admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) - ); - } - - public void testCheckUpdateSettings() { - admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, - clusterService.getClusterSettings() - ); - Settings settings = Settings.builder() - .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .build(); - clusterService.getClusterSettings().applySettings(settings); - - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); - assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); - } - - public void testApplyControllerWithDefaultSettings() { - admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, - clusterService.getClusterSettings() - ); - assertEquals(admissionController.getRejectionCount(), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); - action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action); - assertEquals(admissionController.getRejectionCount(), 0); - } - - public void testApplyControllerWhenSettingsEnabled() { - Settings settings = Settings.builder() - .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .build(); - admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - settings, - clusterService.getClusterSettings() - ); - assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); - assertEquals(admissionController.getRejectionCount(), 0); - action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action); - assertEquals(admissionController.getRejectionCount(), 1); - } -} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java new file mode 100644 index 0000000000000..e72c0cd58ed64 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionControllerTests.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import org.mockito.Mockito; + +public class CpuBasedAdmissionControllerTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + CpuBasedAdmissionController admissionController = null; + String action = "TEST_ACTION"; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testCheckDefaultParameters() { + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + clusterService, + Settings.EMPTY + ); + assertEquals(admissionController.getName(), CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertFalse( + admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); + } + + public void testCheckUpdateSettings() { + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + clusterService, + Settings.EMPTY + ); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + assertEquals(admissionController.getName(), CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + } + + public void testApplyControllerWithDefaultSettings() { + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + Settings.EMPTY + ); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + action = "indices:data/write/bulk[s][p]"; + admissionController.apply(action, AdmissionControlActionType.INDEXING); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + } + + public void testApplyControllerWhenSettingsEnabled() throws Exception { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + settings + ); + assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + assertTrue( + admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + // we can assert admission control and rejections as part of ITs + } + + public void testRejectionCount() { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, + clusterService, + settings + ); + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 3); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 1); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 3); + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 2); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 5); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java similarity index 53% rename from server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java rename to server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java index 02f582c26f54e..15a25e6cbca1c 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java @@ -10,18 +10,18 @@ import org.opensearch.test.OpenSearchTestCase; -public class TransportActionTypeTests extends OpenSearchTestCase { +public class AdmissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { - assertEquals(TransportActionType.SEARCH.getType(), "search"); - assertEquals(TransportActionType.INDEXING.getType(), "indexing"); - assertEquals(TransportActionType.fromName("search"), TransportActionType.SEARCH); - assertEquals(TransportActionType.fromName("indexing"), TransportActionType.INDEXING); + assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); + assertEquals(AdmissionControlActionType.INDEXING.getType(), "indexing"); + assertEquals(AdmissionControlActionType.fromName("search"), AdmissionControlActionType.SEARCH); + assertEquals(AdmissionControlActionType.fromName("indexing"), AdmissionControlActionType.INDEXING); } public void testInValidActionType() { String name = "test"; - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> TransportActionType.fromName(name)); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> AdmissionControlActionType.fromName(name)); assertEquals(ex.getMessage(), "Not Supported TransportAction Type: " + name); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java index 43103926a69a2..11688e2f30d4b 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -47,16 +47,16 @@ public void testSettingsExists() { "All the cpu based admission controller settings should be supported built in settings", settings.containsAll( Arrays.asList( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, - CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT ) ) ); } public void testDefaultSettings() { - CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( clusterService.getClusterSettings(), Settings.EMPTY ); @@ -64,7 +64,6 @@ public void testDefaultSettings() { assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); - assertEquals(cpuBasedAdmissionControllerSettings.getTransportActionsList(), Arrays.asList("indexing", "search")); } public void testGetConfiguredSettings() { @@ -72,13 +71,13 @@ public void testGetConfiguredSettings() { long indexingPercent = 85; Settings settings = Settings.builder() .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) - .put(CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .put(CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) .build(); - CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( clusterService.getClusterSettings(), settings ); @@ -90,16 +89,16 @@ public void testGetConfiguredSettings() { public void testUpdateAfterGetDefaultSettings() { long percent = 95; long searchPercent = 80; - CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( clusterService.getClusterSettings(), Settings.EMPTY ); Settings settings = Settings.builder() .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) - .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) .build(); clusterService.getClusterSettings().applySettings(settings); assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); @@ -113,13 +112,13 @@ public void testUpdateAfterGetConfiguredSettings() { long searchPercent = 80; Settings settings = Settings.builder() .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) - .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) .build(); - CPUBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CPUBasedAdmissionControllerSettings( + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( clusterService.getClusterSettings(), settings ); @@ -129,10 +128,10 @@ public void testUpdateAfterGetConfiguredSettings() { Settings updatedSettings = Settings.builder() .put( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode() ) - .put(CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) + .put(CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT.getKey(), indexingPercent) .build(); clusterService.getClusterSettings().applySettings(updatedSettings); assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); @@ -143,7 +142,7 @@ public void testUpdateAfterGetConfiguredSettings() { updatedSettings = Settings.builder() .put(updatedSettings) - .put(CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) + .put(CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT.getKey(), searchPercent) .build(); clusterService.getClusterSettings().applySettings(updatedSettings); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java new file mode 100644 index 0000000000000..7b4db5f787d6e --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStatsTests.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class AdmissionControlStatsTests extends OpenSearchTestCase { + AdmissionController admissionController; + AdmissionControllerStats admissionControllerStats; + AdmissionControlStats admissionControlStats; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + admissionController = new CpuBasedAdmissionController( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + mock(ResourceUsageCollectorService.class), + clusterService, + settings + ); + admissionControllerStats = new AdmissionControllerStats(admissionController); + List admissionControllerStats = new ArrayList<>(); + admissionControlStats = new AdmissionControlStats(admissionControllerStats); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testDefaults() throws IOException { + assertEquals(admissionControlStats.getAdmissionControllerStatsList().size(), 0); + } + + public void testRejectionCount() throws IOException { + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 11); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 1); + admissionControllerStats = new AdmissionControllerStats(admissionController); + admissionControlStats = new AdmissionControlStats(List.of(admissionControllerStats)); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + builder = admissionControlStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String response = builder.toString(); + assertEquals( + response, + "{\"admission_control\":{\"global_cpu_usage\":{\"transport\":{\"rejection_count\":{\"search\":11,\"indexing\":1}}}}}" + ); + AdmissionControlStats admissionControlStats1 = admissionControlStats; + assertEquals(admissionControlStats.hashCode(), admissionControlStats1.hashCode()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java new file mode 100644 index 0000000000000..fe0399e79a5f4 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStatsTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +public class AdmissionControllerStatsTests extends OpenSearchTestCase { + AdmissionController admissionController; + AdmissionControllerStats admissionControllerStats; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + admissionController = new CpuBasedAdmissionController("TEST", mock(ResourceUsageCollectorService.class), clusterService, settings); + admissionControllerStats = new AdmissionControllerStats(admissionController); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testDefaults() throws IOException { + assertEquals(admissionControllerStats.getRejectionCount().size(), 0); + assertEquals(admissionControllerStats.getAdmissionControllerName(), "TEST"); + } + + public void testRejectionCount() throws IOException { + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 11); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 1); + admissionControllerStats = new AdmissionControllerStats(admissionController); + long searchRejection = admissionControllerStats.getRejectionCount().getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L); + long indexingRejection = admissionControllerStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L); + assertEquals(searchRejection, 11); + assertEquals(indexingRejection, 1); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder = admissionControllerStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + String response = builder.toString(); + assertEquals(response, "{\"transport\":{\"rejection_count\":{\"search\":11,\"indexing\":1}}}"); + AdmissionControllerStats admissionControllerStats1 = admissionControllerStats; + assertEquals(admissionControllerStats.hashCode(), admissionControllerStats1.hashCode()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java index 03d4819a94045..0c95769e19489 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java @@ -29,7 +29,8 @@ public void testHandlerInvoked() throws Exception { action, handler, mock(AdmissionControlService.class), - false + false, + null ); admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); assertEquals(1, handler.count); @@ -38,33 +39,32 @@ public void testHandlerInvoked() throws Exception { public void testHandlerInvokedRejectedException() throws Exception { String action = "TEST"; AdmissionControlService admissionControlService = mock(AdmissionControlService.class); - doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action); + doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action, null); InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); admissionControlTransportHandler = new AdmissionControlTransportHandler( action, handler, admissionControlService, - false + false, + null ); - try { - admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); - } catch (OpenSearchRejectedExecutionException exception) { - assertEquals(0, handler.count); - handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); - } + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); assertEquals(1, handler.count); } public void testHandlerInvokedRandomException() throws Exception { String action = "TEST"; AdmissionControlService admissionControlService = mock(AdmissionControlService.class); - doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action); + doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action, null); InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); admissionControlTransportHandler = new AdmissionControlTransportHandler( action, handler, admissionControlService, - false + false, + null ); try { admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 2ba4de5e54a67..1ad6083074025 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -123,7 +123,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getTaskCancellationStats(), nodeStats.getSearchPipelineStats(), nodeStats.getSegmentReplicationRejectionStats(), - nodeStats.getRepositoriesStats() + nodeStats.getRepositoriesStats(), + nodeStats.getAdmissionControlStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 952cd6c085966..c2b964aa96212 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2735,6 +2735,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(