diff --git a/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts b/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts index a455872d23702..b35b40b52f7f7 100644 --- a/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts +++ b/x-pack/plugins/alerts/server/alerts_client/alerts_client.ts @@ -253,7 +253,11 @@ export class AlertsClient { if (data.enabled) { let scheduledTask; try { - scheduledTask = await this.scheduleAlert(createdAlert.id, rawAlert.alertTypeId); + scheduledTask = await this.scheduleAlert( + createdAlert.id, + rawAlert.alertTypeId, + data.schedule + ); } catch (e) { // Cleanup data, something went wrong scheduling the task try { @@ -697,7 +701,11 @@ export class AlertsClient { this.invalidateApiKey({ apiKey: updateAttributes.apiKey }); throw e; } - const scheduledTask = await this.scheduleAlert(id, attributes.alertTypeId); + const scheduledTask = await this.scheduleAlert( + id, + attributes.alertTypeId, + attributes.schedule as IntervalSchedule + ); await this.unsecuredSavedObjectsClient.update('alert', id, { scheduledTaskId: scheduledTask.id, }); @@ -933,9 +941,10 @@ export class AlertsClient { ]); } - private async scheduleAlert(id: string, alertTypeId: string) { + private async scheduleAlert(id: string, alertTypeId: string, schedule: IntervalSchedule) { return await this.taskManager.schedule({ taskType: `alerting:${alertTypeId}`, + schedule, params: { alertId: id, spaceId: this.spaceId, diff --git a/x-pack/plugins/alerts/server/alerts_client/tests/create.test.ts b/x-pack/plugins/alerts/server/alerts_client/tests/create.test.ts index bce1af203fb0e..965ea1949bf3a 100644 --- a/x-pack/plugins/alerts/server/alerts_client/tests/create.test.ts +++ b/x-pack/plugins/alerts/server/alerts_client/tests/create.test.ts @@ -353,6 +353,9 @@ describe('create()', () => { "alertId": "1", "spaceId": "default", }, + "schedule": Object { + "interval": "10s", + }, "scope": Array [ "alerting", ], diff --git a/x-pack/plugins/alerts/server/alerts_client/tests/enable.test.ts b/x-pack/plugins/alerts/server/alerts_client/tests/enable.test.ts index f098bbcad8d05..215493c71aec7 100644 --- a/x-pack/plugins/alerts/server/alerts_client/tests/enable.test.ts +++ b/x-pack/plugins/alerts/server/alerts_client/tests/enable.test.ts @@ -201,6 +201,9 @@ describe('enable()', () => { alertId: '1', spaceId: 'default', }, + schedule: { + interval: '10s', + }, state: { alertInstances: {}, alertTypeState: {}, diff --git a/x-pack/plugins/alerts/server/task_runner/get_next_run_at.test.ts b/x-pack/plugins/alerts/server/task_runner/get_next_run_at.test.ts deleted file mode 100644 index f5914fdf01a16..0000000000000 --- a/x-pack/plugins/alerts/server/task_runner/get_next_run_at.test.ts +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { getNextRunAt } from './get_next_run_at'; - -const mockedNow = new Date('2019-06-03T18:55:25.982Z'); -// eslint-disable-next-line @typescript-eslint/no-explicit-any -(global as any).Date = class Date extends global.Date { - static now() { - return mockedNow.getTime(); - } -}; - -test('Adds interface to given date when result is > Date.now()', () => { - const currentRunAt = new Date('2019-06-03T18:55:23.982Z'); - const result = getNextRunAt(currentRunAt, { interval: '10s' }); - expect(result).toEqual(new Date('2019-06-03T18:55:33.982Z')); -}); - -test('Uses Date.now() when the result would of been a date in the past', () => { - const currentRunAt = new Date('2019-06-03T18:55:13.982Z'); - const result = getNextRunAt(currentRunAt, { interval: '10s' }); - expect(result).toEqual(mockedNow); -}); diff --git a/x-pack/plugins/alerts/server/task_runner/get_next_run_at.ts b/x-pack/plugins/alerts/server/task_runner/get_next_run_at.ts deleted file mode 100644 index cea4584e1f713..0000000000000 --- a/x-pack/plugins/alerts/server/task_runner/get_next_run_at.ts +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { parseDuration } from '../lib'; -import { IntervalSchedule } from '../types'; - -export function getNextRunAt(currentRunAt: Date, schedule: IntervalSchedule) { - let nextRunAt = currentRunAt.getTime() + parseDuration(schedule.interval); - if (nextRunAt < Date.now()) { - // To prevent returning dates in the past, we'll return now instead - nextRunAt = Date.now(); - } - return new Date(nextRunAt); -} diff --git a/x-pack/plugins/alerts/server/task_runner/task_runner.test.ts b/x-pack/plugins/alerts/server/task_runner/task_runner.test.ts index 8e345d6ff66a8..86e78dea66a09 100644 --- a/x-pack/plugins/alerts/server/task_runner/task_runner.test.ts +++ b/x-pack/plugins/alerts/server/task_runner/task_runner.test.ts @@ -7,7 +7,11 @@ import sinon from 'sinon'; import { schema } from '@kbn/config-schema'; import { AlertExecutorOptions } from '../types'; -import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server'; +import { + ConcreteTaskInstance, + isUnrecoverableError, + TaskStatus, +} from '../../../task_manager/server'; import { TaskRunnerContext } from './task_runner_factory'; import { TaskRunner } from './task_runner'; import { encryptedSavedObjectsMock } from '../../../encrypted_saved_objects/server/mocks'; @@ -22,6 +26,7 @@ import { eventLoggerMock } from '../../../event_log/server/event_logger.mock'; import { IEventLogger } from '../../../event_log/server'; import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; import { Alert } from '../../common'; +import { omit } from 'lodash'; const alertType = { id: 'test', name: 'My test alert', @@ -43,6 +48,7 @@ describe('Task Runner', () => { status: TaskStatus.Running, version: '123', runAt: new Date(), + schedule: { interval: '10s' }, scheduledAt: new Date(), startedAt: new Date(), retryAt: new Date(Date.now() + 5 * 60 * 1000), @@ -134,8 +140,8 @@ describe('Task Runner', () => { }, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -146,7 +152,9 @@ describe('Task Runner', () => { const runnerResult = await taskRunner.run(); expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:00:10.000Z, + "schedule": Object { + "interval": "10s", + }, "state": Object { "alertInstances": Object {}, "alertTypeState": undefined, @@ -208,8 +216,8 @@ describe('Task Runner', () => { mockedTaskInstance, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -336,7 +344,7 @@ describe('Task Runner', () => { mockedTaskInstance, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -501,8 +509,8 @@ describe('Task Runner', () => { }, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -609,7 +617,7 @@ describe('Task Runner', () => { mockedTaskInstance, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -620,7 +628,9 @@ describe('Task Runner', () => { }); expect(await taskRunner.run()).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:00:10.000Z, + "schedule": Object { + "interval": "10s", + }, "state": Object {}, } `); @@ -635,7 +645,7 @@ describe('Task Runner', () => { mockedTaskInstance, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -671,7 +681,7 @@ describe('Task Runner', () => { mockedTaskInstance, taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -697,6 +707,41 @@ describe('Task Runner', () => { }); }); + test('rescheduled the Alert if the schedule has update during a task run', async () => { + const taskRunner = new TaskRunner( + alertType, + mockedTaskInstance, + taskRunnerFactoryInitializerParams + ); + + alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValueOnce({ + ...mockedAlertTypeSavedObject, + schedule: { interval: '30s' }, + }); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + + expect(await taskRunner.run()).toMatchInlineSnapshot(` + Object { + "schedule": Object { + "interval": "30s", + }, + "state": Object { + "alertInstances": Object {}, + "alertTypeState": undefined, + "previousStartedAt": 1970-01-01T00:00:00.000Z, + }, + } + `); + }); + test('recovers gracefully when the AlertType executor throws an exception', async () => { alertType.executor.mockImplementation( ({ services: executorServices }: AlertExecutorOptions) => { @@ -710,7 +755,7 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -724,7 +769,9 @@ describe('Task Runner', () => { expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:00:10.000Z, + "schedule": Object { + "interval": "10s", + }, "state": Object {}, } `); @@ -770,13 +817,15 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); const runnerResult = await taskRunner.run(); expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:05:00.000Z, + "schedule": Object { + "interval": "10s", + }, "state": Object {}, } `); @@ -793,8 +842,8 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -807,7 +856,9 @@ describe('Task Runner', () => { expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:05:00.000Z, + "schedule": Object { + "interval": "10s", + }, "state": Object {}, } `); @@ -824,6 +875,42 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ + id: '1', + type: 'alert', + attributes: { + apiKey: Buffer.from('123:abc').toString('base64'), + }, + references: [], + }); + + const runnerResult = await taskRunner.run(); + + expect(runnerResult).toMatchInlineSnapshot(` + Object { + "schedule": Object { + "interval": "10s", + }, + "state": Object {}, + } + `); + }); + + test('recovers gracefully when the Runner of a legacy Alert task which has no schedule throws an exception when fetching attributes', async () => { + alertsClient.get.mockImplementation(() => { + throw new Error('OMG'); + }); + + // legacy alerts used to run by returning a new `runAt` instead of using a schedule + // ensure we return a fallback schedule when this happens, otherwise the task might be deleted + const legacyTaskInstance = omit(mockedTaskInstance, 'schedule'); + + const taskRunner = new TaskRunner( + alertType, + legacyTaskInstance, + taskRunnerFactoryInitializerParams + ); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '1', type: 'alert', @@ -837,7 +924,9 @@ describe('Task Runner', () => { expect(runnerResult).toMatchInlineSnapshot(` Object { - "runAt": 1970-01-01T00:05:00.000Z, + "schedule": Object { + "interval": "5m", + }, "state": Object {}, } `); @@ -863,8 +952,8 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); - alertsClient.get.mockResolvedValueOnce(mockedAlertTypeSavedObject); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + alertsClient.get.mockResolvedValue(mockedAlertTypeSavedObject); + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -882,7 +971,7 @@ describe('Task Runner', () => { test('avoids rescheduling a failed Alert Task Runner when it throws due to failing to fetch the alert', async () => { alertsClient.get.mockImplementation(() => { - throw SavedObjectsErrorHelpers.createGenericNotFoundError('task', '1'); + throw SavedObjectsErrorHelpers.createGenericNotFoundError('alert', '1'); }); const taskRunner = new TaskRunner( @@ -891,7 +980,7 @@ describe('Task Runner', () => { taskRunnerFactoryInitializerParams ); - encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ + encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({ id: '1', type: 'alert', attributes: { @@ -900,13 +989,9 @@ describe('Task Runner', () => { references: [], }); - const runnerResult = await taskRunner.run(); - - expect(runnerResult).toMatchInlineSnapshot(` - Object { - "runAt": undefined, - "state": Object {}, - } - `); + return taskRunner.run().catch((ex) => { + expect(ex).toMatchInlineSnapshot(`[Error: Saved object [alert/1] not found]`); + expect(isUnrecoverableError(ex)).toBeTruthy(); + }); }); }); diff --git a/x-pack/plugins/alerts/server/task_runner/task_runner.ts b/x-pack/plugins/alerts/server/task_runner/task_runner.ts index 954c5675df89c..2611ba766173b 100644 --- a/x-pack/plugins/alerts/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerts/server/task_runner/task_runner.ts @@ -7,10 +7,9 @@ import type { PublicMethodsOf } from '@kbn/utility-types'; import { pickBy, mapValues, without } from 'lodash'; import { Logger, KibanaRequest } from '../../../../../src/core/server'; import { TaskRunnerContext } from './task_runner_factory'; -import { ConcreteTaskInstance } from '../../../task_manager/server'; +import { ConcreteTaskInstance, throwUnrecoverableError } from '../../../task_manager/server'; import { createExecutionHandler } from './create_execution_handler'; import { AlertInstance, createAlertInstanceFactory } from '../alert_instance'; -import { getNextRunAt } from './get_next_run_at'; import { validateAlertTypeParams, executionStatusFromState, @@ -38,11 +37,11 @@ import { isAlertSavedObjectNotFoundError } from '../lib/is_alert_not_found_error import { AlertsClient } from '../alerts_client'; import { partiallyUpdateAlert } from '../saved_objects'; -const FALLBACK_RETRY_INTERVAL: IntervalSchedule = { interval: '5m' }; +const FALLBACK_RETRY_INTERVAL = '5m'; interface AlertTaskRunResult { state: AlertTaskState; - runAt: Date | undefined; + schedule: IntervalSchedule | undefined; } interface AlertTaskInstance extends ConcreteTaskInstance { @@ -307,14 +306,10 @@ export class TaskRunner { state: await promiseResult( this.validateAndExecuteAlert(services, apiKey, alert) ), - runAt: asOk( - getNextRunAt( - new Date(this.taskInstance.startedAt!), - // we do not currently have a good way of returning the type - // from SavedObjectsClient, and as we currenrtly require a schedule - // and we only support `interval`, we can cast this safely - alert.schedule - ) + schedule: asOk( + // fetch the alert again to ensure we return the correct schedule as it may have + // cahnged during the task execution + (await alertsClient.get({ id: alertId })).schedule ), }; } @@ -324,9 +319,10 @@ export class TaskRunner { params: { alertId, spaceId }, startedAt, state: originalState, + schedule: taskSchedule, } = this.taskInstance; - const { state, runAt } = await errorAsAlertTaskRunResult(this.loadAlertAttributesAndRun()); + const { state, schedule } = await errorAsAlertTaskRunResult(this.loadAlertAttributesAndRun()); const namespace = spaceId === 'default' ? undefined : spaceId; const executionStatus: AlertExecutionStatus = map( @@ -373,16 +369,11 @@ export class TaskRunner { return originalState; } ), - runAt: resolveErr(runAt, (err) => { - return isAlertSavedObjectNotFoundError(err, alertId) - ? undefined - : getNextRunAt( - new Date(), - // if we fail at this point we wish to recover but don't have access to the Alert's - // attributes, so we'll use a default interval to prevent the underlying task from - // falling into a failed state - FALLBACK_RETRY_INTERVAL - ); + schedule: resolveErr(schedule, (error) => { + if (isAlertSavedObjectNotFoundError(error, alertId)) { + throwUnrecoverableError(error); + } + return { interval: taskSchedule?.interval ?? FALLBACK_RETRY_INTERVAL }; }), }; } @@ -460,7 +451,7 @@ async function errorAsAlertTaskRunResult( } catch (e) { return { state: asErr(e), - runAt: asErr(e), + schedule: asErr(e), }; } } diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md index b25d3cc49f980..744d657bcd790 100644 --- a/x-pack/plugins/task_manager/README.md +++ b/x-pack/plugins/task_manager/README.md @@ -142,18 +142,34 @@ When Kibana attempts to claim and run a task instance, it looks its definition u ## Task result The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following: + +|Property|Description|Type| +|---|---|---| +|runAt| Optional. If specified, this is used as the tasks' next `runAt`, overriding the default system scheduler. | Date ISO String | +|schedule| Optional. If specified, this is used as the tasks' new recurring schedule, overriding the default system scheduler and any existing schedule. | { interval: string } | +|error| Optional, an error object, logged out as a warning. The pressence of this property indicates that the task did not succeed.| Error | +|state| Optional, this will be passed into the next run of the task, if this is a recurring task. |Record| + +### Examples + ```js { // Optional, if specified, this is used as the tasks' nextRun, overriding // the default system scheduler. runAt: "2020-07-24T17:34:35.272Z", - // Optional, an error object, logged out as a warning. The pressence of this - // property indicates that the task did not succeed. error: { message: 'Hrumph!' }, - // Optional, this will be passed into the next run of the task, if - // this is a recurring task. + state: { + anything: 'goes here', + }, +} +``` + +```js +{ + schedule: { interval: '30s' }, + state: { anything: 'goes here', }, @@ -162,6 +178,39 @@ The task runner's `run` method is expected to return a promise that resolves to Other return values will result in a warning, but the system should continue to work. +### Task retries when the Task Runner fails +If a task runner throws an error, task manager will try to rerun the task shortly after (up to the task definition's `maxAttempts`). +Normal tasks will wait a default amount of 5m before trying again and every subsequent attempt will add an additonal 5m cool off period to avoid a stampeding herd of failed tasks from storming Elasticsearch. + +Recurring tasks will also get retried, but instead of using the 5m interval for the retry, they will be retried on their next scheduled run. + +### Force failing a task +If you wish to purposfully fail a task, you can throw an error of any kind and the retry logic will apply. +If, on the other hand, you wish not only to fail the task, but you'd also like to indicate the Task Manager that it shouldn't retry the task, you can throw an Unrecoverable Error, using the `throwUnrecoverableError` helper function. + +For example: +```js + taskManager.registerTaskDefinitions({ + myTask: { + /// ... + createTaskRunner(context) { + return { + async run() { + const result = ... // Do some work + + if(!result) { + // No point retrying? + throwUnrecoverableError(new Error("No point retrying, this is unrecoverable")); + } + + return result; + } + }; + }, + }, + }); +``` + ## Task instances The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc. @@ -178,8 +227,8 @@ The data stored for a task instance looks something like this: // this. runAt: "2020-07-24T17:34:35.272Z", - // Indicates that this is a recurring task. We currently only support - // interval syntax with either minutes such as `5m` or seconds `10s`. + // Indicates that this is a recurring task. We support interval syntax + // with days such as '1d', hours '3h', minutes such as `5m`, seconds `10s`. schedule: { interval: '5m' }, // How many times this task has been unsuccesfully attempted, diff --git a/x-pack/plugins/task_manager/server/buffered_task_store.ts b/x-pack/plugins/task_manager/server/buffered_task_store.ts index e1e5f802204c1..ce646b30ed453 100644 --- a/x-pack/plugins/task_manager/server/buffered_task_store.ts +++ b/x-pack/plugins/task_manager/server/buffered_task_store.ts @@ -6,7 +6,7 @@ import { TaskStore } from './task_store'; import { ConcreteTaskInstance } from './task'; -import { Updatable } from './task_runner'; +import { Updatable } from './task_running'; import { createBuffer, Operation, BufferOptions } from './lib/bulk_operation_buffer'; import { unwrapPromise } from './lib/result_type'; diff --git a/x-pack/plugins/task_manager/server/index.ts b/x-pack/plugins/task_manager/server/index.ts index 7eba218e16fed..cf70e68437cc6 100644 --- a/x-pack/plugins/task_manager/server/index.ts +++ b/x-pack/plugins/task_manager/server/index.ts @@ -18,6 +18,8 @@ export { RunContext, } from './task'; +export { isUnrecoverableError, throwUnrecoverableError } from './task_running'; + export { TaskManagerPlugin as TaskManager, TaskManagerSetupContract, diff --git a/x-pack/plugins/task_manager/server/lib/intervals.test.ts b/x-pack/plugins/task_manager/server/lib/intervals.test.ts index e79694915f926..147e41e1a9d60 100644 --- a/x-pack/plugins/task_manager/server/lib/intervals.test.ts +++ b/x-pack/plugins/task_manager/server/lib/intervals.test.ts @@ -58,6 +58,8 @@ describe('taskIntervals', () => { expect(parseIntervalAsSecond('20m')).toEqual(20 * 60); expect(parseIntervalAsSecond('61m')).toEqual(61 * 60); expect(parseIntervalAsSecond('90m')).toEqual(90 * 60); + expect(parseIntervalAsSecond('2h')).toEqual(2 * 60 * 60); + expect(parseIntervalAsSecond('9d')).toEqual(9 * 60 * 60 * 24); }); }); @@ -94,6 +96,8 @@ describe('taskIntervals', () => { expect(parseIntervalAsMillisecond('20m')).toEqual(20 * 60 * 1000); expect(parseIntervalAsMillisecond('61m')).toEqual(61 * 60 * 1000); expect(parseIntervalAsMillisecond('90m')).toEqual(90 * 60 * 1000); + expect(parseIntervalAsMillisecond('1h')).toEqual(60 * 60 * 1000); + expect(parseIntervalAsMillisecond('3d')).toEqual(3 * 24 * 60 * 60 * 1000); }); }); diff --git a/x-pack/plugins/task_manager/server/lib/intervals.ts b/x-pack/plugins/task_manager/server/lib/intervals.ts index a28dfa62a501f..94537277123ee 100644 --- a/x-pack/plugins/task_manager/server/lib/intervals.ts +++ b/x-pack/plugins/task_manager/server/lib/intervals.ts @@ -9,11 +9,15 @@ import { memoize } from 'lodash'; export enum IntervalCadence { Minute = 'm', Second = 's', + Hour = 'h', + Day = 'd', } const VALID_CADENCE = new Set(Object.values(IntervalCadence)); const CADENCE_IN_MS: Record = { [IntervalCadence.Second]: 1000, [IntervalCadence.Minute]: 60 * 1000, + [IntervalCadence.Hour]: 60 * 60 * 1000, + [IntervalCadence.Day]: 24 * 60 * 60 * 1000, }; function isCadence(cadence: IntervalCadence | string): cadence is IntervalCadence { diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts index a931f0ff7c304..6ab866b6167ac 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.test.ts @@ -14,7 +14,7 @@ import { ConcreteTaskInstance, TaskStatus } from '../task'; import { asTaskRunEvent, asTaskPollingCycleEvent, TaskTiming } from '../task_events'; import { asOk } from '../lib/result_type'; import { TaskLifecycleEvent } from '../polling_lifecycle'; -import { TaskRunResult } from '../task_runner'; +import { TaskRunResult } from '../task_running'; import { createTaskRunAggregator, summarizeTaskRunStat, diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 6dd533177a86e..5addad7086f69 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -20,7 +20,7 @@ import { } from '../task_events'; import { isOk, Ok, unwrap } from '../lib/result_type'; import { ConcreteTaskInstance } from '../task'; -import { TaskRunResult } from '../task_runner'; +import { TaskRunResult } from '../task_running'; import { FillPoolResult } from '../lib/fill_pool'; import { AveragedStat, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index ccba750401f28..d5a06c76aaacd 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -36,7 +36,7 @@ import { createObservableMonitor, } from './polling'; import { TaskPool } from './task_pool'; -import { TaskManagerRunner, TaskRunner } from './task_runner'; +import { TaskManagerRunner, TaskRunner } from './task_running'; import { TaskStore, OwnershipClaimingOpts, ClaimOwnershipResult } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; import { BufferedTaskStore } from './buffered_task_store'; diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 6551bd47ef9e7..8b7870550040f 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -37,35 +37,47 @@ export interface RunContext { /** * The return value of a task's run function should be a promise of RunResult. */ -export interface RunResult { - /** - * Specifies the next run date / time for this task. If unspecified, this is - * treated as a single-run task, and will not be rescheduled after - * completion. - */ - runAt?: Date; - - /** - * If specified, indicates that the task failed to accomplish its work. This is - * logged out as a warning, and the task will be reattempted after a delay. - */ - error?: object; +export type SuccessfulRunResult = { /** * The state which will be passed to the next run of this task (if this is a * recurring task). See the RunContext type definition for more details. */ state: Record; -} +} & ( + | // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both + { + /** + * Specifies the next run date / time for this task. If unspecified, this is + * treated as a single-run task, and will not be rescheduled after + * completion. + */ + runAt?: Date; + schedule?: never; + } + | { + /** + * Specifies a new schedule for this tasks. If unspecified, the task will + * continue to use which ever schedule it already has, and if no there is + * no previous schedule then it will be treated as a single-run task. + */ + schedule?: IntervalSchedule; + runAt?: never; + } +); + +export type FailedRunResult = SuccessfulRunResult & { + /** + * If specified, indicates that the task failed to accomplish its work. This is + * logged out as a warning, and the task will be reattempted after a delay. + */ + error: Error; +}; -export interface SuccessfulRunResult { - runAt?: Date; - state?: Record; -} +export type RunResult = FailedRunResult | SuccessfulRunResult; -export interface FailedRunResult extends SuccessfulRunResult { - error: Error; -} +export const isFailedRunResult = (result: unknown): result is FailedRunResult => + !!((result as FailedRunResult)?.error ?? false); export interface FailedTaskResult { status: TaskStatus.Failed; @@ -73,6 +85,7 @@ export interface FailedTaskResult { export const validateRunResult = Joi.object({ runAt: Joi.date().optional(), + schedule: Joi.object().optional(), error: Joi.object().optional(), state: Joi.object().optional(), }).optional(); diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index b011d435e28dc..0b2ae3023deb6 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -11,7 +11,7 @@ import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; import { FillPoolResult } from './lib/fill_pool'; import { PollingError } from './polling'; -import { TaskRunResult } from './task_runner'; +import { TaskRunResult } from './task_running'; export enum TaskEventType { TASK_CLAIM = 'TASK_CLAIM', diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index 9f7948ecad34a..acd2ea59ad30b 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -13,7 +13,7 @@ import moment, { Duration } from 'moment'; import { performance } from 'perf_hooks'; import { padStart } from 'lodash'; import { Logger } from '../../../../src/core/server'; -import { TaskRunner } from './task_runner'; +import { TaskRunner } from './task_running'; import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error'; interface Opts { diff --git a/x-pack/plugins/task_manager/server/task_running/errors.test.ts b/x-pack/plugins/task_manager/server/task_running/errors.test.ts new file mode 100644 index 0000000000000..5d1a15d8bb207 --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_running/errors.test.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { isUnrecoverableError, throwUnrecoverableError } from './errors'; + +describe('Error Types', () => { + describe('Unrecoverable error', () => { + it('wraps and throws normal errors', () => { + expect(() => throwUnrecoverableError(new Error('OMG'))).toThrowError('OMG'); + }); + + it('idnentifies wrapped normal errors', async () => { + let result; + try { + throwUnrecoverableError(new Error('OMG')); + } catch (ex) { + result = ex; + } + expect(isUnrecoverableError(result)).toBeTruthy(); + }); + + it('idnentifies normal errors', () => { + expect(isUnrecoverableError(new Error('OMG'))).toBeFalsy(); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/task_running/errors.ts b/x-pack/plugins/task_manager/server/task_running/errors.ts new file mode 100644 index 0000000000000..f59fa1dc9cc45 --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_running/errors.ts @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// Unrecoverable +const CODE_UNRECOVERABLE = 'TaskManager/unrecoverable'; + +const code = Symbol('TaskManagerErrorCode'); + +export interface DecoratedError extends Error { + [code]?: string; +} + +function isTaskManagerError(error: unknown): error is DecoratedError { + return Boolean(error && (error as DecoratedError)[code]); +} + +export function isUnrecoverableError(error: Error | DecoratedError) { + return isTaskManagerError(error) && error[code] === CODE_UNRECOVERABLE; +} + +export function throwUnrecoverableError(error: Error) { + (error as DecoratedError)[code] = CODE_UNRECOVERABLE; + throw error; +} diff --git a/x-pack/plugins/task_manager/server/task_running/index.ts b/x-pack/plugins/task_manager/server/task_running/index.ts new file mode 100644 index 0000000000000..4d33b0e713538 --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_running/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +export * from './task_runner'; +export * from './errors'; diff --git a/x-pack/plugins/task_manager/server/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts similarity index 92% rename from x-pack/plugins/task_manager/server/task_runner.test.ts rename to x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 676eeedf08f5a..f5e2d3d96bc42 100644 --- a/x-pack/plugins/task_manager/server/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -6,15 +6,16 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { secondsFromNow } from './lib/intervals'; -import { asOk, asErr } from './lib/result_type'; -import { TaskManagerRunner, TaskRunResult } from './task_runner'; -import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent, TaskRun } from './task_events'; -import { ConcreteTaskInstance, TaskStatus, TaskDefinition, RunResult } from './task'; -import { SavedObjectsErrorHelpers } from '../../../../src/core/server'; +import { secondsFromNow } from '../lib/intervals'; +import { asOk, asErr } from '../lib/result_type'; +import { TaskManagerRunner, TaskRunResult } from '../task_running'; +import { TaskEvent, asTaskRunEvent, asTaskMarkRunningEvent, TaskRun } from '../task_events'; +import { ConcreteTaskInstance, TaskStatus, TaskDefinition, SuccessfulRunResult } from '../task'; +import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; import moment from 'moment'; -import { TaskTypeDictionary } from './task_type_dictionary'; -import { mockLogger } from './test_utils'; +import { TaskTypeDictionary } from '../task_type_dictionary'; +import { mockLogger } from '../test_utils'; +import { throwUnrecoverableError } from './errors'; const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60); @@ -194,6 +195,73 @@ describe('TaskManagerRunner', () => { sinon.assert.calledWithMatch(store.update, { runAt }); }); + test('reschedules tasks that return a schedule', async () => { + const runAt = minutesFromNow(1); + const schedule = { + interval: '1m', + }; + const { runner, store } = testOpts({ + instance: { + status: TaskStatus.Running, + startedAt: new Date(), + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { schedule, state: {} }; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWithMatch(store.update, { runAt }); + }); + + test(`doesn't reschedule recurring tasks that throw an unrecoverable error`, async () => { + const id = _.random(1, 20).toString(); + const error = new Error('Dangit!'); + const onTaskEvent = jest.fn(); + const { runner, store, instance: originalInstance } = testOpts({ + onTaskEvent, + instance: { id, status: TaskStatus.Running, startedAt: new Date() }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + throwUnrecoverableError(error); + }, + }), + }, + }, + }); + + await runner.run(); + + const instance = store.update.args[0][0]; + expect(instance.status).toBe('failed'); + + expect(onTaskEvent).toHaveBeenCalledWith( + withAnyTiming( + asTaskRunEvent( + id, + asErr({ + error, + task: originalInstance, + result: TaskRunResult.Failed, + }) + ) + ) + ); + expect(onTaskEvent).toHaveBeenCalledTimes(1); + }); + test('tasks that return runAt override the schedule', async () => { const runAt = minutesFromNow(_.random(5)); const { runner, store } = testOpts({ @@ -1039,7 +1107,7 @@ describe('TaskManagerRunner', () => { bar: { title: 'Bar!', createTaskRunner: () => ({ - run: async () => result as RunResult, + run: async () => result as SuccessfulRunResult, }), }, }, diff --git a/x-pack/plugins/task_manager/server/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts similarity index 87% rename from x-pack/plugins/task_manager/server/task_runner.ts rename to x-pack/plugins/task_manager/server/task_running/task_runner.ts index 45e4cb3000570..fb7a28c8f402c 100644 --- a/x-pack/plugins/task_manager/server/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -16,8 +16,8 @@ import { performance } from 'perf_hooks'; import Joi from 'joi'; import { identity, defaults, flow } from 'lodash'; -import { Middleware } from './lib/middleware'; -import { asOk, asErr, mapErr, eitherAsync, unwrap, isOk, mapOk, Result } from './lib/result_type'; +import { Middleware } from '../lib/middleware'; +import { asOk, asErr, mapErr, eitherAsync, unwrap, isOk, mapOk, Result } from '../lib/result_type'; import { TaskRun, TaskMarkRunning, @@ -25,24 +25,25 @@ import { asTaskMarkRunningEvent, startTaskTimer, TaskTiming, -} from './task_events'; -import { intervalFromDate, intervalFromNow } from './lib/intervals'; +} from '../task_events'; +import { intervalFromDate, intervalFromNow } from '../lib/intervals'; import { CancelFunction, CancellableTask, ConcreteTaskInstance, - RunResult, + isFailedRunResult, SuccessfulRunResult, FailedRunResult, FailedTaskResult, TaskDefinition, validateRunResult, TaskStatus, -} from './task'; -import { TaskTypeDictionary } from './task_type_dictionary'; +} from '../task'; +import { TaskTypeDictionary } from '../task_type_dictionary'; +import { isUnrecoverableError } from './errors'; const defaultBackoffPerFailure = 5 * 60 * 1000; -const EMPTY_RUN_RESULT: SuccessfulRunResult = {}; +const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} }; export interface TaskRunner { isExpired: boolean; @@ -266,7 +267,7 @@ export class TaskManagerRunner implements TaskRunner { // and lets us set a proper "retryAt" value each time. error: new Error('Task timeout'), addDuration: this.definition.timeout, - }), + }) ?? null, }); const timeUntilClaimExpiresAfterUpdate = howManyMsUntilOwnershipClaimExpires( @@ -310,7 +311,9 @@ export class TaskManagerRunner implements TaskRunner { this.logger.debug(`The task ${this} is not cancellable.`); } - private validateResult(result?: RunResult | void): Result { + private validateResult( + result?: SuccessfulRunResult | FailedRunResult | void + ): Result { const { error } = Joi.validate(result, validateRunResult); if (error) { @@ -324,7 +327,7 @@ export class TaskManagerRunner implements TaskRunner { return asOk(EMPTY_RUN_RESULT); } - return result.error ? asErr({ ...result, error: result.error as Error }) : asOk(result); + return isFailedRunResult(result) ? asErr({ ...result, error: result.error }) : asOk(result); } private shouldTryToScheduleRetry(): boolean { @@ -339,22 +342,31 @@ export class TaskManagerRunner implements TaskRunner { private rescheduleFailedRun = ( failureResult: FailedRunResult ): Result => { - if (this.shouldTryToScheduleRetry()) { - const { runAt, state, error } = failureResult; + const { state, error } = failureResult; + if (this.shouldTryToScheduleRetry() && !isUnrecoverableError(error)) { // if we're retrying, keep the number of attempts const { schedule, attempts } = this.instance; - if (runAt || schedule) { - return asOk({ state, attempts, runAt }); - } else { - // when result.error is truthy, then we're retrying because it failed - const newRunAt = this.getRetryDelay({ + + const reschedule = failureResult.runAt + ? { runAt: failureResult.runAt } + : failureResult.schedule + ? { schedule: failureResult.schedule } + : schedule + ? { schedule } + : // when result.error is truthy, then we're retrying because it failed + { + runAt: this.getRetryDelay({ + attempts, + error, + }), + }; + + if (reschedule.runAt || reschedule.schedule) { + return asOk({ + state, attempts, - error, + ...reschedule, }); - - if (newRunAt) { - return asOk({ state, attempts, runAt: newRunAt }); - } } } // scheduling a retry isn't possible,mark task as failed @@ -369,15 +381,19 @@ export class TaskManagerRunner implements TaskRunner { // if running the task has failed ,try to correct by scheduling a retry in the near future mapErr(this.rescheduleFailedRun), // if retrying is possible (new runAt) or this is an recurring task - reschedule - mapOk(({ runAt, state, attempts = 0 }: Partial) => { - const { startedAt, schedule: { interval = undefined } = {} } = this.instance; - return asOk({ - runAt: runAt || intervalFromDate(startedAt!, interval)!, - state, - attempts, - status: TaskStatus.Idle, - }); - }), + mapOk( + ({ runAt, schedule: reschedule, state, attempts = 0 }: Partial) => { + const { startedAt, schedule } = this.instance; + return asOk({ + runAt: + runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!, + state, + schedule: reschedule ?? schedule, + attempts, + status: TaskStatus.Idle, + }); + } + ), unwrap )(result); @@ -422,13 +438,13 @@ export class TaskManagerRunner implements TaskRunner { const task = this.instance; await eitherAsync( result, - async ({ runAt }: SuccessfulRunResult) => { + async ({ runAt, schedule }: SuccessfulRunResult) => { this.onTaskEvent( asTaskRunEvent( this.id, asOk({ task, - result: await (runAt || task.schedule + result: await (runAt || schedule || task.schedule ? this.processResultForRecurringTask(result) : this.processResultWhenDone()), }), @@ -457,14 +473,11 @@ export class TaskManagerRunner implements TaskRunner { error: Error; attempts: number; addDuration?: string; - }): Date | null { - let result = null; - + }): Date | undefined { // Use custom retry logic, if any, otherwise we'll use the default logic - const retry: boolean | Date = this.definition.getRetry - ? this.definition.getRetry(attempts, error) - : true; + const retry: boolean | Date = this.definition.getRetry?.(attempts, error) ?? true; + let result; if (retry instanceof Date) { result = retry; } else if (retry === true) { diff --git a/x-pack/plugins/task_manager/server/task_scheduling.test.ts b/x-pack/plugins/task_manager/server/task_scheduling.test.ts index 8d660f57ab875..187e94f7bfd2a 100644 --- a/x-pack/plugins/task_manager/server/task_scheduling.test.ts +++ b/x-pack/plugins/task_manager/server/task_scheduling.test.ts @@ -21,7 +21,7 @@ import { asErr, asOk } from './lib/result_type'; import { ConcreteTaskInstance, TaskLifecycleResult, TaskStatus } from './task'; import { createInitialMiddleware } from './lib/middleware'; import { taskStoreMock } from './task_store.mock'; -import { TaskRunResult } from './task_runner'; +import { TaskRunResult } from './task_running'; import { mockLogger } from './test_utils'; describe('TaskScheduling', () => { diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts index d75aa868253de..0ad2ca226ed5d 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts @@ -834,6 +834,71 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); } }); + + it('should handle updates to an alert schedule by setting the new schedule for the underlying task', async () => { + const { body: createdAlert } = await supertest + .post(`${getUrlPrefix(space.id)}/api/alerts/alert`) + .set('kbn-xsrf', 'foo') + .send( + getTestAlertData({ + schedule: { interval: '1m' }, + }) + ) + .expect(200); + objectRemover.add(space.id, createdAlert.id, 'alert', 'alerts'); + + await retry.try(async () => { + const alertTask = (await getAlertingTaskById(createdAlert.scheduledTaskId)).docs[0]; + expect(alertTask.status).to.eql('idle'); + expect(alertTask.schedule).to.eql({ interval: '1m' }); + }); + + const updatedData = { + name: 'bcd', + tags: ['bar'], + params: { + foo: true, + }, + schedule: { interval: '1s' }, + actions: [], + throttle: '1m', + }; + const response = await supertestWithoutAuth + .put(`${getUrlPrefix(space.id)}/api/alerts/alert/${createdAlert.id}`) + .set('kbn-xsrf', 'foo') + .auth(user.username, user.password) + .send(updatedData); + + switch (scenario.id) { + case 'no_kibana_privileges at space1': + case 'space_1_all at space2': + case 'global_read at space1': + expect(response.statusCode).to.eql(403); + expect(response.body).to.eql({ + error: 'Forbidden', + message: getConsumerUnauthorizedErrorMessage( + 'update', + 'test.noop', + 'alertsFixture' + ), + statusCode: 403, + }); + break; + case 'superuser at space1': + case 'space_1_all at space1': + case 'space_1_all_alerts_none_actions at space1': + case 'space_1_all_with_restricted_fixture at space1': + expect(response.statusCode).to.eql(200); + await retry.try(async () => { + const alertTask = (await getAlertingTaskById(createdAlert.scheduledTaskId)).docs[0]; + expect(alertTask.status).to.eql('idle'); + expect(alertTask.schedule).to.eql({ interval: '1s' }); + }); + break; + default: + throw new Error(`Scenario untested: ${JSON.stringify(scenario)}`); + } + }); }); } }); diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts index 40d88a6bface5..75628f6c72487 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/alerts_base.ts @@ -135,14 +135,11 @@ instanceStateValue: true await taskManagerUtils.waitForActionTaskParamsToBeCleanedUp(testStart); }); - it('should reschedule failing alerts using the alerting interval and not the Task Manager retry logic', async () => { + it('should reschedule failing alerts using the Task Manager retry logic with alert schedule interval', async () => { /* - Alerting does not use the Task Manager schedule and instead implements its own due to a current limitation - in TaskManager's ability to update an existing Task. - For this reason we need to handle the retry when Alert executors fail, as TaskManager doesn't understand that - alerting tasks are recurring tasks. + Alerts should set the Task Manager schedule interval with initial value. */ - const alertIntervalInSeconds = 30; + const alertIntervalInSeconds = 10; const reference = alertUtils.generateReference(); const response = await alertUtils.createAlwaysFailingAction({ reference, @@ -157,7 +154,8 @@ instanceStateValue: true await retry.try(async () => { const alertTask = (await getAlertingTaskById(response.body.scheduledTaskId)).docs[0]; expect(alertTask.status).to.eql('idle'); - // ensure the alert is rescheduled to a minute from now + expect(alertTask.schedule.interval).to.eql('10s'); + // ensure the alert is rescheduled correctly ensureDatetimeIsWithinRange( Date.parse(alertTask.runAt), alertIntervalInSeconds * 1000,