Skip to content

Commit

Permalink
Addressing Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Nov 17, 2023
1 parent 2b00dc8 commit deafb78
Show file tree
Hide file tree
Showing 22 changed files with 371 additions and 138 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024))
- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
Expand All @@ -45,8 +45,8 @@
import java.util.stream.Stream;

import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT;
import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testAdmissionControlEnforcedOnNonACEnabledActions() throws Execution
updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.ENFORCED.getMode()
)
);
Expand Down Expand Up @@ -159,7 +159,7 @@ public void testAdmissionControlRejectionOnMonitor() {
updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.MONITOR.getMode()
)
);
Expand Down Expand Up @@ -215,7 +215,7 @@ public void testAdmissionControlRejectionOnDisabled() {
updateSettingsRequest.transientSettings(
Settings.builder()
.put(
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(),
AdmissionControlMode.DISABLED.getMode()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public enum Metric {
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),

ADMISSION_CONTROL("admission_control");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(

/**
* Intercept the transport action and perform admission control if applicable
* @param action The action the request handler is associated with
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param actualHandler The handler itself that implements the request handling
* @param admissionControlActionType Admission control based on resource usage limits of provided action type
* @return returns the actual TransportRequestHandler after intercepting all previous handlers
* @param <T>
*/
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
import org.opensearch.persistent.decider.EnableAssignmentDecider;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -698,9 +698,10 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT,
IndicesService.CLUSTER_RESTRICT_INDEX_REPLICATION_TYPE_SETTING
)
)
);
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public class NodeService implements Closeable {
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;
private final RepositoriesService repositoriesService;
AdmissionControlService admissionControlService;

private final AdmissionControlService admissionControlService;
private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

NodeService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats;
Expand All @@ -25,20 +25,19 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER;
import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
*/
public class AdmissionControlService {
private final ThreadPool threadPool;
public final AdmissionControlSettings admissionControlSettings;
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;
private final ConcurrentMap<String, AdmissionController> admissionControllers;
private static final Logger logger = LogManager.getLogger(AdmissionControlService.class);
private final ClusterService clusterService;
private final Settings settings;

private ResourceUsageCollectorService resourceUsageCollectorService;
private final ResourceUsageCollectorService resourceUsageCollectorService;

/**
*
Expand All @@ -55,7 +54,7 @@ public AdmissionControlService(
) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
this.admissionControllers = new ConcurrentHashMap<>();
this.clusterService = clusterService;
this.settings = settings;
this.resourceUsageCollectorService = resourceUsageCollectorService;
Expand All @@ -76,7 +75,7 @@ private void initialise() {
* @param admissionControlActionType admissionControllerActionType value
*/
public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) {
this.ADMISSION_CONTROLLERS.forEach(
this.admissionControllers.forEach(
(name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }
);
}
Expand All @@ -87,7 +86,7 @@ public void applyTransportAdmissionControl(String action, AdmissionControlAction
*/
public void registerAdmissionController(String admissionControllerName) {
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
this.admissionControllers.put(admissionControllerName, admissionController);
}

/**
Expand All @@ -96,7 +95,7 @@ public void registerAdmissionController(String admissionControllerName) {
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(
return new CpuBasedAdmissionController(
admissionControllerName,
this.resourceUsageCollectorService,
this.clusterService,
Expand All @@ -112,7 +111,7 @@ private AdmissionController controllerFactory(String admissionControllerName) {
* @return list of the registered admissionControllers
*/
public List<AdmissionController> getAdmissionControllers() {
return new ArrayList<>(this.ADMISSION_CONTROLLERS.values());
return new ArrayList<>(this.admissionControllers.values());
}

/**
Expand All @@ -121,17 +120,17 @@ public List<AdmissionController> getAdmissionControllers() {
* @return instance of the AdmissionController Instance
*/
public AdmissionController getAdmissionController(String controllerName) {
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
return this.admissionControllers.getOrDefault(controllerName, null);
}

/**
* Return admission control stats
*/
public AdmissionControlStats stats() {
List<AdmissionControllerStats> statsList = new ArrayList<>();
if (this.ADMISSION_CONTROLLERS.size() > 0) {
this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName);
if (this.admissionControllers.size() > 0) {
this.admissionControllers.forEach((controllerName, admissionController) -> {
AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController);
statsList.add(admissionControllerStats);
});
return new AdmissionControlStats(statsList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public final class AdmissionControlSettings {
* Default parameters for the AdmissionControlSettings
*/
public static class Defaults {
public static final String MODE = "enforced";
public static final String MODE = "disabled";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ public String getName() {
* Add rejection count to the rejection count metric tracked by the admission controller
*/
public void addRejectionCount(String admissionControlActionType, long count) {
AtomicLong updatedCount = new AtomicLong(0);
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get());
if (!this.rejectionCountMap.containsKey(admissionControlActionType)) {
this.rejectionCountMap.put(admissionControlActionType, new AtomicLong(0));
}
updatedCount.addAndGet(count);
this.rejectionCountMap.put(admissionControlActionType, updatedCount);
this.rejectionCountMap.get(admissionControlActionType).getAndAdd(count);
}

/**
* @return current value of the rejection count metric tracked by the admission-controller.
*/
public long getRejectionCount(String admissionControlActionType) {
AtomicLong rejectionCount = this.rejectionCountMap.getOrDefault(admissionControlActionType, new AtomicLong());
return rejectionCount.get();
if (this.rejectionCountMap.containsKey(admissionControlActionType)) {
return this.rejectionCountMap.get(admissionControlActionType).get();
}
return 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,34 @@
import org.opensearch.node.NodeResourceUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings;

import java.util.Locale;
import java.util.Optional;

/**
* Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control.
* It provides methods to apply admission control if configured limit has been reached
*/
public class CPUBasedAdmissionController extends AdmissionController {
public class CpuBasedAdmissionController extends AdmissionController {
public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage";
private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class);
public CPUBasedAdmissionControllerSettings settings;
private static final Logger LOGGER = LogManager.getLogger(CpuBasedAdmissionController.class);
public CpuBasedAdmissionControllerSettings settings;

/**
* @param admissionControllerName Name of the admission controller
* @param resourceUsageCollectorService Instance used to get node resource usage stats
* @param clusterService ClusterService Instance
* @param settings Immutable settings instance
*/
public CPUBasedAdmissionController(
public CpuBasedAdmissionController(
String admissionControllerName,
ResourceUsageCollectorService resourceUsageCollectorService,
ClusterService clusterService,
Settings settings
) {
super(admissionControllerName, resourceUsageCollectorService, clusterService);
this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
this.settings = new CpuBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings);
}

/**
Expand All @@ -64,7 +65,11 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp
this.addRejectionCount(admissionControlActionType.getType(), 1);
if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) {
throw new OpenSearchRejectedExecutionException(
String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name())
String.format(
Locale.ROOT,
"CPU usage admission controller rejected the request for action [%s] as CPU limit reached",
admissionControlActionType.name()
)
);
}
}
Expand All @@ -84,11 +89,12 @@ private boolean isLimitsBreached(String actionName, AdmissionControlActionType a
double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent();
if (cpuUsage >= maxCpuLimit) {
LOGGER.warn(
"CpuBasedAdmissionController rejected the request as the current CPU "
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}]",
"CpuBasedAdmissionController limit reached as the current CPU "
+ "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]",
cpuUsage,
maxCpuLimit,
actionName
actionName,
this.settings.getTransportLayerAdmissionControllerMode()
);
return true;
}
Expand All @@ -109,6 +115,7 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro
default:
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Admission control not Supported for AdmissionControlActionType: %s",
admissionControlActionType.getType()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
* Settings related to cpu based admission controller.
* @opensearch.internal
*/
public class CPUBasedAdmissionControllerSettings {
public class CpuBasedAdmissionControllerSettings {

/**
* Default parameters for the CPUBasedAdmissionControllerSettings
* Default parameters for the CpuBasedAdmissionControllerSettings
*/
public static class Defaults {
public static final long CPU_USAGE_LIMIT = 0;
public static final long CPU_USAGE_LIMIT = 95;
}

private AdmissionControlMode transportLayerMode;
Expand Down Expand Up @@ -63,7 +63,7 @@ public static class Defaults {
);

// currently limited to one setting will add further more settings in follow-up PR's
public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) {
public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) {
this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings);
clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode);
this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings);
Expand Down
Loading

0 comments on commit deafb78

Please sign in to comment.