Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add wlm resiliency orchestrator (query group service) (#15925) #16225

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Workload Management] Add orchestrator for wlm resiliency (QueryGroupService) ([#15925](https://github.com/opensearch-project/OpenSearch/pull/15925))
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupTask;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -411,6 +412,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
threadPool.executor(ThreadPool.Names.SEARCH).execute(() -> {
try {
CancellableTask cancellableTask = (CancellableTask) task;
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
long startTime = System.nanoTime();

// Doing a busy-wait until task cancellation or timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
import org.opensearch.wlm.QueryGroupTask;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -552,7 +553,10 @@ public ActionModule(
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(new RestHeaderDefinition(Task.X_OPAQUE_ID, false))
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(QueryGroupTask.QUERY_GROUP_ID_HEADER, false)
)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,9 @@ public void apply(Settings value, Settings current, Settings previous) {
WorkloadManagementSettings.NODE_LEVEL_CPU_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_REJECTION_THRESHOLD,
WorkloadManagementSettings.NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD,
WorkloadManagementSettings.WLM_MODE_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING,
WorkloadManagementSettings.QUERYGROUP_SERVICE_DURESS_STREAK_SETTING,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
39 changes: 35 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,13 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupsStateAccessor;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.cancellation.MaximumResourceTaskSelectionStrategy;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1019,8 +1024,30 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
final WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(
settings,
settingsModule.getClusterSettings()
);

final QueryGroupsStateAccessor queryGroupsStateAccessor = new QueryGroupsStateAccessor();

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService,
queryGroupsStateAccessor
),
clusterService,
threadPool,
workloadManagementSettings,
queryGroupsStateAccessor
);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
Expand Down Expand Up @@ -1086,7 +1113,7 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
queryGroupService
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down Expand Up @@ -1180,7 +1207,8 @@ protected Node(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool,
transportService.getTaskManager()
transportService.getTaskManager(),
queryGroupService
);

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
Expand Down Expand Up @@ -1392,6 +1420,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(QueryGroupService.class).toInstance(queryGroupService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
Expand Down Expand Up @@ -1583,6 +1612,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
injector.getInstance(QueryGroupService.class).start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1756,6 +1786,7 @@ private Node stop() {
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodeResourceUsageTracker.class).stop();
injector.getInstance(ResourceUsageCollectorService.class).stop();
injector.getInstance(QueryGroupService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.tasks.TaskResourceTrackingService.TaskCompletionListener;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.ResourceType;

import java.io.IOException;
Expand Down Expand Up @@ -86,12 +87,14 @@ public class SearchBackpressureService extends AbstractLifecycleComponent implem

private final Map<Class<? extends SearchBackpressureTask>, SearchBackpressureState> searchBackpressureStates;
private final TaskManager taskManager;
private final QueryGroupService queryGroupService;

public SearchBackpressureService(
SearchBackpressureSettings settings,
TaskResourceTrackingService taskResourceTrackingService,
ThreadPool threadPool,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this(settings, taskResourceTrackingService, threadPool, System::nanoTime, new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
Expand Down Expand Up @@ -131,7 +134,8 @@ public SearchBackpressureService(
settings.getClusterSettings(),
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE
),
taskManager
taskManager,
queryGroupService
);
}

Expand All @@ -143,14 +147,16 @@ public SearchBackpressureService(
NodeDuressTrackers nodeDuressTrackers,
TaskResourceUsageTrackers searchTaskTrackers,
TaskResourceUsageTrackers searchShardTaskTrackers,
TaskManager taskManager
TaskManager taskManager,
QueryGroupService queryGroupService
) {
this.settings = settings;
this.taskResourceTrackingService = taskResourceTrackingService;
this.taskResourceTrackingService.addTaskCompletionListener(this);
this.threadPool = threadPool;
this.nodeDuressTrackers = nodeDuressTrackers;
this.taskManager = taskManager;
this.queryGroupService = queryGroupService;

this.searchBackpressureStates = Map.of(
SearchTask.class,
Expand Down Expand Up @@ -346,6 +352,7 @@ <T extends CancellableTask & SearchBackpressureTask> List<CancellableTask> getTa
.stream()
.filter(type::isInstance)
.map(type::cast)
.filter(queryGroupService::shouldSBPHandle)
.collect(Collectors.toUnmodifiableList());
}

Expand Down
Loading
Loading