Skip to content

Commit

Permalink
Revert TM resource based task scheduling issues (#189529)
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 authored Jul 30, 2024
1 parent 1f00087 commit 3245909
Show file tree
Hide file tree
Showing 70 changed files with 1,524 additions and 4,986 deletions.
33 changes: 0 additions & 33 deletions x-pack/plugins/alerting/server/rule_type_registry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,39 +564,6 @@ describe('Create Lifecycle', () => {
});
});

test('injects custom cost for certain rule types', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
id: 'siem.indicatorRule',
name: 'Test',
actionGroups: [
{
id: 'default',
name: 'Default',
},
],
defaultActionGroupId: 'default',
minimumLicenseRequired: 'basic',
isExportable: true,
executor: jest.fn(),
category: 'test',
producer: 'alerts',
ruleTaskTimeout: '20m',
validate: {
params: { validate: (params) => params },
},
};
const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
registry.register(ruleType);
expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(taskManager.registerTaskDefinitions.mock.calls[0][0]).toMatchObject({
'alerting:siem.indicatorRule': {
timeout: '20m',
title: 'Test',
cost: 10,
},
});
});

test('shallow clones the given rule type', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
id: 'test',
Expand Down
7 changes: 0 additions & 7 deletions x-pack/plugins/alerting/server/rule_type_registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { Logger } from '@kbn/core/server';
import { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { stateSchemaByVersion } from '@kbn/alerting-state-types';
import { TaskCost } from '@kbn/task-manager-plugin/server/task';
import { TaskRunnerFactory } from './task_runner';
import {
RuleType,
Expand All @@ -41,9 +40,6 @@ import { AlertsService } from './alerts_service/alerts_service';
import { getRuleTypeIdValidLegacyConsumers } from './rule_type_registry_deprecated_consumers';
import { AlertingConfig } from './config';

const RULE_TYPES_WITH_CUSTOM_COST: Record<string, TaskCost> = {
'siem.indicatorRule': TaskCost.ExtraLarge,
};
export interface ConstructorOptions {
config: AlertingConfig;
logger: Logger;
Expand Down Expand Up @@ -293,8 +289,6 @@ export class RuleTypeRegistry {
normalizedRuleType as unknown as UntypedNormalizedRuleType
);

const taskCost: TaskCost | undefined = RULE_TYPES_WITH_CUSTOM_COST[ruleType.id];

this.taskManager.registerTaskDefinitions({
[`alerting:${ruleType.id}`]: {
title: ruleType.name,
Expand All @@ -316,7 +310,6 @@ export class RuleTypeRegistry {
spaceId: schema.string(),
consumer: schema.maybe(schema.string()),
}),
...(taskCost ? { cost: taskCost } : {}),
},
});

Expand Down
2 changes: 0 additions & 2 deletions x-pack/plugins/task_manager/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
"task_manager"
],
"optionalPlugins": [
"cloud",
"serverless",
"usageCollection"
]
}
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/task_manager/server/MONITORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The root `timestamp` is the time in which the summary was exposed (either to the
Follow this step-by-step guide to make sense of the stats: https://www.elastic.co/guide/en/kibana/master/task-manager-troubleshooting.html#task-manager-diagnosing-root-cause

#### The Configuration Section
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `capacity` which adjust in reaction to changing load on the system.
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `max_workers` which adjust in reaction to changing load on the system.

These are "Hot" stats which are updated whenever a change happens in the configuration.

Expand All @@ -69,8 +69,8 @@ The `runtime` tracks Task Manager's performance as it runs, making note of task
These include:
- The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after a task's scheduled a task typically executes.
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of capacity is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of capacity`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The `Success | Retry | Failure ratio` by task type. This is different than the workload stats which tell you what's in the queue, but ca't keep track of retries and of non recurring tasks as they're wiped off the index when completed.

These are "Hot" stats which are updated reactively as Tasks are executed and interacted with.
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down Expand Up @@ -80,6 +81,7 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down Expand Up @@ -135,6 +137,7 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
Expand Down
16 changes: 5 additions & 11 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
import { schema, TypeOf } from '@kbn/config-schema';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
export const MAX_CAPACITY = 50;
export const MIN_CAPACITY = 5;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
Expand Down Expand Up @@ -67,8 +64,6 @@ const requestTimeoutsConfig = schema.object({
export const configSchema = schema.object(
{
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
/* The number of normal cost tasks that this Kibana instance will run simultaneously */
capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
ephemeral_tasks: schema.object({
enabled: schema.boolean({ defaultValue: false }),
/* How many requests can Task Manager buffer before it rejects new requests. */
Expand All @@ -86,12 +81,11 @@ export const configSchema = schema.object(
min: 1,
}),
/* The maximum number of tasks that this Kibana instance will run simultaneously. */
max_workers: schema.maybe(
schema.number({
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
})
),
max_workers: schema.number({
defaultValue: DEFAULT_MAX_WORKERS,
// disable the task manager rather than trying to specify it with 0 workers
min: 1,
}),
/* The interval at which monotonically increasing metrics counters will reset */
metrics_reset_interval: schema.number({
defaultValue: DEFAULT_METRICS_RESET_INTERVAL,
Expand Down
29 changes: 15 additions & 14 deletions x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool/task_pool.mock';
import { TaskPoolMock } from './task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';

Expand All @@ -45,6 +45,7 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
Expand Down Expand Up @@ -155,7 +156,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableCapacity: 10,
availableWorkers: 10,
});

lifecycleEvent$.next(
Expand All @@ -178,7 +179,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableCapacity: 10,
availableWorkers: 10,
});

lifecycleEvent$.next(
Expand Down Expand Up @@ -215,7 +216,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));

poolCapacity.mockReturnValue({
availableCapacity: 2,
availableWorkers: 2,
});

lifecycleEvent$.next(
Expand Down Expand Up @@ -255,9 +256,9 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity for both
poolCapacity.mockReturnValue({
availableCapacity: 10,
availableWorkers: 10,
});
pool.getUsedCapacityByType.mockReturnValue(0);
pool.getOccupiedWorkersByType.mockReturnValue(0);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
Expand Down Expand Up @@ -295,10 +296,10 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity in general
poolCapacity.mockReturnValue({
availableCapacity: 2,
availableWorkers: 2,
});
// but when we ask how many it has occupied by type - wee always have one worker already occupied by that type
pool.getUsedCapacityByType.mockReturnValue(1);
pool.getOccupiedWorkersByType.mockReturnValue(1);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
Expand All @@ -307,7 +308,7 @@ describe('EphemeralTaskLifecycle', () => {
expect(pool.run).toHaveBeenCalledTimes(0);

// now we release the worker in the pool and cause another cycle in the epheemral queue
pool.getUsedCapacityByType.mockReturnValue(0);
pool.getOccupiedWorkersByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
Expand Down Expand Up @@ -355,9 +356,9 @@ describe('EphemeralTaskLifecycle', () => {

// pool has capacity for all
poolCapacity.mockReturnValue({
availableCapacity: 10,
availableWorkers: 10,
});
pool.getUsedCapacityByType.mockReturnValue(0);
pool.getOccupiedWorkersByType.mockReturnValue(0);

lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));

Expand Down Expand Up @@ -388,19 +389,19 @@ describe('EphemeralTaskLifecycle', () => {

expect(ephemeralTaskLifecycle.queuedTasks).toBe(3);
poolCapacity.mockReturnValue({
availableCapacity: 1,
availableWorkers: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(2);

poolCapacity.mockReturnValue({
availableCapacity: 1,
availableWorkers: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(1);

poolCapacity.mockReturnValue({
availableCapacity: 1,
availableWorkers: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ export class EphemeralTaskLifecycle {
taskType && this.definitions.get(taskType)?.maxConcurrency
? Math.max(
Math.min(
this.pool.availableCapacity(),
this.pool.availableWorkers,
this.definitions.get(taskType)!.maxConcurrency! -
this.pool.getUsedCapacityByType(taskType)
this.pool.getOccupiedWorkersByType(taskType)
),
0
)
: this.pool.availableCapacity();
: this.pool.availableWorkers;

private emitEvent = (event: TaskLifecycleEvent) => {
this.events$.next(event);
Expand Down
7 changes: 3 additions & 4 deletions x-pack/plugins/task_manager/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ export type {

export const config: PluginConfigDescriptor<TaskManagerConfig> = {
schema: configSchema,
exposeToUsage: {
max_workers: true,
},
deprecations: ({ deprecate }) => {
return [
deprecate('ephemeral_tasks.enabled', 'a future version', {
Expand All @@ -65,10 +68,6 @@ export const config: PluginConfigDescriptor<TaskManagerConfig> = {
level: 'warning',
message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
}),
deprecate('max_workers', 'a future version', {
level: 'warning',
message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`,
}),
(settings, fromPath, addDeprecation) => {
const taskManager = get(settings, fromPath);
if (taskManager?.index) {
Expand Down

This file was deleted.

Loading

0 comments on commit 3245909

Please sign in to comment.