Skip to content

Commit

Permalink
Add search backpressure cancellation at the coordinator level (#5605)
Browse files Browse the repository at this point in the history
* Cancellation of in-flight search requests at coordinator level

Signed-off-by: PritLadani <pritkladani@gmail.com>
  • Loading branch information
PritLadani authored Feb 5, 2023
1 parent e42b76f commit 74912d2
Show file tree
Hide file tree
Showing 29 changed files with 1,639 additions and 438 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603))
- Adds support for minimum compatible version for extensions ([#6003](https://github.com/opensearch-project/OpenSearch/pull/6003))
- Add a guardrail to limit maximum number of shard on the cluster ([#6143](https://github.com/opensearch-project/OpenSearch/pull/6143))
- Add cancellation of in-flight SearchTasks based on resource consumption ([#5606](https://github.com/opensearch-project/OpenSearch/pull/5605))

### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
import org.opensearch.tasks.TaskId;

import java.util.Map;
Expand All @@ -47,7 +48,7 @@
*
* @opensearch.internal
*/
public class SearchShardTask extends CancellableTask {
public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier<String> metadataSupplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
import org.opensearch.tasks.TaskId;

import java.util.Map;
Expand All @@ -46,7 +47,7 @@
*
* @opensearch.internal
*/
public class SearchTask extends CancellableTask {
public class SearchTask extends CancellableTask implements SearchBackpressureTask {
// generating description in a lazy way since source can be quite big
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -600,18 +598,31 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_MODE,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,

NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchTaskSettings.SETTING_CANCELLATION_RATIO,
SearchTaskSettings.SETTING_CANCELLATION_RATE,
SearchTaskSettings.SETTING_CANCELLATION_BURST,
SearchTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD,
SearchTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD,
SearchTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD,
SearchTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_CANCELLATION_RATIO,
SearchShardTaskSettings.SETTING_CANCELLATION_RATE,
SearchShardTaskSettings.SETTING_CANCELLATION_BURST,
SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD,
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO, // deprecated
SearchBackpressureSettings.SETTING_CANCELLATION_RATE, // deprecated
SearchBackpressureSettings.SETTING_CANCELLATION_BURST // deprecated
)
)
);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,8 @@ protected Node(
final SearchBackpressureService searchBackpressureService = new SearchBackpressureService(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool
threadPool,
transportService.getTaskManager()
);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure;

/**
* Listener for callbacks related to cancellation settings
*/
public interface CancellationSettingsListener {

void onRatioChanged(double ratio);

void onRateChanged(double rate);

void onBurstChanged(double burst);
}
Loading

0 comments on commit 74912d2

Please sign in to comment.