Skip to content

Commit

Permalink
Core task manager (elastic#24356)
Browse files Browse the repository at this point in the history
* Core task manager (elastic#23555)

* Initial stab at core task manager logic

* Update task_manager readme

* Update task manager readme

* Add cancelable helper package, OSS, and general purpose, but
will be useful for writing cancelable x-pack tasks.

* Make the cancellable package promise-compliant

* Update task manager readme w/ reference to cancellable

* Change pool from lazy to eager, add support for sub-pools per task type.

* Move cancellable to task_manager, and typescriptify it.

* Working proof of concept for task manager. Still have lots to do:
clean up, tests, comments, validations, assertions, etc.

* Add pagination support to task manager fetch

* Move task manager to OSS

* Remove task manager reference from x-pack

* Make task_manager a valid core plugin

* Modify how task resource allocation works

* Remove the special case 'max' workers occupied value

* Remove x-pack package.json changes

* Make taskDefinitions a part of uiExports

* Make task docs saved-object compliant.

* Add kbnServer to the task context.

* Allow tasks to have a void / undefined return type

* revert x-pack change

* move cancellable to src/utils

* move to src/server

* use afterPluginsInit hook

* task_manager.ts rename

* add a wrapper with a setClient method

* Add tests for task runner

* Break task_pool into smaller, testable pieces

* return raw task doc fields for calling code

* remove todo comment

* helper module for default client
 - setClient takes a callback fn

* fix misidentified field name

* fix rest args warning

* flatten task_pool

* remove cancellable

* return raw task doc fields for calling code

* remove todo comment

* helper module for default client
 - setClient takes a callback fn

* fix rest args warning

* typescript fixes

* roll back setClient takes a callback

* createTaskRunner returns an object with run/cancel functions

* Test task store, tweak task poller tests

* Rename num_workers to max_workers

* Tweak task pool method names

* Fix cancellable test, and ts errors

* Rename doc to _doc

* Fix the doc type in the task manager index mappings

* Make task costs configurable in kibana.yml

* fix a tslint warning

* TaskManager.afterPluginsInit replaces circuitous stuff

* addMiddleware, implement beforeSave

* wip

* comment

* run context stuff

* pretty fix

* comments

* lint/pretty fixes

* safeties in case they don't define both beforeRun and beforeSave

* task runner test constructor opts fix

* Add task_manager integration tests

* FIx readme mistakes, fix schedule signature

* Fix integration tests

* Add blurb about middleware

* make a libs directory for the small things

* test for extractTaskDefinitions

* unit test for middleware

* Comment, refactor, and test task_manager

* Touch up comments

* Make task store throw error if given an unknown task type

* Fix sample task definition in readme

* Make task manager toggle on / off based on Elasticsearch connectivity

* Prevent task manager from crashing Kibana if
elasticsearch plugin is disabled

* Fix task manager signature

* Move the task manager into the task manager mixin,
fix tests. It's an uber integration object anyway, so
it seemed to make sense to keep it in one place.

* Fix task manager integration tests

* Update the task manager index creation to use auto_expand_replicas

* Fix task manager integration tests

* Fix task manager integration tests

* Fix the overzealous deletion of .kibana during tests

* Core task manager

* Allow hardcoded ID passed in

* comments

* don't deconstruct type and task for payload

* remove uiExport support

* move task_manager to x-pack/plugins

* expose as client on the plugin namespace

* fix tests

* move plugin api integration tests

* roll back readme sample task definition example

* fix sample plugin link

* server.taskManager

* sanitizeTaskDefinitions

* fix integration tests

* sanitize rename

* assert unintialized and check for duplicates in registerTaskDefinitions

* Remove unnecessary defaults from task_manager.test.ts

* Remove task manager from OSS

* Remove task manager from src

* Tidy up task_manager unit tests around elasticsearch status changes

* Integration test for specifying ID in scheduling

* Task_manager init -> preInit to allow other plugins to consume
taskManager in their init.

* Remove task manager integration tests from OSS

* spelling

* readme fix

* fix test code impacted by hapi upgrade

* Task Manager index creation changes (elastic#24542)

* Uses putTemplate for task manager mappings and index settings.

* Removes create option.

* Opposite day, the test catches up to the code changes.

* Ignores index unavailable during searches.

* Improve taskRunner's processResult (elastic#24880)

* Set task state to idle after run, and add failed state

* fix tests

* Test alerting demo (elastic#25136)

* Ensures that task store init gets called before schedule.

* Removes unused option for debugging purposes.

* Fix unit tests because a second callCluster was made.

* Task manager starts sanely.

Does not preInit
Will not allow definitions after initialization
Creates store immediately.
Modifies store after all plugins have initialized
Adds static tasks that will be defined by plugins.

* Task manager should not allow operations before initialization.

* Attempts to fix runner tests.

* Fixes unit test contract with APIs.

* Removes unused type definitions.

* Removes unused package json.

* Removes unused import type.

* Removes unnecessary async applied to a function.

* Returns diferently if task store has already initialized.

* Explains how to add static task instances to task definitions. (elastic#25880)

* Tasks cannot be scheduled until task manager is initialized.

* Adds task manager api integration tests to ci group4.

* Context of describe test must be the test framework object.

* Update src/es_archiver/lib/indices/kibana_index.js

regex update to actually disallow non-kibana system indices

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* verify fillPool exception passing

* readme update about max_workers/numWorkers

* change forEach to reduce

* use public interfaces in internal method

* replace getMaxAttempts with public readonly maxAttempts

* Update x-pack/plugins/task_manager/task_store.ts

`throw new Error` and initializattion spell fix

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* min = 1 for max_workers

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* timeOut => timeout

* min 1

* scope as an array

* no retries

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* ConcreteTaskInstance is a TaskInstance

* remove guard per joi logic

* more return types for functions

* more comments around incremental back-off

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* throw error instead of return undefined

* poll_interval min 1000

* avoid handling err.stack directly

* break up processResult

* fix a few runtime issues

* only fetch idle tasks

* remove check for status idle

* always return a state, and when there is an error return the state we had at beforeRun

* check isStarted before attemptWork

Co-Authored-By: tsullivan <tsullivan@users.noreply.github.com>

* ts fix
  • Loading branch information
tsullivan committed Jan 3, 2019
1 parent 379aa21 commit f72f6ad
Show file tree
Hide file tree
Showing 34 changed files with 4,098 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/es_archiver/lib/indices/kibana_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ const buildUiExports = _.once(async () => {
* Deletes all indices that start with `.kibana`
*/
export async function deleteKibanaIndices({ client, stats, log }) {
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
const indexNames = kibanaIndices.map(x => x.index);
const indexNames = await fetchKibanaIndices(client);
if (!indexNames.length) {
return;
}
Expand Down Expand Up @@ -151,3 +150,17 @@ export async function createDefaultSpace({ index, client }) {
}
});
}

/**
* Migrations mean that the Kibana index will look something like:
* .kibana, .kibana_1, .kibana_323, etc. This finds all indices starting
* with .kibana, then filters out any that aren't actually Kibana's core
* index (e.g. we don't want to remove .kibana_task_manager or the like).
*
* @param {string} index
*/
async function fetchKibanaIndices(client) {
const kibanaIndices = await client.cat.indices({ index: '.kibana*', format: 'json' });
const isKibanaIndex = (index) => (/^\.kibana(:?_\d*)?$/).test(index);
return kibanaIndices.map(x => x.index).filter(isKibanaIndex);
}
4 changes: 4 additions & 0 deletions src/ui/ui_exports/ui_export_types/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export {
validations,
} from './saved_object';

export {
taskDefinitions
} from './task_definitions';

export {
app,
apps,
Expand Down
28 changes: 28 additions & 0 deletions src/ui/ui_exports/ui_export_types/task_definitions.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { mergeAtType } from './reduce';
import { alias, wrap, uniqueKeys } from './modify_reduce';

// How plugins define tasks that the task manager can run.
export const taskDefinitions = wrap(
alias('taskDefinitions'),
uniqueKeys(),
mergeAtType,
);
2 changes: 2 additions & 0 deletions x-pack/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { notifications } from './plugins/notifications';
import { kueryAutocomplete } from './plugins/kuery_autocomplete';
import { canvas } from './plugins/canvas';
import { infra } from './plugins/infra';
import { taskManager } from './plugins/task_manager';
import { rollup } from './plugins/rollup';
import { remoteClusters } from './plugins/remote_clusters';
import { crossClusterReplication } from './plugins/cross_cluster_replication';
Expand Down Expand Up @@ -61,6 +62,7 @@ module.exports = function (kibana) {
indexLifecycleManagement(kibana),
kueryAutocomplete(kibana),
infra(kibana),
taskManager(kibana),
rollup(kibana),
remoteClusters(kibana),
crossClusterReplication(kibana),
Expand Down
306 changes: 306 additions & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
# Kibana task manager

The task manager is a generic system for running background tasks. It supports:

- Single-run and recurring tasks
- Scheduling tasks to run after a specified datetime
- Basic retry logic
- Recovery of stalled tasks / timeouts
- Tracking task state across multiple runs
- Configuring the run-parameters for specific tasks
- Basic coordination to prevent the same task instance from running on more than one Kibana system at a time

## Implementation details

At a high-level, the task manager works like this:

- Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run:
- `runAt` is past
- `attempts` is less than the configured threshold
- Attempt to claim the task by using optimistic concurrency to set:
- status to `running`
- `runAt` to now + the timeout specified by the task
- Execute the task, if the previous claim succeeded
- If the task fails, increment the `attempts` count and reschedule it
- If the task succeeds:
- If it is recurring, store the result of the run, and reschedule
- If it is not recurring, remove it from the index

## Pooling

Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task type definition can have `numWorkers` specified, which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once.

For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time.

If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it.

## Config options

The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`):

- `max_attempts` - How many times a failing task instance will be retried before it is never run again
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `index` - The name of the index that the task_manager
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.
- `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks
- For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting`
- This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously

## Task definitions

Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object.

A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder.

```js
const { taskManager } = server;
taskManager.registerTaskDefinitions({
// clusterMonitoring is the task type, and must be unique across the entire system
clusterMonitoring: {
// Human friendly name, used to represent this task in logs, UI, etc
title: 'Human friendly name',

// Optional, human-friendly, more detailed description
description: 'Amazing!!',

// Optional, how long, in minutes, the system should wait before
// a running instance of this task is considered to be timed out.
// This defaults to 5 minutes.
timeout: '5m',

// The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots,
// 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is
// overridden by the `override_num_workers` config value, if specified.
numWorkers: 2,

// The createTaskRunner function / method returns an object that is responsible for
// performing the work of the task. context: { taskInstance, kbnServer }, is documented below.
createTaskRunner(context) {
return {
// Perform the work of the task. The return value should fit the TaskResult interface, documented
// below. Invalid return values will result in a logged warning.
async run() {
// Do some work
// Conditionally send some alerts
// Return some result or other...
},

// Optional, will be called if a running instance of this task times out, allowing the task
// to attempt to clean itself up.
async cancel() {
// Do whatever is required to cancel this task, such as killing any spawned processes
},
};
},
},
});
```

When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this:

```js
{
// An instance of the Kibana server object.
kbnServer,

// The data associated with this instance of the task, with two properties being most notable:
//
// params:
// An object, specific to this task instance, used by the
// task to determine exactly what work should be performed.
// e.g. a cluster-monitoring task might have a `clusterName`
// property in here, but a movie-monitoring task might have
// a `directorName` property.
//
// state:
// The state returned from the previous run of this task instance.
// If this task instance has never succesfully run, this will
// be an empty object: {}
taskInstance,
}
```

## Task result

The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following:
```js
{
// Optional, if specified, this is used as the tasks' nextRun, overriding
// the default system scheduler.
runAt: "2020-07-24T17:34:35.272Z",

// Optional, an error object, logged out as a warning. The pressence of this
// property indicates that the task did not succeed.
error: { message: 'Hrumph!' },

// Optional, this will be passed into the next run of the task, if
// this is a recurring task.
state: {
anything: 'goes here',
},
}
```

Other return values will result in a warning, but the system should continue to work.

## Task instances

The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc.

The data stored for a task instance looks something like this:

```js
{
// The type of task that will run this instance.
taskType: 'clusterMonitoring',

// The next time this task instance should run. It is not guaranteed
// to run at this time, but it is guaranteed not to run earlier than
// this.
runAt: "2020-07-24T17:34:35.272Z",

// Indicates that this is a recurring task. We currently only support
// 1 minute granularity.
interval: '5m',

// How many times this task has been unsuccesfully attempted,
// this will be reset to 0 if the task ever succesfully completes.
// This is incremented if a task fails or times out.
attempts: 0,

// Currently, this is either idle | running. It is used to
// coordinate which Kibana instance owns / is running a specific
// task instance.
status: 'idle',

// The params specific to this task instance, which will be
// passed to the task when it runs, and will be used by the
// task to determine exactly what work should be performed.
// This is a JSON blob, and will be different per task type.
// e.g. a cluster-monitoring task might have a `clusterName`
// property in here, but a movie-monitoring task might have
// a `directorName` property.
params: '{ "task": "specific stuff here" }',

// The result of the previous run of this task instance. This
// will be passed to the next run of the task, along with the
// params, and could be used by a task to do special logic If
// the task state changes (e.g. from green to red, or foo to bar)
// If there was no previous run (e.g. the instance has never succesfully
// completed, this will be an empty object.). This is a JSON blob,
// and will be different per task type.
state: '{ "status": "green" }',

// An extension point for 3rd parties to build in security features on
// top of the task manager. For example, this might be the token of the user
// who scheduled this task.
userContext: 'the token of the user who scheduled this task',

// An extension point for 3rd parties to build in security features on
// top of the task manager, and is expected to be the id of the user, if any,
// that scheduled this task.
user: '23lk3l42',

// An application-specific designation, allowing different Kibana
// plugins / apps to query for only those tasks they care about.
scope: ['alerting'],
}
```

## Programmatic access

The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected.

```js
const { taskManager } = server;
// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = await taskManager.schedule({
taskType,
runAt,
interval,
params,
scope: ['my-fanci-app'],
});

// Removes the specified task
await manager.remove(task.id);

// Fetches tasks, supports pagination, via the search-after API:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html
// If scope is not specified, all tasks are returned, otherwise only tasks
// with the given scope are returned.
const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] });

// results look something like this:
{
searchAfter: ['233322'],
// Tasks is an array of task instances
tasks: [{
id: '3242342',
taskType: 'reporting',
// etc
}]
}
```

More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.

## Middleware

The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run.

For example:

```js
// In your plugin's init
server.taskManager.addMiddleware({
async beforeSave({ taskInstance, ...opts }) {
console.log(`About to save a task of type ${taskInstance.taskType}`);

return {
...opts,
taskInstance: {
...taskInstance,
params: {
...taskInstance.params,
example: 'Added to params!',
},
},
};
},

async beforeRun({ taskInstance, ...opts }) {
console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`);
const { example, ...taskWithoutExampleProp } = taskInstance;

return {
...opts,
taskInstance: taskWithoutExampleProp,
};
},
});
```

## Limitations in v1.0

In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value.

There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'.

There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure.

The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks.

## Testing

- `node scripts/jest --testPathPattern=task_manager --watch`

Integration tests can be run like so:

```
node scripts/functional_tests_server.js --config test/plugin_functional/config.js
node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager
```
Loading

0 comments on commit f72f6ad

Please sign in to comment.