From d568e65e78d51c6dac3436b8b4e349f1f6138d7f Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Thu, 3 Jan 2019 16:32:09 -0700 Subject: [PATCH] Core task manager (#24356) * Core task manager (#23555) * Initial stab at core task manager logic * Update task_manager readme * Update task manager readme * Add cancelable helper package, OSS, and general purpose, but will be useful for writing cancelable x-pack tasks. * Make the cancellable package promise-compliant * Update task manager readme w/ reference to cancellable * Change pool from lazy to eager, add support for sub-pools per task type. * Move cancellable to task_manager, and typescriptify it. * Working proof of concept for task manager. Still have lots to do: clean up, tests, comments, validations, assertions, etc. * Add pagination support to task manager fetch * Move task manager to OSS * Remove task manager reference from x-pack * Make task_manager a valid core plugin * Modify how task resource allocation works * Remove the special case 'max' workers occupied value * Remove x-pack package.json changes * Make taskDefinitions a part of uiExports * Make task docs saved-object compliant. * Add kbnServer to the task context. * Allow tasks to have a void / undefined return type * revert x-pack change * move cancellable to src/utils * move to src/server * use afterPluginsInit hook * task_manager.ts rename * add a wrapper with a setClient method * Add tests for task runner * Break task_pool into smaller, testable pieces * return raw task doc fields for calling code * remove todo comment * helper module for default client - setClient takes a callback fn * fix misidentified field name * fix rest args warning * flatten task_pool * remove cancellable * return raw task doc fields for calling code * remove todo comment * helper module for default client - setClient takes a callback fn * fix rest args warning * typescript fixes * roll back setClient takes a callback * createTaskRunner returns an object with run/cancel functions * Test task store, tweak task poller tests * Rename num_workers to max_workers * Tweak task pool method names * Fix cancellable test, and ts errors * Rename doc to _doc * Fix the doc type in the task manager index mappings * Make task costs configurable in kibana.yml * fix a tslint warning * TaskManager.afterPluginsInit replaces circuitous stuff * addMiddleware, implement beforeSave * wip * comment * run context stuff * pretty fix * comments * lint/pretty fixes * safeties in case they don't define both beforeRun and beforeSave * task runner test constructor opts fix * Add task_manager integration tests * FIx readme mistakes, fix schedule signature * Fix integration tests * Add blurb about middleware * make a libs directory for the small things * test for extractTaskDefinitions * unit test for middleware * Comment, refactor, and test task_manager * Touch up comments * Make task store throw error if given an unknown task type * Fix sample task definition in readme * Make task manager toggle on / off based on Elasticsearch connectivity * Prevent task manager from crashing Kibana if elasticsearch plugin is disabled * Fix task manager signature * Move the task manager into the task manager mixin, fix tests. It's an uber integration object anyway, so it seemed to make sense to keep it in one place. * Fix task manager integration tests * Update the task manager index creation to use auto_expand_replicas * Fix task manager integration tests * Fix task manager integration tests * Fix the overzealous deletion of .kibana during tests * Core task manager * Allow hardcoded ID passed in * comments * don't deconstruct type and task for payload * remove uiExport support * move task_manager to x-pack/plugins * expose as client on the plugin namespace * fix tests * move plugin api integration tests * roll back readme sample task definition example * fix sample plugin link * server.taskManager * sanitizeTaskDefinitions * fix integration tests * sanitize rename * assert unintialized and check for duplicates in registerTaskDefinitions * Remove unnecessary defaults from task_manager.test.ts * Remove task manager from OSS * Remove task manager from src * Tidy up task_manager unit tests around elasticsearch status changes * Integration test for specifying ID in scheduling * Task_manager init -> preInit to allow other plugins to consume taskManager in their init. * Remove task manager integration tests from OSS * spelling * readme fix * fix test code impacted by hapi upgrade * Task Manager index creation changes (#24542) * Uses putTemplate for task manager mappings and index settings. * Removes create option. * Opposite day, the test catches up to the code changes. * Ignores index unavailable during searches. * Improve taskRunner's processResult (#24880) * Set task state to idle after run, and add failed state * fix tests * Test alerting demo (#25136) * Ensures that task store init gets called before schedule. * Removes unused option for debugging purposes. * Fix unit tests because a second callCluster was made. * Task manager starts sanely. Does not preInit Will not allow definitions after initialization Creates store immediately. Modifies store after all plugins have initialized Adds static tasks that will be defined by plugins. * Task manager should not allow operations before initialization. * Attempts to fix runner tests. * Fixes unit test contract with APIs. * Removes unused type definitions. * Removes unused package json. * Removes unused import type. * Removes unnecessary async applied to a function. * Returns diferently if task store has already initialized. * Explains how to add static task instances to task definitions. (#25880) * Tasks cannot be scheduled until task manager is initialized. * Adds task manager api integration tests to ci group4. * Context of describe test must be the test framework object. * Update src/es_archiver/lib/indices/kibana_index.js regex update to actually disallow non-kibana system indices Co-Authored-By: tsullivan * verify fillPool exception passing * readme update about max_workers/numWorkers * change forEach to reduce * use public interfaces in internal method * replace getMaxAttempts with public readonly maxAttempts * Update x-pack/plugins/task_manager/task_store.ts `throw new Error` and initializattion spell fix Co-Authored-By: tsullivan * min = 1 for max_workers Co-Authored-By: tsullivan * timeOut => timeout * min 1 * scope as an array * no retries Co-Authored-By: tsullivan * ConcreteTaskInstance is a TaskInstance * remove guard per joi logic * more return types for functions * more comments around incremental back-off Co-Authored-By: tsullivan * throw error instead of return undefined * poll_interval min 1000 * avoid handling err.stack directly * break up processResult * fix a few runtime issues * only fetch idle tasks * remove check for status idle * always return a state, and when there is an error return the state we had at beforeRun * check isStarted before attemptWork Co-Authored-By: tsullivan * ts fix --- src/es_archiver/lib/indices/kibana_index.js | 17 +- src/ui/ui_exports/ui_export_types/index.js | 4 + .../ui_export_types/task_definitions.js | 28 + x-pack/index.js | 2 + x-pack/plugins/task_manager/README.md | 306 +++++++++++ x-pack/plugins/task_manager/index.js | 44 ++ .../task_manager/lib/fill_pool.test.ts | 94 ++++ x-pack/plugins/task_manager/lib/fill_pool.ts | 41 ++ .../task_manager/lib/intervals.test.ts | 52 ++ x-pack/plugins/task_manager/lib/intervals.ts | 53 ++ x-pack/plugins/task_manager/lib/logger.ts | 44 ++ .../task_manager/lib/middleware.test.ts | 159 ++++++ x-pack/plugins/task_manager/lib/middleware.ts | 46 ++ .../lib/sanitize_task_definitions.test.ts | 174 ++++++ .../lib/sanitize_task_definitions.ts | 49 ++ x-pack/plugins/task_manager/task.ts | 236 ++++++++ .../plugins/task_manager/task_manager.test.ts | 120 ++++ x-pack/plugins/task_manager/task_manager.ts | 192 +++++++ .../plugins/task_manager/task_poller.test.ts | 135 +++++ x-pack/plugins/task_manager/task_poller.ts | 95 ++++ x-pack/plugins/task_manager/task_pool.test.ts | 207 +++++++ x-pack/plugins/task_manager/task_pool.ts | 106 ++++ .../plugins/task_manager/task_runner.test.ts | 289 ++++++++++ x-pack/plugins/task_manager/task_runner.ts | 273 ++++++++++ .../plugins/task_manager/task_store.test.ts | 515 ++++++++++++++++++ x-pack/plugins/task_manager/task_store.ts | 375 +++++++++++++ .../plugins/task_manager/test_utils/index.ts | 52 ++ x-pack/scripts/functional_tests.js | 1 + x-pack/test/plugin_api_integration/config.js | 46 ++ .../plugins/task_manager/index.js | 106 ++++ .../plugins/task_manager/init_routes.js | 60 ++ .../plugins/task_manager/package.json | 4 + .../test_suites/task_manager/index.js | 12 + .../task_manager/task_manager_integration.js | 163 ++++++ 34 files changed, 4098 insertions(+), 2 deletions(-) create mode 100644 src/ui/ui_exports/ui_export_types/task_definitions.js create mode 100644 x-pack/plugins/task_manager/README.md create mode 100644 x-pack/plugins/task_manager/index.js create mode 100644 x-pack/plugins/task_manager/lib/fill_pool.test.ts create mode 100644 x-pack/plugins/task_manager/lib/fill_pool.ts create mode 100644 x-pack/plugins/task_manager/lib/intervals.test.ts create mode 100644 x-pack/plugins/task_manager/lib/intervals.ts create mode 100644 x-pack/plugins/task_manager/lib/logger.ts create mode 100644 x-pack/plugins/task_manager/lib/middleware.test.ts create mode 100644 x-pack/plugins/task_manager/lib/middleware.ts create mode 100644 x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts create mode 100644 x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts create mode 100644 x-pack/plugins/task_manager/task.ts create mode 100644 x-pack/plugins/task_manager/task_manager.test.ts create mode 100644 x-pack/plugins/task_manager/task_manager.ts create mode 100644 x-pack/plugins/task_manager/task_poller.test.ts create mode 100644 x-pack/plugins/task_manager/task_poller.ts create mode 100644 x-pack/plugins/task_manager/task_pool.test.ts create mode 100644 x-pack/plugins/task_manager/task_pool.ts create mode 100644 x-pack/plugins/task_manager/task_runner.test.ts create mode 100644 x-pack/plugins/task_manager/task_runner.ts create mode 100644 x-pack/plugins/task_manager/task_store.test.ts create mode 100644 x-pack/plugins/task_manager/task_store.ts create mode 100644 x-pack/plugins/task_manager/test_utils/index.ts create mode 100644 x-pack/test/plugin_api_integration/config.js create mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/index.js create mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js create mode 100644 x-pack/test/plugin_api_integration/plugins/task_manager/package.json create mode 100644 x-pack/test/plugin_api_integration/test_suites/task_manager/index.js create mode 100644 x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js diff --git a/src/es_archiver/lib/indices/kibana_index.js b/src/es_archiver/lib/indices/kibana_index.js index 7a732adcc69e2..379a4078e32b5 100644 --- a/src/es_archiver/lib/indices/kibana_index.js +++ b/src/es_archiver/lib/indices/kibana_index.js @@ -48,8 +48,7 @@ const buildUiExports = _.once(async () => { * Deletes all indices that start with `.kibana` */ export async function deleteKibanaIndices({ client, stats, log }) { - const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); - const indexNames = kibanaIndices.map(x => x.index); + const indexNames = await fetchKibanaIndices(client); if (!indexNames.length) { return; } @@ -152,6 +151,20 @@ export async function createDefaultSpace({ index, client }) { }); } +/** + * Migrations mean that the Kibana index will look something like: + * .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting + * with .kibana, then filters out any that aren't actually Kibana's core + * index (e.g. we don't want to remove .kibana_task_manager or the like). + * + * @param {string} index + */ +async function fetchKibanaIndices(client) { + const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' }); + const isKibanaIndex = (index) => (/^\.kibana(:?_\d*)?$/).test(index); + return kibanaIndices.map(x => x.index).filter(isKibanaIndex); +} + export async function cleanKibanaIndices({ client, stats, log, kibanaUrl }) { if (!await isSpacesEnabled({ kibanaUrl })) { return await deleteKibanaIndices({ diff --git a/src/ui/ui_exports/ui_export_types/index.js b/src/ui/ui_exports/ui_export_types/index.js index c12c445540efd..2a9d8d0c023e8 100644 --- a/src/ui/ui_exports/ui_export_types/index.js +++ b/src/ui/ui_exports/ui_export_types/index.js @@ -29,6 +29,10 @@ export { validations, } from './saved_object'; +export { + taskDefinitions +} from './task_definitions'; + export { app, apps, diff --git a/src/ui/ui_exports/ui_export_types/task_definitions.js b/src/ui/ui_exports/ui_export_types/task_definitions.js new file mode 100644 index 0000000000000..279583e516799 --- /dev/null +++ b/src/ui/ui_exports/ui_export_types/task_definitions.js @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { mergeAtType } from './reduce'; +import { alias, wrap, uniqueKeys } from './modify_reduce'; + +// How plugins define tasks that the task manager can run. +export const taskDefinitions = wrap( + alias('taskDefinitions'), + uniqueKeys(), + mergeAtType, +); diff --git a/x-pack/index.js b/x-pack/index.js index a2f952117ed34..9dbbc340f76a0 100644 --- a/x-pack/index.js +++ b/x-pack/index.js @@ -29,6 +29,7 @@ import { notifications } from './plugins/notifications'; import { kueryAutocomplete } from './plugins/kuery_autocomplete'; import { canvas } from './plugins/canvas'; import { infra } from './plugins/infra'; +import { taskManager } from './plugins/task_manager'; import { rollup } from './plugins/rollup'; import { remoteClusters } from './plugins/remote_clusters'; import { crossClusterReplication } from './plugins/cross_cluster_replication'; @@ -62,6 +63,7 @@ module.exports = function (kibana) { indexLifecycleManagement(kibana), kueryAutocomplete(kibana), infra(kibana), + taskManager(kibana), rollup(kibana), remoteClusters(kibana), crossClusterReplication(kibana), diff --git a/x-pack/plugins/task_manager/README.md b/x-pack/plugins/task_manager/README.md new file mode 100644 index 0000000000000..33f9dd87613a2 --- /dev/null +++ b/x-pack/plugins/task_manager/README.md @@ -0,0 +1,306 @@ +# Kibana task manager + +The task manager is a generic system for running background tasks. It supports: + +- Single-run and recurring tasks +- Scheduling tasks to run after a specified datetime +- Basic retry logic +- Recovery of stalled tasks / timeouts +- Tracking task state across multiple runs +- Configuring the run-parameters for specific tasks +- Basic coordination to prevent the same task instance from running on more than one Kibana system at a time + +## Implementation details + +At a high-level, the task manager works like this: + +- Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run: + - `runAt` is past + - `attempts` is less than the configured threshold +- Attempt to claim the task by using optimistic concurrency to set: + - status to `running` + - `runAt` to now + the timeout specified by the task +- Execute the task, if the previous claim succeeded +- If the task fails, increment the `attempts` count and reschedule it +- If the task succeeds: + - If it is recurring, store the result of the run, and reschedule + - If it is not recurring, remove it from the index + +## Pooling + +Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task type definition can have `numWorkers` specified, which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once. + +For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time. + +If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it. + +## Config options + +The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`): + +- `max_attempts` - How many times a failing task instance will be retried before it is never run again +- `poll_interval` - How often the background worker should check the task_manager index for more work +- `index` - The name of the index that the task_manager +- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) +- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. +- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks + - For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting` + - This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously + +## Task definitions + +Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object. + +A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder. + +```js +const { taskManager } = server; +taskManager.registerTaskDefinitions({ + // clusterMonitoring is the task type, and must be unique across the entire system + clusterMonitoring: { + // Human friendly name, used to represent this task in logs, UI, etc + title: 'Human friendly name', + + // Optional, human-friendly, more detailed description + description: 'Amazing!!', + + // Optional, how long, in minutes, the system should wait before + // a running instance of this task is considered to be timed out. + // This defaults to 5 minutes. + timeout: '5m', + + // The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots, + // 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is + // overridden by the `override_num_workers` config value, if specified. + numWorkers: 2, + + // The createTaskRunner function / method returns an object that is responsible for + // performing the work of the task. context: { taskInstance, kbnServer }, is documented below. + createTaskRunner(context) { + return { + // Perform the work of the task. The return value should fit the TaskResult interface, documented + // below. Invalid return values will result in a logged warning. + async run() { + // Do some work + // Conditionally send some alerts + // Return some result or other... + }, + + // Optional, will be called if a running instance of this task times out, allowing the task + // to attempt to clean itself up. + async cancel() { + // Do whatever is required to cancel this task, such as killing any spawned processes + }, + }; + }, + }, +}); +``` + +When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this: + +```js +{ + // An instance of the Kibana server object. + kbnServer, + + // The data associated with this instance of the task, with two properties being most notable: + // + // params: + // An object, specific to this task instance, used by the + // task to determine exactly what work should be performed. + // e.g. a cluster-monitoring task might have a `clusterName` + // property in here, but a movie-monitoring task might have + // a `directorName` property. + // + // state: + // The state returned from the previous run of this task instance. + // If this task instance has never succesfully run, this will + // be an empty object: {} + taskInstance, +} +``` + +## Task result + +The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following: +```js +{ + // Optional, if specified, this is used as the tasks' nextRun, overriding + // the default system scheduler. + runAt: "2020-07-24T17:34:35.272Z", + + // Optional, an error object, logged out as a warning. The pressence of this + // property indicates that the task did not succeed. + error: { message: 'Hrumph!' }, + + // Optional, this will be passed into the next run of the task, if + // this is a recurring task. + state: { + anything: 'goes here', + }, +} +``` + +Other return values will result in a warning, but the system should continue to work. + +## Task instances + +The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc. + +The data stored for a task instance looks something like this: + +```js +{ + // The type of task that will run this instance. + taskType: 'clusterMonitoring', + + // The next time this task instance should run. It is not guaranteed + // to run at this time, but it is guaranteed not to run earlier than + // this. + runAt: "2020-07-24T17:34:35.272Z", + + // Indicates that this is a recurring task. We currently only support + // 1 minute granularity. + interval: '5m', + + // How many times this task has been unsuccesfully attempted, + // this will be reset to 0 if the task ever succesfully completes. + // This is incremented if a task fails or times out. + attempts: 0, + + // Currently, this is either idle | running. It is used to + // coordinate which Kibana instance owns / is running a specific + // task instance. + status: 'idle', + + // The params specific to this task instance, which will be + // passed to the task when it runs, and will be used by the + // task to determine exactly what work should be performed. + // This is a JSON blob, and will be different per task type. + // e.g. a cluster-monitoring task might have a `clusterName` + // property in here, but a movie-monitoring task might have + // a `directorName` property. + params: '{ "task": "specific stuff here" }', + + // The result of the previous run of this task instance. This + // will be passed to the next run of the task, along with the + // params, and could be used by a task to do special logic If + // the task state changes (e.g. from green to red, or foo to bar) + // If there was no previous run (e.g. the instance has never succesfully + // completed, this will be an empty object.). This is a JSON blob, + // and will be different per task type. + state: '{ "status": "green" }', + + // An extension point for 3rd parties to build in security features on + // top of the task manager. For example, this might be the token of the user + // who scheduled this task. + userContext: 'the token of the user who scheduled this task', + + // An extension point for 3rd parties to build in security features on + // top of the task manager, and is expected to be the id of the user, if any, + // that scheduled this task. + user: '23lk3l42', + + // An application-specific designation, allowing different Kibana + // plugins / apps to query for only those tasks they care about. + scope: ['alerting'], +} +``` + +## Programmatic access + +The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected. + +```js +const { taskManager } = server; +// Schedules a task. All properties are as documented in the previous +// storage section, except that here, params is an object, not a JSON +// string. +const task = await taskManager.schedule({ + taskType, + runAt, + interval, + params, + scope: ['my-fanci-app'], +}); + +// Removes the specified task +await manager.remove(task.id); + +// Fetches tasks, supports pagination, via the search-after API: +// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html +// If scope is not specified, all tasks are returned, otherwise only tasks +// with the given scope are returned. +const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] }); + +// results look something like this: +{ + searchAfter: ['233322'], + // Tasks is an array of task instances + tasks: [{ + id: '3242342', + taskType: 'reporting', + // etc + }] +} +``` + +More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. + +## Middleware + +The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run. + +For example: + +```js +// In your plugin's init +server.taskManager.addMiddleware({ + async beforeSave({ taskInstance, ...opts }) { + console.log(`About to save a task of type ${taskInstance.taskType}`); + + return { + ...opts, + taskInstance: { + ...taskInstance, + params: { + ...taskInstance.params, + example: 'Added to params!', + }, + }, + }; + }, + + async beforeRun({ taskInstance, ...opts }) { + console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`); + const { example, ...taskWithoutExampleProp } = taskInstance; + + return { + ...opts, + taskInstance: taskWithoutExampleProp, + }; + }, +}); +``` + +## Limitations in v1.0 + +In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value. + +There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'. + +There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure. + +The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks. + +## Testing + +- `node scripts/jest --testPathPattern=task_manager --watch` + +Integration tests can be run like so: + +``` +node scripts/functional_tests_server.js --config test/plugin_functional/config.js +node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager +``` diff --git a/x-pack/plugins/task_manager/index.js b/x-pack/plugins/task_manager/index.js new file mode 100644 index 0000000000000..e0116820a3e05 --- /dev/null +++ b/x-pack/plugins/task_manager/index.js @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { TaskManager } from './task_manager'; + +export function taskManager(kibana) { + return new kibana.Plugin({ + id: 'task_manager', + require: ['kibana', 'elasticsearch', 'xpack_main'], + configPrefix: 'xpack.task_manager', + config(Joi) { + return Joi.object({ + enabled: Joi.boolean().default(true), + max_attempts: Joi.number() + .description('The maximum number of times a task will be attempted before being abandoned as failed') + .min(0) // no retries + .default(3), + poll_interval: Joi.number() + .description('How often, in milliseconds, the task manager will look for more work.') + .min(1000) + .default(3000), + index: Joi.string() + .description('The name of the index used to store task information.') + .default('.kibana_task_manager'), + max_workers: Joi.number() + .description('The maximum number of tasks that this Kibana instance will run simultaneously.') + .min(1) // disable the task manager rather than trying to specify it with 0 workers + .default(10), + override_num_workers: Joi.object() + .pattern(/.*/, Joi.number().greater(0)) + .description('Customize the number of workers occupied by specific tasks (e.g. override_num_workers.reporting: 2)') + .default({}) + }).default(); + }, + init(server) { + const config = server.config(); + const taskManager = new TaskManager(this.kbnServer, server, config); + server.decorate('server', 'taskManager', taskManager); + }, + }); +} diff --git a/x-pack/plugins/task_manager/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/lib/fill_pool.test.ts new file mode 100644 index 0000000000000..a0361103f0cc9 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/fill_pool.test.ts @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { fillPool } from './fill_pool'; + +describe('fillPool', () => { + test('stops filling when there are no more tasks in the store', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => true); + const converter = _.identity; + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]); + }); + + test('stops filling when the pool has no more capacity', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = _.identity; + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]); + }); + + test('calls the converter on the records prior to running', async () => { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = (x: number) => x.toString(); + + await fillPool(run, fetchAvailableTasks, converter); + + expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']); + }); + + describe('error handling', () => { + test('throws exception from fetchAvailableTasks', async () => { + const run = sinon.spy(() => false); + const converter = (x: number) => x.toString(); + + try { + const fetchAvailableTasks = async () => Promise.reject('fetch is not working'); + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('fetch is not working'); + expect(run.called).toBe(false); + } + }); + + test('throws exception from run', async () => { + const run = sinon.spy(() => Promise.reject('run is not working')); + const converter = (x: number) => x.toString(); + + try { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('run is not working'); + } + }); + + test('throws exception from converter', async () => { + try { + const tasks = [[1, 2, 3], [4, 5]]; + let index = 0; + const fetchAvailableTasks = async () => tasks[index++] || []; + const run = sinon.spy(() => false); + const converter = (x: number) => { + throw new Error(`can not convert ${x}`); + }; + + await fillPool(run, fetchAvailableTasks, converter); + } catch (err) { + expect(err.toString()).toBe('Error: can not convert 1'); + } + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/fill_pool.ts b/x-pack/plugins/task_manager/lib/fill_pool.ts new file mode 100644 index 0000000000000..a5970574abaaf --- /dev/null +++ b/x-pack/plugins/task_manager/lib/fill_pool.ts @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +type BatchRun = (tasks: T[]) => Promise; +type Fetcher = () => Promise; +type Converter = (t: T1) => T2; + +/** + * Given a function that runs a batch of tasks (e.g. taskPool.run), a function + * that fetches task records (e.g. store.fetchAvailableTasks), and a function + * that converts task records to the appropriate task runner, this function + * fills the pool with work. + * + * This is annoyingly general in order to simplify testing. + * + * @param run - a function that runs a batch of tasks (e.g. taskPool.run) + * @param fetchAvailableTasks - a function that fetches task records (e.g. store.fetchAvailableTasks) + * @param converter - a function that converts task records to the appropriate task runner + */ +export async function fillPool( + run: BatchRun, + fetchAvailableTasks: Fetcher, + converter: Converter +): Promise { + while (true) { + const instances = await fetchAvailableTasks(); + + if (!instances.length) { + return; + } + + const tasks = instances.map(converter); + + if (!(await run(tasks))) { + return; + } + } +} diff --git a/x-pack/plugins/task_manager/lib/intervals.test.ts b/x-pack/plugins/task_manager/lib/intervals.test.ts new file mode 100644 index 0000000000000..bba5cdf8591ff --- /dev/null +++ b/x-pack/plugins/task_manager/lib/intervals.test.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import { assertValidInterval, intervalFromNow, minutesFromNow } from './intervals'; + +describe('taskIntervals', () => { + describe('assertValidInterval', () => { + test('it accepts intervals in the form `Nm`', () => { + expect(() => assertValidInterval(`${_.random(1000)}m`)).not.toThrow(); + }); + + test('it rejects intervals are not of the form `Nm`', () => { + expect(() => assertValidInterval(`5m 2s`)).toThrow( + /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => assertValidInterval(`hello`)).toThrow( + /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + }); + + describe('intervalFromNow', () => { + test('it returns the current date plus n minutes', () => { + const mins = _.random(1, 100); + const expected = Date.now() + mins * 60 * 1000; + const nextRun = intervalFromNow(`${mins}m`)!.getTime(); + expect(Math.abs(nextRun - expected)).toBeLessThan(100); + }); + + test('it rejects intervals are not of the form `Nm`', () => { + expect(() => intervalFromNow(`5m 2s`)).toThrow( + /Invalid interval "5m 2s"\. Intervals must be of the form {number}m. Example: 5m/ + ); + expect(() => intervalFromNow(`hello`)).toThrow( + /Invalid interval "hello"\. Intervals must be of the form {number}m. Example: 5m/ + ); + }); + }); + + describe('minutesFromNow', () => { + test('it returns the current date plus a number of minutes', () => { + const mins = _.random(1, 100); + const expected = Date.now() + mins * 60 * 1000; + const nextRun = minutesFromNow(mins).getTime(); + expect(Math.abs(nextRun - expected)).toBeLessThan(100); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/intervals.ts b/x-pack/plugins/task_manager/lib/intervals.ts new file mode 100644 index 0000000000000..f095c336098f9 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/intervals.ts @@ -0,0 +1,53 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/** + * Returns a date that is the specified interval from now. Currently, + * only minute-intervals are supported. + * + * @param {string} interval - An interval of the form `Nm` such as `5m` + */ +export function intervalFromNow(interval?: string): Date | undefined { + if (interval === undefined) { + return; + } + + assertValidInterval(interval); + + return minutesFromNow(parseInterval(interval)); +} + +/** + * Returns a date that is mins minutes from now. + * + * @param mins The number of mintues from now + */ +export function minutesFromNow(mins: number): Date { + const now = new Date(); + + now.setMinutes(now.getMinutes() + mins); + + return now; +} + +/** + * Verifies that the specified interval matches our expected format. + * + * @param {string} interval - An interval such as `5m` + */ +export function assertValidInterval(interval: string) { + if (/^[0-9]+m$/.test(interval)) { + return interval; + } + + throw new Error( + `Invalid interval "${interval}". Intervals must be of the form {number}m. Example: 5m.` + ); +} + +function parseInterval(interval: string) { + return parseInt(interval, 10); +} diff --git a/x-pack/plugins/task_manager/lib/logger.ts b/x-pack/plugins/task_manager/lib/logger.ts new file mode 100644 index 0000000000000..932acaa15de23 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/logger.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export type LogFn = (prefix: string[], msg: string) => void; + +type SimpleLogFn = (msg: string) => void; + +export interface Logger { + error: SimpleLogFn; + warning: SimpleLogFn; + debug: SimpleLogFn; + info: SimpleLogFn; +} + +export class TaskManagerLogger implements Logger { + private write: LogFn; + + constructor(log: LogFn) { + this.write = log; + } + + public error(msg: string) { + this.log('error', msg); + } + + public warning(msg: string) { + this.log('warning', msg); + } + + public debug(msg: string) { + this.log('debug', msg); + } + + public info(msg: string) { + this.log('info', msg); + } + + private log(type: string, msg: string) { + this.write([type, 'task_manager'], msg); + } +} diff --git a/x-pack/plugins/task_manager/lib/middleware.test.ts b/x-pack/plugins/task_manager/lib/middleware.test.ts new file mode 100644 index 0000000000000..c249f86d15921 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/middleware.test.ts @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import moment from 'moment'; +import { ConcreteTaskInstance, RunContext, TaskInstance, TaskStatus } from '../task'; +import { addMiddlewareToChain } from './middleware'; + +interface BeforeSaveOpts { + taskInstance: TaskInstance; +} + +const getMockTaskInstance = () => ({ + taskType: 'nice_task', + state: {}, + params: { abc: 'def' }, +}); +const getMockConcreteTaskInstance = () => { + const concrete: { + id: string; + version: number; + attempts: number; + status: TaskStatus; + runAt: Date; + state: any; + taskType: string; + params: any; + } = { + id: 'hy8o99o83', + version: 1, + attempts: 0, + status: 'idle', + runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), + state: {}, + taskType: 'nice_task', + params: { abc: 'def' }, + }; + return concrete; +}; +const getMockRunContext = (runTask: ConcreteTaskInstance) => ({ + taskInstance: runTask, + kbnServer: {}, +}); + +const defaultBeforeSave = async (opts: BeforeSaveOpts) => { + return opts; +}; + +const defaultBeforeRun = async (opts: RunContext) => { + return opts; +}; + +describe('addMiddlewareToChain', () => { + it('chains the beforeSave functions', () => { + const m1 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m1: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + const m2 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m2: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + const m3 = { + beforeSave: async (opts: BeforeSaveOpts) => { + Object.assign(opts.taskInstance.params, { m3: true }); + return opts; + }, + beforeRun: defaultBeforeRun, + }; + + let middlewareChain; + middlewareChain = addMiddlewareToChain(m1, m2); + middlewareChain = addMiddlewareToChain(middlewareChain, m3); + + middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then((saveOpts: any) => { + expect(saveOpts).toMatchInlineSnapshot(` +Object { + "taskInstance": Object { + "params": Object { + "abc": "def", + "m1": true, + "m2": true, + "m3": true, + }, + "state": Object {}, + "taskType": "nice_task", + }, +} +`); + }); + }); + + it('chains the beforeRun functions', () => { + const m1 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m1: true, + }; + }, + }; + const m2 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m2: true, + }; + }, + }; + const m3 = { + beforeSave: defaultBeforeSave, + beforeRun: async (opts: RunContext) => { + return { + ...opts, + m3: true, + }; + }, + }; + + let middlewareChain; + middlewareChain = addMiddlewareToChain(m1, m2); + middlewareChain = addMiddlewareToChain(middlewareChain, m3); + + middlewareChain + .beforeRun(getMockRunContext(getMockConcreteTaskInstance())) + .then(contextOpts => { + expect(contextOpts).toMatchInlineSnapshot(` +Object { + "kbnServer": Object {}, + "m1": true, + "m2": true, + "m3": true, + "taskInstance": Object { + "attempts": 0, + "id": "hy8o99o83", + "params": Object { + "abc": "def", + }, + "runAt": 2018-09-18T05:33:09.588Z, + "state": Object {}, + "status": "idle", + "taskType": "nice_task", + "version": 1, + }, +} +`); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/middleware.ts b/x-pack/plugins/task_manager/lib/middleware.ts new file mode 100644 index 0000000000000..d81b76fda7e50 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/middleware.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { RunContext, TaskInstance } from '../task'; + +/* + * BeforeSaveMiddlewareParams is nearly identical to RunContext, but + * taskInstance is before save (no _id property) + * + * taskInstance property is guaranteed to exist. The params can optionally + * include fields from an "options" object passed as the 2nd parameter to + * taskManager.schedule() + */ +export interface BeforeSaveMiddlewareParams { + taskInstance: TaskInstance; +} + +export type BeforeSaveFunction = ( + params: BeforeSaveMiddlewareParams +) => Promise; + +export type BeforeRunFunction = (params: RunContext) => Promise; + +export interface Middleware { + beforeSave: BeforeSaveFunction; + beforeRun: BeforeRunFunction; +} + +export function addMiddlewareToChain(prevMiddleware: Middleware, middleware: Middleware) { + const beforeSave = middleware.beforeSave + ? (params: BeforeSaveMiddlewareParams) => + middleware.beforeSave(params).then(prevMiddleware.beforeSave) + : prevMiddleware.beforeSave; + + const beforeRun = middleware.beforeRun + ? (params: RunContext) => middleware.beforeRun(params).then(prevMiddleware.beforeRun) + : prevMiddleware.beforeRun; + + return { + beforeSave, + beforeRun, + }; +} diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts new file mode 100644 index 0000000000000..af6ee536498b4 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.test.ts @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { get } from 'lodash'; +import { RunContext } from '../task'; +import { sanitizeTaskDefinitions } from './sanitize_task_definitions'; + +interface Opts { + numTasks: number; + numWorkers?: number; +} + +const getMockTaskDefinitions = (opts: Opts) => { + const { numTasks, numWorkers } = opts; + const tasks: any = {}; + + for (let i = 0; i < numTasks; i++) { + const type = `test_task_type_${i}`; + tasks[type] = { + type, + title: 'Test', + description: 'one super cool task', + numWorkers: numWorkers ? numWorkers : 1, + createTaskRunner(context: RunContext) { + const incre = get(context, 'taskInstance.state.incre', -1); + return { + run: () => ({ + state: { + incre: incre + 1, + }, + runAt: Date.now(), + }), + }; + }, + }; + } + return tasks; +}; + +describe('sanitizeTaskDefinitions', () => { + it('provides tasks with defaults if there are no overrides', () => { + const maxWorkers = 10; + const overrideNumWorkers = {}; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, + "test_task_type_2": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_2", + }, +} +`); + }); + + it('scales down task definitions workers if larger than max workers', () => { + const maxWorkers = 2; + const overrideNumWorkers = {}; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 2, numWorkers: 5 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, +} +`); + }); + + it('incorporates overrideNumWorkers to give certain type an override of number of workers', () => { + const overrideNumWorkers = { + test_task_type_0: 5, + test_task_type_1: 2, + }; + const maxWorkers = 5; + const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); + const result = sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + + expect(result).toMatchInlineSnapshot(` +Object { + "test_task_type_0": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 5, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_0", + }, + "test_task_type_1": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 2, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_1", + }, + "test_task_type_2": Object { + "createTaskRunner": [Function], + "description": "one super cool task", + "numWorkers": 1, + "timeout": "5m", + "title": "Test", + "type": "test_task_type_2", + }, +} +`); + }); + + it('throws a validation exception for invalid task definition', () => { + const runsanitize = () => { + const maxWorkers = 10; + const overrideNumWorkers = {}; + const taskDefinitions = { + some_kind_of_task: { + fail: 'extremely', // cause a validation failure + type: 'breaky_task', + title: 'Test XYZ', + description: `Actually this won't work`, + createTaskRunner() { + return { + async run() { + return { + state: {}, + }; + }, + }; + }, + }, + }; + + return sanitizeTaskDefinitions(taskDefinitions, maxWorkers, overrideNumWorkers); + }; + + expect(runsanitize).toThrowError(); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts new file mode 100644 index 0000000000000..07d7cdd0df407 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/sanitize_task_definitions.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; +import { + SanitizedTaskDefinition, + TaskDefinition, + TaskDictionary, + validateTaskDefinition, +} from '../task'; + +/** + * Sanitizes the system's task definitions. Task definitions have optional properties, and + * this ensures they all are given a reasonable default. This also overrides certain task + * definition properties with kibana.yml overrides (such as the `override_num_workers` config + * value). + * + * @param maxWorkers - The maxiumum numer of workers allowed to run at once + * @param taskDefinitions - The Kibana task definitions dictionary + * @param overrideNumWorkers - The kibana.yml overrides numWorkers per task type. + */ +export function sanitizeTaskDefinitions( + taskDefinitions: TaskDictionary = {}, + maxWorkers: number, + overrideNumWorkers: { [taskType: string]: number } +): TaskDictionary { + return Object.keys(taskDefinitions).reduce( + (acc, type) => { + const rawDefinition = taskDefinitions[type]; + rawDefinition.type = type; + const definition = Joi.attempt(rawDefinition, validateTaskDefinition) as TaskDefinition; + const numWorkers = Math.min( + maxWorkers, + overrideNumWorkers[definition.type] || definition.numWorkers || 1 + ); + + acc[type] = { + ...definition, + numWorkers, + }; + + return acc; + }, + {} as TaskDictionary + ); +} diff --git a/x-pack/plugins/task_manager/task.ts b/x-pack/plugins/task_manager/task.ts new file mode 100644 index 0000000000000..71f61ea0576b8 --- /dev/null +++ b/x-pack/plugins/task_manager/task.ts @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; + +/* + * Type definitions and validations for tasks. + */ + +/** + * A loosely typed definition of the elasticjs wrapper. It's beyond the scope + * of this work to try to make a comprehensive type definition of this. + */ +export type ElasticJs = (action: string, args: any) => Promise; + +/** + * The run context is passed into a task's run function as its sole argument. + */ +export interface RunContext { + /** + * The Kibana server object. This gives tasks full-access to the server object, + * including the various ES options client functions + */ + kbnServer: object; + + /** + * The document describing the task instance, its params, state, id, etc. + */ + taskInstance: ConcreteTaskInstance; +} + +/** + * The return value of a task's run function should be a promise of RunResult. + */ +export interface RunResult { + /** + * Specifies the next run date / time for this task. If unspecified, this is + * treated as a single-run task, and will not be rescheduled after + * completion. + */ + runAt?: Date; + + /** + * If specified, indicates that the task failed to accomplish its work. This is + * logged out as a warning, and the task will be reattempted after a delay. + */ + error?: object; + + /** + * The state which will be passed to the next run of this task (if this is a + * recurring task). See the RunContext type definition for more details. + */ + state: object; +} + +export const validateRunResult = Joi.object({ + runAt: Joi.date().optional(), + error: Joi.object().optional(), + state: Joi.object().optional(), +}).optional(); + +export type RunFunction = () => Promise; + +export type CancelFunction = () => Promise; + +export interface CancellableTask { + run: RunFunction; + cancel?: CancelFunction; +} + +export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask; + +/** + * Defines a task which can be scheduled and run by the Kibana + * task manager. + */ +export interface TaskDefinition { + /** + * A unique identifier for the type of task being defined. + */ + type: string; + + /** + * A brief, human-friendly title for this task. + */ + title: string; + + /** + * An optional more detailed description of what this task does. + */ + description?: string; + + /** + * How long, in minutes, the system should wait for the task to complete + * before it is considered to be timed out. (e.g. '5m', the default). If + * the task takes longer than this, Kibana will send it a kill command and + * the task will be re-attempted. + */ + timeout?: string; + + /** + * The numer of workers / slots a running instance of this task occupies. + * This defaults to 1. + */ + numWorkers?: number; + + /** + * Creates an object that has a run function which performs the task's work, + * and an optional cancel function which cancels the task. + */ + createTaskRunner: TaskRunCreatorFunction; +} + +/** + * A task definition with all of its properties set to a valid value. + */ +export interface SanitizedTaskDefinition extends TaskDefinition { + numWorkers: number; +} + +export const validateTaskDefinition = Joi.object({ + type: Joi.string().required(), + title: Joi.string().optional(), + description: Joi.string().optional(), + timeout: Joi.string().default('5m'), + numWorkers: Joi.number() + .min(1) + .default(1), + createTaskRunner: Joi.func().required(), +}).default(); + +/** + * A dictionary mapping task types to their definitions. + */ +export interface TaskDictionary { + [taskType: string]: T; +} + +export type TaskStatus = 'idle' | 'running' | 'failed'; + +/* + * A task instance represents all of the data required to store, fetch, + * and execute a task. + */ +export interface TaskInstance { + /** + * Optional ID that can be passed by the caller. When ID is undefined, ES + * will auto-generate a unique id. Otherwise, ID will be used to either + * create a new document, or update existing document + */ + id?: string; + + /** + * The task definition type whose run function will execute this instance. + */ + taskType: string; + + /** + * The date and time that this task is scheduled to be run. It is not + * guaranteed to run at this time, but it is guaranteed not to run earlier + * than this. Defaults to immediately. + */ + runAt?: Date; + + /** + * An interval in minutes (e.g. '5m'). If specified, this is a recurring task. + */ + interval?: string; + + /** + * A task-specific set of parameters, used by the task's run function to tailor + * its work. This is generally user-input, such as { sms: '333-444-2222' }. + */ + params: object; + + /** + * The state passed into the task's run function, and returned by the previous + * run. If there was no previous run, or if the previous run did not return + * any state, this will be the empy object: {} + */ + state: object; + + /** + * The id of the user who scheduled this task. + */ + user?: string; + + /** + * Used to group tasks for querying. So, reporting might schedule tasks with a scope of 'reporting', + * and then query such tasks to provide a glimpse at only reporting tasks, rather than at all tasks. + */ + scope?: string[]; +} + +/** + * A task instance that has an id and is ready for storage. + */ +export interface ConcreteTaskInstance extends TaskInstance { + /** + * The id of the Elastic document that stores this instance's data. This can + * be passed by the caller when scheduling the task. + */ + id: string; + + /** + * The version of the Elaticsearch document. + */ + version: number; + + /** + * The number of unsuccessful attempts since the last successful run. This + * will be zeroed out after a successful run. + */ + attempts: number; + + /** + * Indicates whether or not the task is currently running. + */ + status: TaskStatus; + + /** + * The date and time that this task is scheduled to be run. It is not guaranteed + * to run at this time, but it is guaranteed not to run earlier than this. + */ + runAt: Date; + + /** + * The state passed into the task's run function, and returned by the previous + * run. If there was no previous run, or if the previous run did not return + * any state, this will be the empy object: {} + */ + state: object; +} diff --git a/x-pack/plugins/task_manager/task_manager.test.ts b/x-pack/plugins/task_manager/task_manager.test.ts new file mode 100644 index 0000000000000..72cdd890f56b0 --- /dev/null +++ b/x-pack/plugins/task_manager/task_manager.test.ts @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskManager } from './task_manager'; + +describe('TaskManager', () => { + let clock: sinon.SinonFakeTimers; + const defaultConfig = { + task_manager: { + max_workers: 10, + override_num_workers: {}, + index: 'foo', + max_attempts: 9, + poll_interval: 6000000, + }, + }; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('disallows schedule before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const task = { + taskType: 'foo', + params: {}, + state: {}, + }; + await expect(client.schedule(task)).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('disallows fetch before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('disallows remove before init', async () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i); + }); + + test('allows middleware registration before init', () => { + const { opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const middleware = { + beforeSave: async (saveOpts: any) => saveOpts, + beforeRun: async (runOpts: any) => runOpts, + }; + expect(() => client.addMiddleware(middleware)).not.toThrow(); + }); + + test('disallows middleware registration after init', async () => { + const { $test, opts } = testOpts(); + const client = new TaskManager(opts.kbnServer, opts.server, opts.config); + const middleware = { + beforeSave: async (saveOpts: any) => saveOpts, + beforeRun: async (runOpts: any) => runOpts, + }; + + $test.afterPluginsInit(); + + expect(() => client.addMiddleware(middleware)).toThrow( + /Cannot add middleware after the task manager is initialized/i + ); + }); + + function testOpts() { + const $test = { + events: {} as any, + afterPluginsInit: _.noop, + }; + + const opts = { + config: { + get: (path: string) => _.get(defaultConfig, path), + }, + kbnServer: { + uiExports: { + taskDefinitions: {}, + }, + afterPluginsInit(callback: any) { + $test.afterPluginsInit = callback; + }, + }, + server: { + log: sinon.spy(), + decorate(...args: any[]) { + _.set(opts, args.slice(0, -1), _.last(args)); + }, + plugins: { + elasticsearch: { + getCluster() { + return { callWithInternalUser: _.noop }; + }, + status: { + on(eventName: string, callback: () => any) { + $test.events[eventName] = callback; + }, + }, + }, + }, + }, + }; + + return { + $test, + opts, + }; + } +}); diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts new file mode 100644 index 0000000000000..370ef6d4a2a56 --- /dev/null +++ b/x-pack/plugins/task_manager/task_manager.ts @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { fillPool } from './lib/fill_pool'; +import { Logger, TaskManagerLogger } from './lib/logger'; +import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; +import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions'; +import { ConcreteTaskInstance, RunContext, TaskInstance } from './task'; +import { SanitizedTaskDefinition, TaskDefinition, TaskDictionary } from './task'; +import { TaskPoller } from './task_poller'; +import { TaskPool } from './task_pool'; +import { TaskManagerRunner } from './task_runner'; +import { FetchOpts, FetchResult, RemoveResult, TaskStore } from './task_store'; + +/* + * The TaskManager is the public interface into the task manager system. This glues together + * all of the disparate modules in one integration point. The task manager operates in two different ways: + * + * - pre-init, it allows middleware registration, but disallows task manipulation + * - post-init, it disallows middleware registration, but allows task manipulation + * + * Due to its complexity, this is mostly tested by integration tests (see readme). + */ + +/** + * The public interface into the task manager system. + */ +export class TaskManager { + private isInitialized = false; + private maxWorkers: number; + private overrideNumWorkers: { [taskType: string]: number }; + private definitions: TaskDictionary; + private store: TaskStore; + private poller: TaskPoller; + private logger: Logger; + private middleware = { + beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts, + beforeRun: async (runOpts: RunContext) => runOpts, + }; + + /** + * Initializes the task manager, preventing any further addition of middleware, + * enabling the task manipulation methods, and beginning the background polling + * mechanism. + */ + public constructor(kbnServer: any, server: any, config: any) { + this.maxWorkers = config.get('xpack.task_manager.max_workers'); + this.overrideNumWorkers = config.get('xpack.task_manager.override_num_workers'); + this.definitions = {}; + + const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); + + const store = new TaskStore({ + callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, + index: config.get('xpack.task_manager.index'), + maxAttempts: config.get('xpack.task_manager.max_attempts'), + supportedTypes: Object.keys(this.definitions), + }); + const pool = new TaskPool({ + logger, + maxWorkers: this.maxWorkers, + }); + const createRunner = (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ + logger, + kbnServer, + instance, + store, + definitions: this.definitions, + beforeRun: this.middleware.beforeRun, + }); + const poller = new TaskPoller({ + logger, + pollInterval: config.get('xpack.task_manager.poll_interval'), + work(): Promise { + return fillPool(pool.run, store.fetchAvailableTasks, createRunner); + }, + }); + + this.logger = logger; + this.store = store; + this.poller = poller; + + kbnServer.afterPluginsInit(async () => { + this.isInitialized = true; + store.addSupportedTypes(Object.keys(this.definitions)); + await store.init(); + await poller.start(); + }); + } + + /** + * Method for allowing consumers to register task definitions into the system. + * @param taskDefinitions - The Kibana task definitions dictionary + */ + public registerTaskDefinitions(taskDefinitions: TaskDictionary) { + this.assertUninitialized('register task definitions'); + const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]); + if (duplicate) { + throw new Error(`Task ${duplicate} is already defined!`); + } + + try { + const sanitized = sanitizeTaskDefinitions( + taskDefinitions, + this.maxWorkers, + this.overrideNumWorkers + ); + + Object.assign(this.definitions, sanitized); + } catch (e) { + this.logger.error('Could not sanitize task definitions'); + } + } + + /** + * Adds middleware to the task manager, such as adding security layers, loggers, etc. + * + * @param {Middleware} middleware - The middlware being added. + */ + public addMiddleware(middleware: Middleware) { + this.assertUninitialized('add middleware'); + const prevMiddleWare = this.middleware; + this.middleware = addMiddlewareToChain(prevMiddleWare, middleware); + } + + /** + * Schedules a task. + * + * @param task - The task being scheduled. + * @returns {Promise} + */ + public async schedule(taskInstance: TaskInstance, options?: any): Promise { + this.assertInitialized('Tasks cannot be scheduled until after task manager is initialized!'); + const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ + ...options, + taskInstance, + }); + const result = await this.store.schedule(modifiedTask); + this.poller.attemptWork(); + return result; + } + + /** + * Fetches a paginatable list of scheduled tasks. + * + * @param opts - The query options used to filter tasks + * @returns {Promise} + */ + public async fetch(opts: FetchOpts): Promise { + this.assertInitialized('Tasks cannot be fetched before task manager is initialized!'); + return this.store.fetch(opts); + } + + /** + * Removes the specified task from the index. + * + * @param {string} id + * @returns {Promise} + */ + public async remove(id: string): Promise { + this.assertInitialized('Tasks cannot be removed before task manager is initialized!'); + return this.store.remove(id); + } + + /** + * Ensures task manager IS NOT already initialized + * + * @param {string} message shown if task manager is already initialized + * @returns void + */ + private assertUninitialized(message: string) { + if (this.isInitialized) { + throw new Error(`Cannot ${message} after the task manager is initialized!`); + } + } + + /** + * Ensures task manager IS already initialized + * + * @param {string} message shown if task manager is not initialized + * @returns void + */ + private assertInitialized(message: string) { + if (!this.isInitialized) { + throw new Error(`NotInitialized: ${message}`); + } + } +} diff --git a/x-pack/plugins/task_manager/task_poller.test.ts b/x-pack/plugins/task_manager/task_poller.test.ts new file mode 100644 index 0000000000000..08bb21b5cdd5c --- /dev/null +++ b/x-pack/plugins/task_manager/task_poller.test.ts @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskPoller } from './task_poller'; +import { mockLogger, resolvable, sleep } from './test_utils'; + +describe('TaskPoller', () => { + describe('interval tests', () => { + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('runs the work function on an interval', async () => { + const pollInterval = _.random(10, 20); + const done = resolvable(); + const work = sinon.spy(() => { + done.resolve(); + return Promise.resolve(); + }); + const poller = new TaskPoller({ + pollInterval, + work, + logger: mockLogger(), + }); + + poller.start(); + + sinon.assert.calledOnce(work); + await done; + + clock.tick(pollInterval - 1); + sinon.assert.calledOnce(work); + clock.tick(1); + sinon.assert.calledTwice(work); + }); + }); + + test('logs, but does not crash if the work function fails', async () => { + let count = 0; + const logger = mockLogger(); + const doneWorking = resolvable(); + const poller = new TaskPoller({ + logger, + pollInterval: 1, + work: async () => { + ++count; + if (count === 1) { + throw new Error('Dang it!'); + } + if (count > 1) { + poller.stop(); + doneWorking.resolve(); + } + }, + }); + + poller.start(); + + await doneWorking; + + expect(count).toEqual(2); + sinon.assert.calledWithMatch(logger.error, /Dang it/i); + }); + + test('is stoppable', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + poller.stop(); + doneWorking.resolve(); + }); + + const poller = new TaskPoller({ + logger: mockLogger(), + pollInterval: 1, + work, + }); + + poller.start(); + await doneWorking; + await sleep(10); + + sinon.assert.calledOnce(work); + }); + + test('disregards duplicate calls to "start"', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + await doneWorking; + }); + const poller = new TaskPoller({ + pollInterval: 1, + logger: mockLogger(), + work, + }); + + poller.start(); + poller.start(); + poller.start(); + poller.start(); + + poller.stop(); + + doneWorking.resolve(); + + sinon.assert.calledOnce(work); + }); + + test('waits for work before polling', async () => { + const doneWorking = resolvable(); + const work = sinon.spy(async () => { + await sleep(10); + poller.stop(); + doneWorking.resolve(); + }); + const poller = new TaskPoller({ + pollInterval: 1, + logger: mockLogger(), + work, + }); + + poller.start(); + await doneWorking; + + sinon.assert.calledOnce(work); + }); +}); diff --git a/x-pack/plugins/task_manager/task_poller.ts b/x-pack/plugins/task_manager/task_poller.ts new file mode 100644 index 0000000000000..9aefdeb98fc40 --- /dev/null +++ b/x-pack/plugins/task_manager/task_poller.ts @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the logic for polling the task manager index for new work. + */ + +import { Logger } from './lib/logger'; + +type WorkFn = () => Promise; + +interface Opts { + pollInterval: number; + logger: Logger; + work: WorkFn; +} + +/** + * Performs work on a scheduled interval, logging any errors. This waits for work to complete + * (or error) prior to attempting another run. + */ +export class TaskPoller { + private isStarted = false; + private isWorking = false; + private timeout: any; + private pollInterval: number; + private logger: Logger; + private work: WorkFn; + + /** + * Constructs a new TaskPoller. + * + * @param opts + * @prop {number} pollInterval - How often, in milliseconds, we will run the work function + * @prop {Logger} logger - The task manager logger + * @prop {WorkFn} work - An empty, asynchronous function that performs the desired work + */ + constructor(opts: Opts) { + this.pollInterval = opts.pollInterval; + this.logger = opts.logger; + this.work = opts.work; + } + + /** + * Starts the poller. If the poller is already running, this has no effect. + */ + public async start() { + if (this.isStarted) { + return; + } + this.isStarted = true; + + const poll = async () => { + await this.attemptWork(); + + if (this.isStarted) { + this.timeout = setTimeout(poll, this.pollInterval); + } + }; + + poll(); + } + + /** + * Stops the poller. + */ + public stop() { + this.isStarted = false; + clearTimeout(this.timeout); + this.timeout = undefined; + } + + /** + * Runs the work function. If the work function is currently running, + * this has no effect. + */ + public async attemptWork() { + if (!this.isStarted || this.isWorking) { + return; + } + + this.isWorking = true; + + try { + await this.work(); + } catch (err) { + this.logger.error(`Failed to poll for work: ${err}`); + } finally { + this.isWorking = false; + } + } +} diff --git a/x-pack/plugins/task_manager/task_pool.test.ts b/x-pack/plugins/task_manager/task_pool.test.ts new file mode 100644 index 0000000000000..b883831f2c6f9 --- /dev/null +++ b/x-pack/plugins/task_manager/task_pool.test.ts @@ -0,0 +1,207 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { TaskPool } from './task_pool'; +import { mockLogger, resolvable, sleep } from './test_utils'; + +describe('TaskPool', () => { + test('occupiedWorkers are a sum of worker costs', async () => { + const pool = new TaskPool({ + maxWorkers: 200, + logger: mockLogger(), + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 10 }, + { ...mockTask(), numWorkers: 20 }, + { ...mockTask(), numWorkers: 30 }, + ]); + + expect(result).toBeTruthy(); + expect(pool.occupiedWorkers).toEqual(60); + }); + + test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => { + const pool = new TaskPool({ + maxWorkers: 100, + logger: mockLogger(), + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 20 }, + { ...mockTask(), numWorkers: 30 }, + { ...mockTask(), numWorkers: 40 }, + ]); + + expect(result).toBeTruthy(); + expect(pool.availableWorkers).toEqual(10); + }); + + test('does not run tasks that are beyond its available capacity', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const shouldRun = mockRun(); + const shouldNotRun = mockRun(); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 9, run: shouldRun }, + { ...mockTask(), numWorkers: 9, run: shouldNotRun }, + ]); + + expect(result).toBeFalsy(); + expect(pool.availableWorkers).toEqual(1); + sinon.assert.calledOnce(shouldRun); + sinon.assert.notCalled(shouldNotRun); + }); + + test('clears up capacity when a task completes', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const firstWork = resolvable(); + const firstRun = sinon.spy(async () => { + await sleep(0); + firstWork.resolve(); + }); + const secondWork = resolvable(); + const secondRun = sinon.spy(async () => { + await sleep(0); + secondWork.resolve(); + }); + + const result = await pool.run([ + { ...mockTask(), numWorkers: 9, run: firstRun }, + { ...mockTask(), numWorkers: 2, run: secondRun }, + ]); + + expect(result).toBeFalsy(); + expect(pool.occupiedWorkers).toEqual(9); + expect(pool.availableWorkers).toEqual(1); + + await firstWork; + sinon.assert.calledOnce(firstRun); + sinon.assert.notCalled(secondRun); + + await pool.run([{ ...mockTask(), numWorkers: 2, run: secondRun }]); + + expect(pool.occupiedWorkers).toEqual(2); + expect(pool.availableWorkers).toEqual(8); + + await secondWork; + + expect(pool.occupiedWorkers).toEqual(0); + expect(pool.availableWorkers).toEqual(10); + sinon.assert.calledOnce(secondRun); + }); + + test('run cancels expired tasks prior to running new tasks', async () => { + const pool = new TaskPool({ + maxWorkers: 10, + logger: mockLogger(), + }); + + const expired = resolvable(); + const shouldRun = sinon.spy(() => Promise.resolve()); + const shouldNotRun = sinon.spy(() => Promise.resolve()); + const result = await pool.run([ + { + ...mockTask(), + numWorkers: 9, + async run() { + this.isExpired = true; + expired.resolve(); + await sleep(10); + return { + state: {}, + }; + }, + cancel: shouldRun, + }, + { + ...mockTask(), + numWorkers: 1, + async run() { + await sleep(10); + return { + state: {}, + }; + }, + cancel: shouldNotRun, + }, + ]); + + expect(result).toBeTruthy(); + expect(pool.occupiedWorkers).toEqual(10); + expect(pool.availableWorkers).toEqual(0); + + await expired; + + expect(await pool.run([{ ...mockTask(), numWorkers: 7 }])).toBeTruthy(); + sinon.assert.calledOnce(shouldRun); + sinon.assert.notCalled(shouldNotRun); + + expect(pool.occupiedWorkers).toEqual(8); + expect(pool.availableWorkers).toEqual(2); + }); + + test('logs if cancellation errors', async () => { + const logger = mockLogger(); + const pool = new TaskPool({ + logger, + maxWorkers: 20, + }); + + const cancelled = resolvable(); + const result = await pool.run([ + { + ...mockTask(), + numWorkers: 7, + async run() { + this.isExpired = true; + await sleep(10); + return { + state: {}, + }; + }, + async cancel() { + cancelled.resolve(); + throw new Error('Dern!'); + }, + toString: () => '"shooooo!"', + }, + ]); + + expect(result).toBeTruthy(); + await pool.run([]); + + expect(pool.occupiedWorkers).toEqual(0); + + // Allow the task to cancel... + await cancelled; + + sinon.assert.calledWithMatch(logger.error, /Failed to cancel task "shooooo!"/); + }); + + function mockRun() { + return sinon.spy(async () => sleep(0)); + } + + function mockTask() { + return { + numWorkers: 1, + isExpired: false, + cancel: async () => undefined, + claimOwnership: async () => true, + run: mockRun(), + }; + } +}); diff --git a/x-pack/plugins/task_manager/task_pool.ts b/x-pack/plugins/task_manager/task_pool.ts new file mode 100644 index 0000000000000..839f022dae75a --- /dev/null +++ b/x-pack/plugins/task_manager/task_pool.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the logic that ensures we don't run too many + * tasks at once in a given Kibana instance. + */ + +import { Logger } from './lib/logger'; +import { TaskRunner } from './task_runner'; + +interface Opts { + maxWorkers: number; + logger: Logger; +} + +/** + * Runs tasks in batches, taking costs into account. + */ +export class TaskPool { + private maxWorkers: number; + private running = new Set(); + private logger: Logger; + + /** + * Creates an instance of TaskPool. + * + * @param {Opts} opts + * @prop {number} maxWorkers - The total number of workers / work slots available + * (e.g. maxWorkers is 4, then 2 tasks of cost 2 can run at a time, or 4 tasks of cost 1) + * @prop {Logger} logger - The task manager logger. + */ + constructor(opts: Opts) { + this.maxWorkers = opts.maxWorkers; + this.logger = opts.logger; + } + + /** + * Gets how many workers are currently in use. + */ + get occupiedWorkers() { + const running = Array.from(this.running); // get array from a Set + return running.reduce((total, { numWorkers }) => (total += numWorkers), 0); + } + + /** + * Gets how many workers are currently available. + */ + get availableWorkers() { + return this.maxWorkers - this.occupiedWorkers; + } + + /** + * Attempts to run the specified list of tasks. Returns true if it was able + * to start every task in the list, false if there was not enough capacity + * to run every task. + * + * @param {TaskRunner[]} tasks + * @returns {Promise} + */ + public run = (tasks: TaskRunner[]) => { + this.cancelExpiredTasks(); + return this.attemptToRun(tasks); + }; + + private async attemptToRun(tasks: TaskRunner[]) { + for (const task of tasks) { + if (this.availableWorkers < task.numWorkers) { + return false; + } + + if (await task.claimOwnership()) { + this.running.add(task); + task + .run() + .catch(err => { + this.logger.warning(`Task ${task} failed in attempt to run: ${err}`); + }) + .then(() => this.running.delete(task)); + } + } + + return true; + } + + private cancelExpiredTasks() { + for (const task of this.running) { + if (task.isExpired) { + this.cancelTask(task); + } + } + } + + private async cancelTask(task: TaskRunner) { + try { + this.logger.debug(`Cancelling expired task ${task}.`); + this.running.delete(task); + await task.cancel(); + } catch (err) { + this.logger.error(`Failed to cancel task ${task}: ${err}`); + } + } +} diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts new file mode 100644 index 0000000000000..a3866c65f9d19 --- /dev/null +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { minutesFromNow } from './lib/intervals'; +import { ConcreteTaskInstance } from './task'; +import { TaskManagerRunner } from './task_runner'; + +describe('TaskManagerRunner', () => { + test('provides details about the task that is running', () => { + const { runner } = testOpts({ + instance: { + id: 'foo', + taskType: 'bar', + }, + }); + + expect(runner.id).toEqual('foo'); + expect(runner.taskType).toEqual('bar'); + expect(runner.toString()).toEqual('bar "foo"'); + }); + + test('warns if the task returns an unexpected result', async () => { + await allowsReturnType(undefined); + await allowsReturnType({}); + await allowsReturnType({ + runAt: new Date(), + }); + await allowsReturnType({ + error: new Error('Dang it!'), + }); + await allowsReturnType({ + state: { shazm: true }, + }); + await disallowsReturnType('hm....'); + await disallowsReturnType({ + whatIsThis: '?!!?', + }); + }); + + test('queues a reattempt if the task fails', async () => { + const initialAttempts = _.random(0, 2); + const id = Date.now().toString(); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + params: { a: 'b' }, + state: { hey: 'there' }, + }, + definitions: { + testtype: { + createTaskRunner: () => ({ + async run() { + throw new Error('Dangit!'); + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.id).toEqual(id); + expect(instance.attempts).toEqual(initialAttempts + 1); + expect(instance.runAt.getTime()).toBeGreaterThan(Date.now()); + expect(instance.params).toEqual({ a: 'b' }); + expect(instance.state).toEqual({ hey: 'there' }); + }); + + test('reschedules tasks that have an interval', async () => { + const { runner, store } = testOpts({ + instance: { + interval: '10m', + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.runAt.getTime()).toBeGreaterThan(minutesFromNow(9).getTime()); + expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime()); + }); + + test('reschedules tasks that return a runAt', async () => { + const runAt = minutesFromNow(_.random(1, 10)); + const { runner, store } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWithMatch(store.update, { runAt }); + }); + + test('tasks that return runAt override interval', async () => { + const runAt = minutesFromNow(_.random(5)); + const { runner, store } = testOpts({ + instance: { + interval: '20m', + }, + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWithMatch(store.update, { runAt }); + }); + + test('removes non-recurring tasks after they complete', async () => { + const id = _.random(1, 20).toString(); + const { runner, store } = testOpts({ + instance: { + id, + interval: undefined, + }, + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + return undefined; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.remove); + sinon.assert.calledWith(store.remove, id); + }); + + test('cancel cancels the task runner, if it is cancellable', async () => { + let wasCancelled = false; + const { runner, logger } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + async run() { + await new Promise(r => setTimeout(r, 1000)); + }, + async cancel() { + wasCancelled = true; + }, + }), + }, + }, + }); + + const promise = runner.run(); + await new Promise(r => setInterval(r, 1)); + await runner.cancel(); + await promise; + + expect(wasCancelled).toBeTruthy(); + sinon.assert.neverCalledWithMatch(logger.warning, /not cancellable/); + }); + + test('warns if cancel is called on a non-cancellable task', async () => { + const { runner, logger } = testOpts({ + definitions: { + testType: { + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + const promise = runner.run(); + await runner.cancel(); + await promise; + + sinon.assert.calledWithMatch(logger.warning, /not cancellable/); + }); + + interface TestOpts { + instance?: Partial; + definitions?: any; + } + + function testOpts(opts: TestOpts) { + const callCluster = sinon.stub(); + const createTaskRunner = sinon.stub(); + const logger = { + error: sinon.stub(), + debug: sinon.stub(), + info: sinon.stub(), + warning: sinon.stub(), + }; + const store = { + update: sinon.stub(), + remove: sinon.stub(), + maxAttempts: 5, + }; + const runner = new TaskManagerRunner({ + kbnServer: sinon.stub(), + beforeRun: context => Promise.resolve(context), + logger, + store, + instance: Object.assign( + { + id: 'foo', + taskType: 'bar', + version: 32, + runAt: new Date(), + attempts: 0, + params: {}, + scope: ['reporting'], + state: {}, + status: 'idle', + user: 'example', + }, + opts.instance || {} + ), + definitions: Object.assign(opts.definitions || {}, { + testbar: { + type: 'bar', + title: 'Bar!', + createTaskRunner, + }, + }), + }); + + return { + callCluster, + createTaskRunner, + runner, + logger, + store, + }; + } + + async function testReturn(result: any, shouldBeValid: boolean) { + const { runner, logger } = testOpts({ + definitions: { + bar: { + createTaskRunner: () => ({ + run: async () => result, + }), + }, + }, + }); + + await runner.run(); + + if (shouldBeValid) { + sinon.assert.notCalled(logger.warning); + } else { + sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i)); + } + } + + function allowsReturnType(result: any) { + return testReturn(result, true); + } + + function disallowsReturnType(result: any) { + return testReturn(result, false); + } +}); diff --git a/x-pack/plugins/task_manager/task_runner.ts b/x-pack/plugins/task_manager/task_runner.ts new file mode 100644 index 0000000000000..d5e0196d80864 --- /dev/null +++ b/x-pack/plugins/task_manager/task_runner.ts @@ -0,0 +1,273 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains the core logic for running an individual task. + * It handles the full lifecycle of a task run, including error handling, + * rescheduling, middleware application, etc. + */ + +import Joi from 'joi'; +import { intervalFromNow, minutesFromNow } from './lib/intervals'; +import { Logger } from './lib/logger'; +import { BeforeRunFunction } from './lib/middleware'; +import { + CancelFunction, + CancellableTask, + ConcreteTaskInstance, + RunResult, + SanitizedTaskDefinition, + TaskDictionary, + validateRunResult, +} from './task'; +import { RemoveResult } from './task_store'; + +export interface TaskRunner { + numWorkers: number; + isExpired: boolean; + cancel: CancelFunction; + claimOwnership: () => Promise; + run: () => Promise; + toString?: () => string; +} + +interface Updatable { + readonly maxAttempts: number; + update(doc: ConcreteTaskInstance): Promise; + remove(id: string): Promise; +} + +interface Opts { + logger: Logger; + definitions: TaskDictionary; + instance: ConcreteTaskInstance; + store: Updatable; + kbnServer: any; + beforeRun: BeforeRunFunction; +} + +/** + * Runs a background task, ensures that errors are properly handled, + * allows for cancellation. + * + * @export + * @class TaskManagerRunner + * @implements {TaskRunner} + */ +export class TaskManagerRunner implements TaskRunner { + private task?: CancellableTask; + private instance: ConcreteTaskInstance; + private definitions: TaskDictionary; + private logger: Logger; + private store: Updatable; + private kbnServer: any; + private beforeRun: BeforeRunFunction; + + /** + * Creates an instance of TaskManagerRunner. + * @param {Opts} opts + * @prop {Logger} logger - The task manager logger + * @prop {TaskDefinition} definition - The definition of the task being run + * @prop {ConcreteTaskInstance} instance - The record describing this particular task instance + * @prop {Updatable} store - The store used to read / write tasks instance info + * @prop {kbnServer} kbnServer - An async function that provides the task's run context + * @prop {BeforeRunFunction} beforeRun - A function that adjusts the run context prior to running the task + * @memberof TaskManagerRunner + */ + constructor(opts: Opts) { + this.instance = sanitizeInstance(opts.instance); + this.definitions = opts.definitions; + this.logger = opts.logger; + this.store = opts.store; + this.kbnServer = opts.kbnServer; + this.beforeRun = opts.beforeRun; + } + + /** + * Gets how many workers are occupied by this task instance. + * Per Joi validation logic, this will return a number >= 1 + */ + public get numWorkers() { + return this.definition.numWorkers; + } + + /** + * Gets the id of this task instance. + */ + public get id() { + return this.instance.id; + } + + /** + * Gets the task type of this task instance. + */ + public get taskType() { + return this.instance.taskType; + } + + /** + * Gets the task defintion from the dictionary. + */ + public get definition() { + return this.definitions[this.taskType]; + } + + /** + * Gets whether or not this task has run longer than its expiration setting allows. + */ + public get isExpired() { + return this.instance.runAt < new Date(); + } + + /** + * Returns a log-friendly representation of this task. + */ + public toString() { + return `${this.taskType} "${this.id}"`; + } + + /** + * Runs the task, handling the task result, errors, etc, rescheduling if need + * be. NOTE: the time of applying the middleware's beforeRun is incorporated + * into the total timeout time the task in configured with. We may decide to + * start the timer after beforeRun resolves + * + * @returns {Promise} + */ + public async run(): Promise { + this.logger.debug(`Running task ${this}`); + const modifiedContext = await this.beforeRun({ + kbnServer: this.kbnServer, + taskInstance: this.instance, + }); + + try { + this.task = this.definition.createTaskRunner(modifiedContext); + const result = await this.task.run(); + const validatedResult = this.validateResult(result); + return this.processResult(validatedResult); + } catch (err) { + this.logger.error(`Task ${this} failed: ${err}`); + + // in error scenario, we can not get the RunResult + // re-use modifiedContext's state, which is correct as of beforeRun + return this.processResult({ error: err, state: modifiedContext.taskInstance.state }); + } + } + + /** + * Attempts to claim exclusive rights to run the task. If the attempt fails + * with a 409 (http conflict), we assume another Kibana instance beat us to the punch. + * + * @returns {Promise} + */ + public async claimOwnership(): Promise { + const VERSION_CONFLICT_STATUS = 409; + + try { + this.instance = await this.store.update({ + ...this.instance, + status: 'running', + runAt: intervalFromNow(this.definition.timeout)!, + }); + + return true; + } catch (error) { + if (error.statusCode !== VERSION_CONFLICT_STATUS) { + throw error; + } + } + + return false; + } + + /** + * Attempts to cancel the task. + * + * @returns {Promise} + */ + public async cancel() { + const { task } = this; + if (task && task.cancel) { + this.task = undefined; + return task.cancel(); + } + + this.logger.warning(`The task ${this} is not cancellable.`); + } + + private validateResult(result?: RunResult | void): RunResult { + const { error } = Joi.validate(result, validateRunResult); + + if (error) { + this.logger.warning(`Invalid task result for ${this}: ${error.message}`); + } + + return result || { state: {} }; + } + + private async processResultForRecurringTask(result: RunResult): Promise { + // recurring task: update the task instance + const state = result.state || this.instance.state || {}; + const status = this.instance.attempts < this.store.maxAttempts ? 'idle' : 'failed'; + + let runAt; + if (status === 'failed') { + // task run errored, keep the same runAt + runAt = this.instance.runAt; + } else { + runAt = + result.runAt || + intervalFromNow(this.instance.interval) || + // when result.error is truthy, then we're retrying because it failed + minutesFromNow((this.instance.attempts + 1) * 5); // incrementally backs off an extra 5m per failure + } + + await this.store.update({ + ...this.instance, + runAt, + state, + status, + attempts: result.error ? this.instance.attempts + 1 : 0, + }); + + return result; + } + + private async processResultWhenDone(result: RunResult): Promise { + // not a recurring task: clean up by removing the task instance from store + try { + await this.store.remove(this.instance.id); + } catch (err) { + if (err.statusCode === 404) { + this.logger.warning( + `Task cleanup of ${this} failed in processing. Was remove called twice?` + ); + } else { + throw err; + } + } + + return result; + } + + private async processResult(result: RunResult): Promise { + if (result.runAt || this.instance.interval || result.error) { + await this.processResultForRecurringTask(result); + } else { + await this.processResultWhenDone(result); + } + return result; + } +} + +function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance { + return { + ...instance, + params: instance.params || {}, + state: instance.state || {}, + }; +} diff --git a/x-pack/plugins/task_manager/task_store.test.ts b/x-pack/plugins/task_manager/task_store.test.ts new file mode 100644 index 0000000000000..a76ba03de0faf --- /dev/null +++ b/x-pack/plugins/task_manager/task_store.test.ts @@ -0,0 +1,515 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import sinon from 'sinon'; +import { TaskInstance, TaskStatus } from './task'; +import { FetchOpts, TaskStore } from './task_store'; + +describe('TaskStore', () => { + describe('init', () => { + test('creates the task manager index', async () => { + const callCluster = sinon.spy(); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + await store.init(); + + sinon.assert.calledOnce(callCluster); + + sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', { + body: { + index_patterns: ['tasky'], + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + }, + name: 'tasky', + }); + }); + }); + + describe('schedule', () => { + async function testSchedule(task: TaskInstance) { + const callCluster = sinon.spy(() => + Promise.resolve({ + _id: 'testid', + _version: 3344, + }) + ); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['report', 'dernstraight', 'yawn'], + }); + const result = await store.schedule(task); + + sinon.assert.calledTwice(callCluster); + + return { result, callCluster, arg: callCluster.args[1][1] }; + } + + test('serializes the params and state', async () => { + const task = { + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + const { callCluster, arg } = await testSchedule(task); + + sinon.assert.calledWith(callCluster, 'index'); + + expect(arg).toMatchObject({ + index: 'tasky', + type: '_doc', + body: { + task: { + params: JSON.stringify(task.params), + state: JSON.stringify(task.state), + }, + }, + }); + }); + + test('retiurns a concrete task instance', async () => { + const task = { + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + }; + const { result } = await testSchedule(task); + + expect(result).toMatchObject({ + ...task, + version: 3344, + id: 'testid', + }); + }); + + test('sets runAt to now if not specified', async () => { + const now = Date.now(); + const { arg } = await testSchedule({ taskType: 'dernstraight', params: {}, state: {} }); + expect(arg.body.task.runAt.getTime()).toBeGreaterThanOrEqual(now); + }); + + test('ensures params and state are not null', async () => { + const { arg } = await testSchedule({ taskType: 'yawn' } as any); + expect(arg.body.task.params).toEqual('{}'); + expect(arg.body.task.state).toEqual('{}'); + }); + + test('errors if the task type is unknown', async () => { + await expect(testSchedule({ taskType: 'nope', params: {}, state: {} })).rejects.toThrow( + /Unsupported task type "nope"/i + ); + }); + }); + + describe('fetch', () => { + async function testFetch(opts?: FetchOpts, hits: any[] = []) { + const callCluster = sinon.spy(async () => ({ hits: { hits } })); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + const result = await store.fetch(opts); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'search'); + + return { + result, + args: callCluster.args[0][1], + }; + } + + test('empty call filters by type, sorts by runAt and id', async () => { + const { args } = await testFetch(); + expect(args).toMatchObject({ + type: '_doc', + index: 'tasky', + body: { + sort: [{ 'task.runAt': 'asc' }, { _id: 'desc' }], + query: { term: { type: 'task' } }, + }, + }); + }); + + test('allows custom queries', async () => { + const { args } = await testFetch({ + query: { + term: { 'task.taskType': 'bar' }, + }, + }); + + expect(args).toMatchObject({ + body: { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }], + }, + }, + }, + }); + }); + + test('sorts by id if custom sort does not have an id sort in it', async () => { + const { args } = await testFetch({ + sort: [{ 'task.taskType': 'desc' }], + }); + + expect(args).toMatchObject({ + body: { + sort: [{ 'task.taskType': 'desc' }, { _id: 'desc' }], + }, + }); + }); + + test('allows custom sort by id', async () => { + const { args } = await testFetch({ + sort: [{ _id: 'asc' }], + }); + + expect(args).toMatchObject({ + body: { + sort: [{ _id: 'asc' }], + }, + }); + }); + + test('allows specifying pagination', async () => { + const now = new Date(); + const searchAfter = [now, '143243sdafa32']; + const { args } = await testFetch({ + searchAfter, + }); + + expect(args).toMatchObject({ + body: { + search_after: searchAfter, + }, + }); + }); + + test('returns paginated tasks', async () => { + const runAt = new Date(); + const { result } = await testFetch(undefined, [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + interval: undefined, + attempts: 0, + status: 'idle', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + }, + }, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + interval: '5m', + attempts: 2, + status: 'running', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + }, + }, + sort: ['b', 2], + }, + ]); + + expect(result).toEqual({ + docs: [ + { + attempts: 0, + id: 'aaa', + interval: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'idle', + taskType: 'foo', + user: 'jimbo', + version: undefined, + }, + { + attempts: 2, + id: 'bbb', + interval: '5m', + params: { shazm: 1 }, + runAt, + scope: ['reporting', 'ceo'], + state: { henry: 'The 8th' }, + status: 'running', + taskType: 'bar', + user: 'dabo', + version: undefined, + }, + ], + searchAfter: ['b', 2], + }); + }); + }); + + describe('fetchAvailableTasks', () => { + async function testFetchAvailableTasks({ opts = {}, hits = [] }: any = {}) { + const callCluster = sinon.spy(async () => ({ hits: { hits } })); + const store = new TaskStore({ + callCluster, + supportedTypes: ['a', 'b', 'c'], + index: 'tasky', + maxAttempts: 2, + ...opts, + }); + + const result = await store.fetchAvailableTasks(); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'search'); + + return { + result, + args: callCluster.args[0][1], + }; + } + + test('it returns normally with no tasks when the index does not exist.', async () => { + const callCluster = sinon.spy(async () => ({ hits: { hits: [] } })); + const store = new TaskStore({ + callCluster, + supportedTypes: ['a', 'b', 'c'], + index: 'tasky', + maxAttempts: 2, + }); + + const result = await store.fetchAvailableTasks(); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWithMatch(callCluster, 'search', { ignoreUnavailable: true }); + + expect(result.length).toBe(0); + }); + + test('it filters tasks by supported types, maxAttempts, and runAt', async () => { + const maxAttempts = _.random(2, 43); + const index = `index_${_.random(1, 234)}`; + const { args } = await testFetchAvailableTasks({ + opts: { + index, + maxAttempts, + supportedTypes: ['foo', 'bar'], + }, + }); + expect(args).toMatchObject({ + body: { + query: { + bool: { + must: [ + { term: { type: 'task' } }, + { + bool: { + must: [ + { terms: { 'task.taskType': ['foo', 'bar'] } }, + { range: { 'task.attempts': { lte: maxAttempts } } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + size: 10, + sort: { 'task.runAt': { order: 'asc' } }, + version: true, + }, + index, + type: '_doc', + }); + }); + + test('it returns task objects', async () => { + const runAt = new Date(); + const { result } = await testFetchAvailableTasks({ + hits: [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + interval: undefined, + attempts: 0, + status: 'idle', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + }, + }, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + interval: '5m', + attempts: 2, + status: 'running', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + }, + }, + sort: ['b', 2], + }, + ], + }); + expect(result).toMatchObject([ + { + attempts: 0, + id: 'aaa', + interval: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'idle', + taskType: 'foo', + user: 'jimbo', + version: undefined, + }, + { + attempts: 2, + id: 'bbb', + interval: '5m', + params: { shazm: 1 }, + runAt, + scope: ['reporting', 'ceo'], + state: { henry: 'The 8th' }, + status: 'running', + taskType: 'bar', + user: 'dabo', + version: undefined, + }, + ]); + }); + }); + + describe('update', () => { + test('refreshes the index, handles versioning', async () => { + const runAt = new Date(); + const task = { + runAt, + id: 'task:324242', + params: { hello: 'world' }, + state: { foo: 'bar' }, + taskType: 'report', + version: 2, + attempts: 3, + status: 'idle' as TaskStatus, + }; + + const callCluster = sinon.spy(async () => ({ _version: task.version + 1 })); + const store = new TaskStore({ + callCluster, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + const result = await store.update(task); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'update'); + + expect(callCluster.args[0][1]).toMatchObject({ + id: task.id, + index: 'tasky', + type: '_doc', + version: 2, + refresh: true, + body: { + doc: { + task: { + ...['id', 'version'].reduce((acc, prop) => _.omit(acc, prop), task), + params: JSON.stringify(task.params), + state: JSON.stringify(task.state), + }, + }, + }, + }); + + expect(result).toEqual({ ...task, version: 3 }); + }); + }); + + describe('remove', () => { + test('removes the task with the specified id', async () => { + const id = `id-${_.random(1, 20)}`; + const callCluster = sinon.spy(() => + Promise.resolve({ + _index: 'myindex', + _id: id, + _version: 32, + result: 'deleted', + }) + ); + const store = new TaskStore({ + callCluster, + index: 'myindex', + maxAttempts: 2, + supportedTypes: ['a'], + }); + const result = await store.remove(id); + + sinon.assert.calledOnce(callCluster); + sinon.assert.calledWith(callCluster, 'delete'); + + expect(result).toEqual({ + id, + index: 'myindex', + version: 32, + result: 'deleted', + }); + + expect(callCluster.args[0][1]).toMatchObject({ + id, + index: 'myindex', + type: '_doc', + refresh: true, + }); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts new file mode 100644 index 0000000000000..84ee7ba8bb11d --- /dev/null +++ b/x-pack/plugins/task_manager/task_store.ts @@ -0,0 +1,375 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * This module contains helpers for managing the task manager storage layer. + */ + +import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task'; + +const DOC_TYPE = '_doc'; + +export interface StoreOpts { + callCluster: ElasticJs; + index: string; + maxAttempts: number; + supportedTypes: string[]; +} + +export interface FetchOpts { + searchAfter?: any[]; + sort?: object[]; + query?: object; +} + +export interface FetchResult { + searchAfter: any[]; + docs: ConcreteTaskInstance[]; +} + +export interface RemoveResult { + index: string; + id: string; + version: string; + result: string; +} + +// Internal, the raw document, as stored in the Kibana index. +export interface RawTaskDoc { + _id: string; + _index: string; + _type: string; + _version: number; + _source: { + type: string; + task: { + taskType: string; + runAt: Date; + interval?: string; + attempts: number; + status: TaskStatus; + params: string; + state: string; + user?: string; + scope?: string[]; + }; + }; +} + +/** + * Wraps an elasticsearch connection and provides a task manager-specific + * interface into the index. + */ +export class TaskStore { + public readonly maxAttempts: number; + private callCluster: ElasticJs; + private index: string; + private supportedTypes: string[]; + private wasInitialized = false; + + /** + * Constructs a new TaskStore. + * @param {StoreOpts} opts + * @prop {CallCluster} callCluster - The elastic search connection + * @prop {string} index - The name of the task manager index + * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned + * @prop {string[]} supportedTypes - The task types supported by this store + */ + constructor(opts: StoreOpts) { + this.callCluster = opts.callCluster; + this.index = opts.index; + this.maxAttempts = opts.maxAttempts; + this.supportedTypes = opts.supportedTypes; + + this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this); + } + + public addSupportedTypes(types: string[]) { + if (!this.wasInitialized) { + this.supportedTypes = this.supportedTypes.concat(types); + } else { + throw new Error('Cannot add task types after initialization'); + } + } + + /** + * Initializes the store, ensuring the task manager index is created and up to date. + */ + public async init() { + if (this.wasInitialized) { + throw new Error('TaskStore has already been initialized!'); + } + + const properties = { + type: { type: 'keyword' }, + task: { + properties: { + taskType: { type: 'keyword' }, + runAt: { type: 'date' }, + interval: { type: 'text' }, + attempts: { type: 'integer' }, + status: { type: 'keyword' }, + params: { type: 'text' }, + state: { type: 'text' }, + user: { type: 'keyword' }, + scope: { type: 'keyword' }, + }, + }, + }; + + try { + const templateResult = await this.callCluster('indices.putTemplate', { + name: this.index, + body: { + index_patterns: [this.index], + mappings: { + _doc: { + dynamic: 'strict', + properties, + }, + }, + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + }, + }); + this.wasInitialized = true; + return templateResult; + } catch (err) { + throw err; + } + + return; + } + + /** + * Schedules a task. + * + * @param task - The task being scheduled. + */ + public async schedule(taskInstance: TaskInstance): Promise { + if (!this.wasInitialized) { + await this.init(); + } + + if (!this.supportedTypes.includes(taskInstance.taskType)) { + throw new Error( + `Unsupported task type "${ + taskInstance.taskType + }". Supported types are ${this.supportedTypes.join(', ')}` + ); + } + + const { id, ...body } = rawSource(taskInstance); + const result = await this.callCluster('index', { + id, + body, + index: this.index, + type: DOC_TYPE, + refresh: true, + }); + + const { task } = body; + return { + ...taskInstance, + id: result._id, + version: result._version, + attempts: 0, + status: task.status, + runAt: task.runAt, + state: taskInstance.state || {}, + }; + } + + /** + * Fetches a paginatable list of scheduled tasks. + * + * @param opts - The query options used to filter tasks + */ + public async fetch(opts: FetchOpts = {}): Promise { + const sort = paginatableSort(opts.sort); + return this.search({ + sort, + search_after: opts.searchAfter, + query: opts.query, + }); + } + + /** + * Fetches tasks from the index, which are ready to be run. + * - runAt is now or past + * - id is not currently running in this instance of Kibana + * - has a type that is in our task definitions + * + * @param {TaskQuery} query + * @prop {string[]} types - Task types to be queried + * @prop {number} size - The number of task instances to retrieve + * @returns {Promise} + */ + public async fetchAvailableTasks(): Promise { + const { docs } = await this.search({ + query: { + bool: { + must: [ + { terms: { 'task.taskType': this.supportedTypes } }, + { range: { 'task.attempts': { lte: this.maxAttempts } } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + size: 10, + sort: { 'task.runAt': { order: 'asc' } }, + version: true, + }); + + return docs; + } + + /** + * Updates the specified doc in the index, returning the doc + * with its version up to date. + * + * @param {TaskDoc} doc + * @returns {Promise} + */ + public async update(doc: ConcreteTaskInstance): Promise { + const rawDoc = taskDocToRaw(doc, this.index); + + const { _version } = await this.callCluster('update', { + body: { + doc: rawDoc._source, + }, + id: doc.id, + index: this.index, + type: DOC_TYPE, + version: doc.version, + // The refresh is important so that if we immediately look for work, + // we don't pick up this task. + refresh: true, + }); + + return { + ...doc, + version: _version, + }; + } + + /** + * Removes the specified task from the index. + * + * @param {string} id + * @returns {Promise} + */ + public async remove(id: string): Promise { + const result = await this.callCluster('delete', { + id, + index: this.index, + type: DOC_TYPE, + // The refresh is important so that if we immediately look for work, + // we don't pick up this task. + refresh: true, + }); + + return { + index: result._index, + id: result._id, + version: result._version, + result: result.result, + }; + } + + private async search(opts: any = {}): Promise { + const originalQuery = opts.query; + const queryOnlyTasks = { term: { type: 'task' } }; + const query = originalQuery + ? { bool: { must: [queryOnlyTasks, originalQuery] } } + : queryOnlyTasks; + + const result = await this.callCluster('search', { + type: DOC_TYPE, + index: this.index, + ignoreUnavailable: true, + body: { + ...opts, + query, + }, + }); + + const rawDocs = result.hits.hits; + + return { + docs: (rawDocs as RawTaskDoc[]).map(rawToTaskDoc), + searchAfter: (rawDocs.length && rawDocs[rawDocs.length - 1].sort) || [], + }; + } +} + +function paginatableSort(sort: any[] = []) { + const sortById = { _id: 'desc' }; + + if (!sort.length) { + return [{ 'task.runAt': 'asc' }, sortById]; + } + + if (sort.find(({ _id }) => !!_id)) { + return sort; + } + + return [...sort, sortById]; +} + +function rawSource(doc: TaskInstance) { + const { id, ...taskFields } = doc; + const source = { + ...taskFields, + params: JSON.stringify(doc.params || {}), + state: JSON.stringify(doc.state || {}), + attempts: (doc as ConcreteTaskInstance).attempts || 0, + runAt: doc.runAt || new Date(), + status: (doc as ConcreteTaskInstance).status || 'idle', + }; + + delete (source as any).id; + delete (source as any).version; + delete (source as any).type; + + return { + id, + type: 'task', + task: source, + }; +} + +function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc { + const { type, task } = rawSource(doc); + + return { + _id: doc.id, + _index: index, + _source: { type, task }, + _type: DOC_TYPE, + _version: doc.version, + }; +} + +function rawToTaskDoc(doc: RawTaskDoc): ConcreteTaskInstance { + return { + ...doc._source.task, + id: doc._id, + version: doc._version, + params: parseJSONField(doc._source.task.params, 'params', doc), + state: parseJSONField(doc._source.task.state, 'state', doc), + }; +} + +function parseJSONField(json: string, fieldName: string, doc: RawTaskDoc) { + try { + return json ? JSON.parse(json) : {}; + } catch (error) { + throw new Error(`Task "${doc._id}"'s ${fieldName} field has invalid JSON: ${json}`); + } +} diff --git a/x-pack/plugins/task_manager/test_utils/index.ts b/x-pack/plugins/task_manager/test_utils/index.ts new file mode 100644 index 0000000000000..6a427de41b311 --- /dev/null +++ b/x-pack/plugins/task_manager/test_utils/index.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* + * A handful of helper functions for testing the task manager. + */ + +import sinon from 'sinon'; + +// Caching this here to avoid setTimeout mocking affecting our tests. +const nativeTimeout = setTimeout; + +/** + * Creates a mock task manager Logger. + */ +export function mockLogger() { + return { + info: sinon.stub(), + debug: sinon.stub(), + warning: sinon.stub(), + error: sinon.stub(), + }; +} + +interface Resolvable { + resolve: () => void; +} + +/** + * Creates a promise which can be resolved externally, useful for + * coordinating async tests. + */ +export function resolvable(): PromiseLike & Resolvable { + let resolve: () => void; + const result = new Promise(r => (resolve = r)) as any; + + result.resolve = () => nativeTimeout(resolve, 0); + + return result; +} + +/** + * A simple helper for waiting a specified number of milliseconds. + * + * @param {number} ms + */ +export async function sleep(ms: number) { + return new Promise(r => nativeTimeout(r, ms)); +} diff --git a/x-pack/scripts/functional_tests.js b/x-pack/scripts/functional_tests.js index a97748f87c9de..67d13381b3a77 100644 --- a/x-pack/scripts/functional_tests.js +++ b/x-pack/scripts/functional_tests.js @@ -10,6 +10,7 @@ require('@kbn/test').runTestsCli([ require.resolve('../test/reporting/configs/chromium_functional.js'), require.resolve('../test/functional/config.js'), require.resolve('../test/api_integration/config.js'), + require.resolve('../test/plugin_api_integration/config.js'), require.resolve('../test/saml_api_integration/config.js'), require.resolve('../test/token_api_integration/config.js'), require.resolve('../test/spaces_api_integration/spaces_only/config'), diff --git a/x-pack/test/plugin_api_integration/config.js b/x-pack/test/plugin_api_integration/config.js new file mode 100644 index 0000000000000..e4f7743c44443 --- /dev/null +++ b/x-pack/test/plugin_api_integration/config.js @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import path from 'path'; +import fs from 'fs'; + +export default async function ({ readConfigFile }) { + const integrationConfig = await readConfigFile(require.resolve('../api_integration/config')); + const kibanaFunctionalConfig = await readConfigFile(require.resolve('../../../test/functional/config.js')); + + // Find all folders in ./plugins since we treat all them as plugin folder + const allFiles = fs.readdirSync(path.resolve(__dirname, 'plugins')); + const plugins = allFiles.filter(file => fs.statSync(path.resolve(__dirname, 'plugins', file)).isDirectory()); + + return { + testFiles: [ + require.resolve('./test_suites/task_manager'), + ], + services: { + retry: kibanaFunctionalConfig.get('services.retry'), + ...integrationConfig.get('services'), + }, + pageObjects: integrationConfig.get('pageObjects'), + servers: integrationConfig.get('servers'), + esTestCluster: integrationConfig.get('esTestCluster'), + apps: integrationConfig.get('apps'), + esArchiver: { + directory: path.resolve(__dirname, '../es_archives') + }, + screenshots: integrationConfig.get('screenshots'), + junit: { + reportName: 'Plugin Functional Tests', + }, + kbnTestServer: { + ...integrationConfig.get('kbnTestServer'), + serverArgs: [ + ...integrationConfig.get('kbnTestServer.serverArgs'), + ...plugins.map(pluginDir => `--plugin-path=${path.resolve(__dirname, 'plugins', pluginDir)}`), + ], + }, + }; +} + diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/index.js b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js new file mode 100644 index 0000000000000..8d9a949f63f98 --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/index.js @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { initRoutes } from './init_routes'; + +export default function (kibana) { + return new kibana.Plugin({ + name: 'sampleTask', + require: ['elasticsearch', 'task_manager'], + + config(Joi) { + return Joi.object({ + enabled: Joi.boolean().default(true), + }).default(); + }, + + init(server) { + const { taskManager } = server; + + taskManager.registerTaskDefinitions({ + sampleTask: { + title: 'Sample Task', + description: 'A sample task for testing the task_manager.', + timeout: '1m', + numWorkers: 2, + + // This task allows tests to specify its behavior (whether it reschedules itself, whether it errors, etc) + // taskInstance.params has the following optional fields: + // nextRunMilliseconds: number - If specified, the run method will return a runAt that is now + nextRunMilliseconds + // failWith: string - If specified, the task will throw an error with the specified message + createTaskRunner: ({ kbnServer, taskInstance }) => ({ + async run() { + const { params, state } = taskInstance; + const prevState = state || { count: 0 }; + + if (params.failWith) { + throw new Error(params.failWith); + } + + const callCluster = kbnServer.server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + await callCluster('index', { + index: '.task_manager_test_result', + type: '_doc', + body: { + type: 'task', + taskId: taskInstance.id, + params: JSON.stringify(params), + state: JSON.stringify(state), + ranAt: new Date(), + }, + refresh: true, + }); + + return { + state: { count: (prevState.count || 0) + 1 }, + runAt: millisecondsFromNow(params.nextRunMilliseconds), + }; + }, + }), + }, + }); + + taskManager.addMiddleware({ + async beforeSave({ taskInstance, ...opts }) { + const modifiedInstance = { + ...taskInstance, + params: { + originalParams: taskInstance.params, + superFly: 'My middleware param!', + }, + }; + + return { + ...opts, + taskInstance: modifiedInstance, + }; + }, + + async beforeRun({ taskInstance, ...opts }) { + return { + ...opts, + taskInstance: { + ...taskInstance, + params: taskInstance.params.originalParams, + }, + }; + }, + }); + + initRoutes(server); + }, + }); +} + +function millisecondsFromNow(ms) { + if (!ms) { + return; + } + + const dt = new Date(); + dt.setTime(dt.getTime() + ms); + return dt; +} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js new file mode 100644 index 0000000000000..c266d756377d2 --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; + +export function initRoutes(server) { + const { taskManager } = server; + + server.route({ + path: '/api/sample_tasks', + method: 'POST', + config: { + validate: { + payload: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional(), + }), + }, + }, + async handler(request) { + try { + const task = await taskManager.schedule(request.payload, { request }); + return task; + } catch (err) { + return err; + } + }, + }); + + server.route({ + path: '/api/sample_tasks', + method: 'GET', + async handler() { + try { + return taskManager.fetch(); + } catch (err) { + return err; + } + } + }); + + server.route({ + path: '/api/sample_tasks', + method: 'DELETE', + async handler() { + try { + const { docs: tasks } = await taskManager.fetch(); + return Promise.all(tasks.map((task) => taskManager.remove(task.id))); + } catch (err) { + return err; + } + }, + }); +} diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/package.json b/x-pack/test/plugin_api_integration/plugins/task_manager/package.json new file mode 100644 index 0000000000000..ede03a08a2721 --- /dev/null +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/package.json @@ -0,0 +1,4 @@ +{ + "name": "sample_task_plugin", + "version": "kibana" +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js new file mode 100644 index 0000000000000..c3efe56c80e8f --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/index.js @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export default function ({ loadTestFile }) { + describe('task_manager', function taskManagerSuite() { + this.tags('ciGroup4'); + loadTestFile(require.resolve('./task_manager_integration')); + }); +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js new file mode 100644 index 0000000000000..c6c531f83cc62 --- /dev/null +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import _ from 'lodash'; +import expect from 'expect.js'; +import url from 'url'; +import supertestAsPromised from 'supertest-as-promised'; + +export default function ({ getService }) { + const es = getService('es'); + const retry = getService('retry'); + const config = getService('config'); + const testHistoryIndex = '.task_manager_test_result'; + const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); + + describe('scheduling and running tasks', () => { + beforeEach(() => supertest.delete('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .expect(200)); + + beforeEach(async () => + (await es.indices.exists({ index: testHistoryIndex })) && es.deleteByQuery({ + index: testHistoryIndex, + q: 'type:task', + refresh: true, + })); + + function currentTasks() { + return supertest.get('/api/sample_tasks') + .expect(200) + .then((response) => response.body); + } + + function historyDocs() { + return es.search({ + index: testHistoryIndex, + type: '_doc', + q: 'type:task', + }).then(result => result.hits.hits); + } + + function scheduleTask(task) { + return supertest.post('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .send(task) + .expect(200) + .then((response) => response.body); + } + + it('should support middleware', async () => { + const historyItem = _.random(1, 100); + + await scheduleTask({ + taskType: 'sampleTask', + interval: '30m', + params: { historyItem }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(1); + + expect(task.params).to.eql({ + superFly: 'My middleware param!', + originalParams: { historyItem }, + }); + }); + }); + + it('should remove non-recurring tasks after they complete', async () => { + await scheduleTask({ + taskType: 'sampleTask', + params: { }, + }); + + await retry.try(async () => { + const history = await historyDocs(); + expect(history.length).to.eql(1); + expect((await currentTasks()).docs).to.eql([]); + }); + }); + + it('should use a given ID as the task document ID', async () => { + const result = await scheduleTask({ + id: 'test-task-for-sample-task-plugin-to-test-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager'); + }); + + it('should reschedule if task errors', async () => { + const task = await scheduleTask({ + taskType: 'sampleTask', + params: { failWith: 'Dangit!!!!!' }, + }); + + await retry.try(async () => { + const [scheduledTask] = (await currentTasks()).docs; + expect(scheduledTask.id).to.eql(task.id); + expect(scheduledTask.attempts).to.be.greaterThan(0); + expect(Date.parse(scheduledTask.runAt)).to.be.greaterThan(Date.parse(task.runAt)); + }); + }); + + it('should reschedule if task returns runAt', async () => { + const nextRunMilliseconds = _.random(60000, 200000); + const count = _.random(1, 20); + + const originalTask = await scheduleTask({ + taskType: 'sampleTask', + params: { nextRunMilliseconds }, + state: { count }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(count + 1); + + expectReschedule(originalTask, task, nextRunMilliseconds); + }); + }); + + it('should reschedule if task has an interval', async () => { + const interval = _.random(5, 200); + const intervalMilliseconds = interval * 60000; + + const originalTask = await scheduleTask({ + taskType: 'sampleTask', + interval: `${interval}m`, + params: { }, + }); + + await retry.try(async () => { + expect((await historyDocs()).length).to.eql(1); + + const [task] = (await currentTasks()).docs; + expect(task.attempts).to.eql(0); + expect(task.state.count).to.eql(1); + + expectReschedule(originalTask, task, intervalMilliseconds); + }); + }); + + async function expectReschedule(originalTask, currentTask, expectedDiff) { + const originalRunAt = Date.parse(originalTask.runAt); + const buffer = 10000; + expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer); + expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer); + } + }); +}