Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wlm resiliency orchestrator (query group service) #15925

Merged
merged 54 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
a720136
cancellation related
kiranprakash154 Aug 7, 2024
83e20c0
Update CHANGELOG.md
kiranprakash154 Aug 7, 2024
9983c73
add better cancellation reason
kiranprakash154 Aug 7, 2024
245ee5d
Update DefaultTaskCancellationTests.java
kiranprakash154 Aug 7, 2024
0771fd2
refactor
kiranprakash154 Aug 21, 2024
4b1ef81
refactor
kiranprakash154 Aug 26, 2024
3ea44d7
Update DefaultTaskCancellation.java
kiranprakash154 Aug 27, 2024
0103089
Update DefaultTaskCancellation.java
kiranprakash154 Aug 29, 2024
092d715
Update DefaultTaskCancellation.java
kiranprakash154 Aug 29, 2024
4a2c51e
Update DefaultTaskSelectionStrategy.java
kiranprakash154 Aug 29, 2024
cbb51bd
refactor
kiranprakash154 Aug 29, 2024
4e846e2
refactor node level threshold
kiranprakash154 Aug 29, 2024
241b036
Merge branch 'main' into kp/wlm-cancellation-1
kaushalmahi12 Aug 30, 2024
7511d99
use query group task
kaushalmahi12 Aug 30, 2024
498743a
code clean up and refactorings
kaushalmahi12 Sep 3, 2024
e26e525
add unit tests and fix existing ones
kaushalmahi12 Sep 4, 2024
0ff2b09
uncomment the test case
kaushalmahi12 Sep 4, 2024
ddb8dce
update CHANGELOG
kaushalmahi12 Sep 4, 2024
3528054
Merge branch 'main' into feature/wlm-cancellation
kaushalmahi12 Sep 4, 2024
e8366a5
fix imports
kaushalmahi12 Sep 4, 2024
d2d02e3
add queryGroupService
kaushalmahi12 Sep 4, 2024
448ea41
refactor and add UTs for new constructs
kaushalmahi12 Sep 5, 2024
3fc21be
fix javadocs
kaushalmahi12 Sep 5, 2024
fe02a6a
remove code clutter
kaushalmahi12 Sep 6, 2024
8aede33
change annotation version and task selection strategy
kaushalmahi12 Sep 6, 2024
623f6f8
rename a util class
kaushalmahi12 Sep 6, 2024
9e2e3ea
remove wrappers from resource type
kaushalmahi12 Sep 6, 2024
34184ef
apply spotless
kaushalmahi12 Sep 6, 2024
91893e7
address comments
kaushalmahi12 Sep 9, 2024
66e43b2
add rename changes
kaushalmahi12 Sep 9, 2024
a6b1afd
Merge branch 'main' into feature/wlm-cancellation
kaushalmahi12 Sep 9, 2024
981b15f
address comments
kaushalmahi12 Sep 9, 2024
16b6dff
Merge branch 'feature/wlm-cancellation' into feature/wlm-service
kaushalmahi12 Sep 10, 2024
0be1023
initial changes
kaushalmahi12 Sep 10, 2024
caf5914
refactor changes and logical bug fix
kaushalmahi12 Sep 10, 2024
6381e17
add chanegs
kaushalmahi12 Sep 11, 2024
b78ca02
address comments
kaushalmahi12 Sep 11, 2024
66e18b9
Merge branch 'feature/wlm-cancellation' into feature/wlm-service
kaushalmahi12 Sep 11, 2024
bec1ece
temp changes
kaushalmahi12 Sep 11, 2024
1e76ede
add UTs
kaushalmahi12 Sep 13, 2024
7c5a48e
Merge branch 'main' into feature/wlm-service
kaushalmahi12 Sep 13, 2024
931f6bf
add changelog
kaushalmahi12 Sep 13, 2024
3f4d590
add task completion listener hook
kaushalmahi12 Sep 13, 2024
501e5e9
add remaining pieces to make the feature functional
kaushalmahi12 Sep 13, 2024
2292fcd
extend stats and fix bugs
kaushalmahi12 Sep 16, 2024
139404a
fix bugs and add logic to make SBP work with wlm
kaushalmahi12 Sep 19, 2024
56c3393
address comments
kaushalmahi12 Oct 1, 2024
baa39c5
fix bugs and SBP ITs
kaushalmahi12 Oct 1, 2024
3c6adae
add missed applyCluster state change
kaushalmahi12 Oct 1, 2024
040ed4c
Merge branch 'main' into feature/wlm-service
kaushalmahi12 Oct 7, 2024
862af7b
address comments
kaushalmahi12 Oct 7, 2024
e2b203c
decouple queryGroupService and cancellationService
kaushalmahi12 Oct 7, 2024
623078a
replace StateApplier with StateListener interface
kaushalmahi12 Oct 7, 2024
2d7316b
fix precommit errors
kaushalmahi12 Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- [Workload Management] Add orchestrator for wlm resiliency (QueryGroupService) ([#15925](https://github.com/opensearch-project/OpenSearch/pull/15925))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupTask;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -411,6 +412,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> {
try {
CancellableTask cancellableTask = (CancellableTask) task;
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
long startTime = System.nanoTime();

// Doing a busy-wait until task cancellation or timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -565,7 +566,10 @@ public ActionModule(
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(QueryGroupTask.QUERY_GROUP_ID_HEADER, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,9 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.WLM_MODE_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_DURESS_STREAK_SETTING,

// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
Expand Down
39 changes: 35 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,13 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupsStateAccessor;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.cancellation.MaximumResourceTaskSelectionStrategy;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1022,8 +1027,30 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
final WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(
settings,
settingsModule.getClusterSettings()
);

final QueryGroupsStateAccessor queryGroupsStateAccessor = new QueryGroupsStateAccessor();

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService,
queryGroupsStateAccessor
),
clusterService,
threadPool,
workloadManagementSettings,
queryGroupsStateAccessor
);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
Expand Down Expand Up @@ -1089,7 +1116,7 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
queryGroupService
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down Expand Up @@ -1184,7 +1211,8 @@ protected Node(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool,
transportService.getTaskManager()
transportService.getTaskManager(),
queryGroupService
);

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
Expand Down Expand Up @@ -1396,6 +1424,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(QueryGroupService.class).toInstance(queryGroupService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
Expand Down Expand Up @@ -1589,6 +1618,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
injector.getInstance(QueryGroupService.class).start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1762,6 +1792,7 @@ private Node stop() {
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodeResourceUsageTracker.class).stop();
injector.getInstance(ResourceUsageCollectorService.class).stop();
injector.getInstance(QueryGroupService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.ResourceType;

import java.io.IOException;
Expand Down Expand Up @@ -86,12 +87,14 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem

private final Map<Class<? extends SearchBackpressureTask>, SearchBackpressureState> searchBackpressureStates;
private final TaskManager taskManager;
private final QueryGroupService queryGroupService;

public SearchBackpressureService(
SearchBackpressureSettings settings,
TaskResourceTrackingService taskResourceTrackingService,
ThreadPool threadPool,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
Expand Down Expand Up @@ -131,7 +134,8 @@ public SearchBackpressureService(
settings.getClusterSettings(),
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE
),
taskManager
taskManager,
queryGroupService
);
}

Expand All @@ -143,14 +147,16 @@ public SearchBackpressureService(
NodeDuressTrackers nodeDuressTrackers,
TaskResourceUsageTrackers searchTaskTrackers,
TaskResourceUsageTrackers searchShardTaskTrackers,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this.settings = settings;
this.taskResourceTrackingService = taskResourceTrackingService;
this.taskResourceTrackingService.addTaskCompletionListener(this);
this.threadPool = threadPool;
this.nodeDuressTrackers = nodeDuressTrackers;
this.taskManager = taskManager;
this.queryGroupService = queryGroupService;

this.searchBackpressureStates = Map.of(
SearchTask.class,
Expand Down Expand Up @@ -346,6 +352,7 @@ <T extends CancellableTask & SearchBackpressureTask> List<CancellableTask> getTa
.stream()
.filter(type::isInstance)
.map(type::cast)
.filter(queryGroupService::shouldSBPHandle)
.collect(Collectors.toUnmodifiableList());
}

Expand Down
Loading
Loading