From 8bd2ffed8dc9d7d815d6b0e5f8b5ccc4ebfb4d9d Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Mon, 5 Nov 2018 09:45:41 -0500 Subject: [PATCH 01/12] Ensures that task store init gets called before schedule. --- x-pack/plugins/task_manager/task_store.ts | 81 +++++++++++++---------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts index be72cad3c9e8d..6218b633a9a24 100644 --- a/x-pack/plugins/task_manager/task_store.ts +++ b/x-pack/plugins/task_manager/task_store.ts @@ -68,6 +68,7 @@ export class TaskStore { private index: string; private maxAttempts: number; private supportedTypes: string[]; + private wasInitialized = false; /** * Constructs a new TaskStore. @@ -88,43 +89,47 @@ export class TaskStore { * Initializes the store, ensuring the task manager index is created and up to date. */ public async init() { - const properties = { - type: { type: 'keyword' }, - task: { - properties: { - taskType: { type: 'keyword' }, - runAt: { type: 'date' }, - interval: { type: 'text' }, - attempts: { type: 'integer' }, - status: { type: 'keyword' }, - params: { type: 'text' }, - state: { type: 'text' }, - user: { type: 'keyword' }, - scope: { type: 'keyword' }, + if (!this.wasInitialized) { + const properties = { + type: { type: 'keyword' }, + task: { + properties: { + taskType: { type: 'keyword' }, + runAt: { type: 'date' }, + interval: { type: 'text' }, + attempts: { type: 'integer' }, + status: { type: 'keyword' }, + params: { type: 'text' }, + state: { type: 'text' }, + user: { type: 'keyword' }, + scope: { type: 'keyword' }, + }, }, - }, - }; - - try { - return await this.callCluster('indices.putTemplate', { - name: this.index, - body: { - index_patterns: [this.index], - mappings: { - _doc: { - dynamic: 'strict', - properties, + }; + + try { + return await this.callCluster('indices.putTemplate', { + name: this.index, + body: { + index_patterns: [this.index], + mappings: { + _doc: { + dynamic: 'strict', + properties, + }, + }, + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', }, }, - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - }, - }); - } catch (err) { - throw err; + }); + } catch (err) { + throw err; + } + this.wasInitialized = true; } + return; } /** @@ -133,6 +138,7 @@ export class TaskStore { * @param task - The task being scheduled. */ public async schedule(taskInstance: TaskInstance): Promise { + await this.init(); if (!this.supportedTypes.includes(taskInstance.taskType)) { throw new Error(`Unsupported task type "${taskInstance.taskType}".`); } @@ -163,12 +169,13 @@ export class TaskStore { * * @param opts - The query options used to filter tasks */ - public async fetch(opts: FetchOpts = {}): Promise { + public async fetch(opts: FetchOpts = {}, verbose: undefined | boolean): Promise { const sort = paginatableSort(opts.sort); return this.search({ sort, search_after: opts.searchAfter, query: opts.query, + verbose, }); } @@ -271,7 +278,7 @@ export class TaskStore { ? { bool: { must: [queryOnlyTasks, originalQuery] } } : queryOnlyTasks; - const result = await this.callCluster('search', { + const searchObj = { type: DOC_TYPE, index: this.index, ignoreUnavailable: true, @@ -279,7 +286,9 @@ export class TaskStore { ...opts, query, }, - }); + }; + + const result = await this.callCluster('search', searchObj); const rawDocs = result.hits.hits; From 741f6421a9df58c45454ad6259871eadabfe8e26 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Mon, 5 Nov 2018 11:23:24 -0500 Subject: [PATCH 02/12] Removes unused option for debugging purposes. --- x-pack/plugins/task_manager/task_store.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts index 6218b633a9a24..5dd6964d90457 100644 --- a/x-pack/plugins/task_manager/task_store.ts +++ b/x-pack/plugins/task_manager/task_store.ts @@ -169,13 +169,12 @@ export class TaskStore { * * @param opts - The query options used to filter tasks */ - public async fetch(opts: FetchOpts = {}, verbose: undefined | boolean): Promise { + public async fetch(opts: FetchOpts = {}): Promise { const sort = paginatableSort(opts.sort); return this.search({ sort, search_after: opts.searchAfter, query: opts.query, - verbose, }); } @@ -278,7 +277,7 @@ export class TaskStore { ? { bool: { must: [queryOnlyTasks, originalQuery] } } : queryOnlyTasks; - const searchObj = { + const result = await this.callCluster('search', { type: DOC_TYPE, index: this.index, ignoreUnavailable: true, @@ -286,9 +285,7 @@ export class TaskStore { ...opts, query, }, - }; - - const result = await this.callCluster('search', searchObj); + }); const rawDocs = result.hits.hits; From 2324a1755e5d1bb3ec9308f0c8fb9d758b613971 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Mon, 5 Nov 2018 13:19:21 -0500 Subject: [PATCH 03/12] Fix unit tests because a second callCluster was made. --- x-pack/plugins/task_manager/task_store.test.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/task_store.test.ts b/x-pack/plugins/task_manager/task_store.test.ts index 27ecba9a8e834..c31b4891ed7b6 100644 --- a/x-pack/plugins/task_manager/task_store.test.ts +++ b/x-pack/plugins/task_manager/task_store.test.ts @@ -51,12 +51,11 @@ describe('TaskStore', () => { maxAttempts: 2, supportedTypes: ['report', 'dernstraight', 'yawn'], }); - const result = await store.schedule(task); - sinon.assert.calledOnce(callCluster); + sinon.assert.calledTwice(callCluster); - return { result, callCluster, arg: callCluster.args[0][1] }; + return { result, callCluster, arg: callCluster.args[1][1] }; } test('serializes the params and state', async () => { @@ -67,7 +66,6 @@ describe('TaskStore', () => { }; const { callCluster, arg } = await testSchedule(task); - sinon.assert.calledOnce(callCluster); sinon.assert.calledWith(callCluster, 'index'); expect(arg).toMatchObject({ From 53d8aed825f18456d6a2ee9e97b836bc02c71fb4 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Thu, 8 Nov 2018 17:15:58 -0500 Subject: [PATCH 04/12] Task manager starts sanely. Does not preInit Will not allow definitions after initialization Creates store immediately. Modifies store after all plugins have initialized Adds static tasks that will be defined by plugins. --- x-pack/plugins/task_manager/index.js | 2 +- x-pack/plugins/task_manager/task.ts | 9 ++ x-pack/plugins/task_manager/task_manager.ts | 138 ++++++++++---------- x-pack/plugins/task_manager/task_runner.ts | 15 ++- x-pack/plugins/task_manager/task_store.ts | 19 ++- 5 files changed, 106 insertions(+), 77 deletions(-) diff --git a/x-pack/plugins/task_manager/index.js b/x-pack/plugins/task_manager/index.js index 821a454122b03..f79556822083c 100644 --- a/x-pack/plugins/task_manager/index.js +++ b/x-pack/plugins/task_manager/index.js @@ -32,7 +32,7 @@ export function taskManager(kibana) { .default({}) }).default(); }, - preInit(server) { + init(server) { const config = server.config(); const taskManager = new TaskManager(this.kbnServer, server, config); server.decorate('server', 'taskManager', taskManager); diff --git a/x-pack/plugins/task_manager/task.ts b/x-pack/plugins/task_manager/task.ts index 9c5fd0cc86139..562174c50066c 100644 --- a/x-pack/plugins/task_manager/task.ts +++ b/x-pack/plugins/task_manager/task.ts @@ -112,6 +112,12 @@ export interface TaskDefinition { * and an optional cancel function which cancels the task. */ createTaskRunner: TaskRunCreatorFunction; + + /** + * Static tasks are defined by dependant plugins before all plugins have + * initlialized. The task manager handles making sure they are created. + */ + static?: TaskInstance[]; } /** @@ -128,6 +134,9 @@ export const validateTaskDefinition = Joi.object({ timeOut: Joi.string().default('5m'), numWorkers: Joi.number().default(1), createTaskRunner: Joi.func().required(), + static: Joi.array() + .items(Joi.object()) + .optional(), }).default(); /** diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts index 7f55085a2804a..dd291e0462085 100644 --- a/x-pack/plugins/task_manager/task_manager.ts +++ b/x-pack/plugins/task_manager/task_manager.ts @@ -5,7 +5,7 @@ */ import { fillPool } from './lib/fill_pool'; -import { TaskManagerLogger } from './lib/logger'; +import { Logger, TaskManagerLogger } from './lib/logger'; import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware'; import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions'; import { ConcreteTaskInstance, RunContext, TaskInstance } from './task'; @@ -33,8 +33,9 @@ export class TaskManager { private maxWorkers: number; private overrideNumWorkers: { [taskType: string]: number }; private definitions: TaskDictionary; - private store?: TaskStore; - private poller?: TaskPoller; + private store: TaskStore; + private poller: TaskPoller; + private logger: Logger; private middleware = { beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts, beforeRun: async (runOpts: RunContext) => runOpts, @@ -52,39 +53,53 @@ export class TaskManager { const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); - kbnServer.afterPluginsInit(() => { - const store = new TaskStore({ - callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, - index: config.get('xpack.task_manager.index'), - maxAttempts: config.get('xpack.task_manager.max_attempts'), - supportedTypes: Object.keys(this.definitions), - }); - const pool = new TaskPool({ - logger, - maxWorkers: this.maxWorkers, - }); - const createRunner = (instance: ConcreteTaskInstance) => - new TaskManagerRunner({ - logger, - kbnServer, - instance, - store, - definition: this.definitions[instance.taskType], - beforeRun: this.middleware.beforeRun, - }); - const poller = new TaskPoller({ + const store = new TaskStore({ + callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, + index: config.get('xpack.task_manager.index'), + maxAttempts: config.get('xpack.task_manager.max_attempts'), + supportedTypes: Object.keys(this.definitions), + }); + const pool = new TaskPool({ + logger, + maxWorkers: this.maxWorkers, + }); + const createRunner = (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ logger, - pollInterval: config.get('xpack.task_manager.poll_interval'), - work() { - return fillPool(pool.run, store.fetchAvailableTasks, createRunner); - }, + kbnServer, + instance, + store, + definitions: this.definitions, + beforeRun: this.middleware.beforeRun, }); + const poller = new TaskPoller({ + logger, + pollInterval: config.get('xpack.task_manager.poll_interval'), + work() { + return fillPool(pool.run, store.fetchAvailableTasks, createRunner); + }, + }); - bindToElasticSearchStatus(server.plugins.elasticsearch, logger, poller, store); + this.logger = logger; + this.store = store; + this.poller = poller; - this.store = store; - this.poller = poller; + kbnServer.afterPluginsInit(async () => { this.isInitialized = true; + store.addSupportedTypes(Object.keys(this.definitions)); + await store.init(); + await poller.start(); + // schedule all the static tasks + Object.keys(this.definitions).forEach((k: string) => { + const taskDef = this.definitions[k]; + + if (taskDef.static) { + taskDef.static.forEach(async (t: TaskInstance) => { + t.taskType = k; + await this.schedule(t); + }); + } + }); }); } @@ -92,19 +107,24 @@ export class TaskManager { * Method for allowing consumers to register task definitions into the system. * @param taskDefinitions - The Kibana task definitions dictionary */ - public registerTaskDefinitions(taskDefinitions: TaskDictionary) { + public async registerTaskDefinitions(taskDefinitions: TaskDictionary) { this.assertUninitialized('register task definitions'); const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]); if (duplicate) { throw new Error(`Task ${duplicate} is already defined!`); } - const sanitized = sanitizeTaskDefinitions( - taskDefinitions, - this.maxWorkers, - this.overrideNumWorkers - ); - Object.assign(this.definitions, sanitized); + try { + const sanitized = sanitizeTaskDefinitions( + taskDefinitions, + this.maxWorkers, + this.overrideNumWorkers + ); + + Object.assign(this.definitions, sanitized); + } catch (e) { + this.logger.error('Could not sanitize task definitions'); + } } /** @@ -124,13 +144,13 @@ export class TaskManager { * @param task - The task being scheduled. */ public async schedule(taskInstance: TaskInstance, options?: any) { - this.assertInitialized(); + this.assertInitialized('only static tasks can be scheduled before task manager is initialized'); const { taskInstance: modifiedTask } = await this.middleware.beforeSave({ ...options, taskInstance, }); - const result = await this.store!.schedule(modifiedTask); - this.poller!.attemptWork(); + const result = await this.store.schedule(modifiedTask); + this.poller.attemptWork(); return result; } @@ -140,8 +160,8 @@ export class TaskManager { * @param opts - The query options used to filter tasks */ public async fetch(opts: FetchOpts) { - this.assertInitialized(); - return this.store!.fetch(opts); + this.assertInitialized('Tasks cannot be fetched before task manager is initialized!'); + return this.store.fetch(opts); } /** @@ -151,41 +171,19 @@ export class TaskManager { * @returns {Promise} */ public async remove(id: string) { - this.assertInitialized(); - return this.store!.remove(id); + this.assertInitialized('Tasks cannot be removed before task manager is initialized!'); + return this.store.remove(id); } private assertUninitialized(message: string) { if (this.isInitialized) { - throw new Error(`Cannot ${message} after the task manager is initialized.`); + throw new Error(`Cannot ${message} after the task manager is initialized!`); } } - private assertInitialized() { + private assertInitialized(message: string) { if (!this.isInitialized) { - throw new Error('The task manager is initializing.'); + throw new Error(`NotInitialized: ${message}`); } } } - -// This is exported for test purposes. It is responsible for starting / stopping -// the poller based on the elasticsearch plugin status. -export function bindToElasticSearchStatus( - elasticsearch: any, - logger: { debug: (s: string) => any; info: (s: string) => any }, - poller: { stop: () => any; start: () => Promise }, - store: { init: () => Promise } -) { - elasticsearch.status.on('red', () => { - logger.debug('Lost connection to Elasticsearch, stopping the poller.'); - poller.stop(); - }); - - elasticsearch.status.on('green', async () => { - logger.debug('Initializing store'); - await store.init(); - logger.debug('Starting poller'); - await poller.start(); - logger.info('Connected to Elasticsearch, and watching for tasks'); - }); -} diff --git a/x-pack/plugins/task_manager/task_runner.ts b/x-pack/plugins/task_manager/task_runner.ts index 2255075b03d91..aa545ab8f780e 100644 --- a/x-pack/plugins/task_manager/task_runner.ts +++ b/x-pack/plugins/task_manager/task_runner.ts @@ -19,7 +19,9 @@ import { CancellableTask, ConcreteTaskInstance, RunResult, + SanitizedTaskDefinition, TaskDefinition, + TaskDictionary, validateRunResult, } from './task'; import { RemoveResult } from './task_store'; @@ -41,7 +43,7 @@ interface Updatable { interface Opts { logger: Logger; - definition: TaskDefinition; + definitions: TaskDictionary; instance: ConcreteTaskInstance; store: Updatable; kbnServer: any; @@ -59,7 +61,7 @@ interface Opts { export class TaskManagerRunner implements TaskRunner { private task?: CancellableTask; private instance: ConcreteTaskInstance; - private definition: TaskDefinition; + private definitions: TaskDictionary; private logger: Logger; private store: Updatable; private kbnServer: any; @@ -78,7 +80,7 @@ export class TaskManagerRunner implements TaskRunner { */ constructor(opts: Opts) { this.instance = sanitizeInstance(opts.instance); - this.definition = opts.definition; + this.definitions = opts.definitions; this.logger = opts.logger; this.store = opts.store; this.kbnServer = opts.kbnServer; @@ -106,6 +108,13 @@ export class TaskManagerRunner implements TaskRunner { return this.instance.taskType; } + /** + * Gets the task defintion from the dictionary. + */ + public get definition() { + return this.definitions[this.taskType]; + } + /** * Gets whether or not this task has run longer than its expiration setting allows. */ diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts index 5dd6964d90457..a8096c8de5b17 100644 --- a/x-pack/plugins/task_manager/task_store.ts +++ b/x-pack/plugins/task_manager/task_store.ts @@ -85,6 +85,14 @@ export class TaskStore { this.supportedTypes = opts.supportedTypes; } + public addSupportedTypes(types: string[]) { + if (!this.wasInitialized) { + this.supportedTypes = this.supportedTypes.concat(types); + } else { + throw Error('Cannot add task types after initializattion'); + } + } + /** * Initializes the store, ensuring the task manager index is created and up to date. */ @@ -108,7 +116,7 @@ export class TaskStore { }; try { - return await this.callCluster('indices.putTemplate', { + const templateResult = await this.callCluster('indices.putTemplate', { name: this.index, body: { index_patterns: [this.index], @@ -124,10 +132,11 @@ export class TaskStore { }, }, }); + this.wasInitialized = true; + return templateResult; } catch (err) { throw err; } - this.wasInitialized = true; } return; } @@ -140,7 +149,11 @@ export class TaskStore { public async schedule(taskInstance: TaskInstance): Promise { await this.init(); if (!this.supportedTypes.includes(taskInstance.taskType)) { - throw new Error(`Unsupported task type "${taskInstance.taskType}".`); + throw new Error( + `Unsupported task type "${ + taskInstance.taskType + }". Supported types are ${this.supportedTypes.join(', ')}` + ); } const { id, ...body } = rawSource(taskInstance); From 454858ca38bce56e79e930f9a145b86fde5c1c1f Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Fri, 9 Nov 2018 11:26:24 -0500 Subject: [PATCH 05/12] Task manager should not allow operations before initialization. --- .../plugins/task_manager/task_manager.test.ts | 37 ++----------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/x-pack/plugins/task_manager/task_manager.test.ts b/x-pack/plugins/task_manager/task_manager.test.ts index d836f81fb0af5..24f911ed8ace6 100644 --- a/x-pack/plugins/task_manager/task_manager.test.ts +++ b/x-pack/plugins/task_manager/task_manager.test.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { bindToElasticSearchStatus, TaskManager } from './task_manager'; +import { TaskManager } from './task_manager'; describe('TaskManager', () => { let clock: sinon.SinonFakeTimers; @@ -26,35 +26,6 @@ describe('TaskManager', () => { afterEach(() => clock.restore()); - test('starts / stops the poller when es goes green / red', async () => { - const handlers: any = {}; - const es = { - status: { - on: (color: string, handler: any) => (handlers[color] = () => Promise.resolve(handler())), - }, - }; - const start = sinon.spy(async () => undefined); - const stop = sinon.spy(async () => undefined); - const init = sinon.spy(async () => undefined); - - bindToElasticSearchStatus(es, { info: _.noop, debug: _.noop }, { stop, start }, { init }); - - await handlers.green(); - sinon.assert.calledOnce(init); - sinon.assert.calledOnce(start); - sinon.assert.notCalled(stop); - - await handlers.red(); - sinon.assert.calledOnce(init); - sinon.assert.calledOnce(start); - sinon.assert.calledOnce(stop); - - await handlers.green(); - sinon.assert.calledTwice(init); - sinon.assert.calledTwice(start); - sinon.assert.calledOnce(stop); - }); - test('disallows schedule before init', async () => { const { opts } = testOpts(); const client = new TaskManager(opts.kbnServer, opts.server, opts.config); @@ -62,19 +33,19 @@ describe('TaskManager', () => { taskType: 'foo', params: {}, }; - await expect(client.schedule(task)).rejects.toThrow(/The task manager is initializing/i); + await expect(client.schedule(task)).rejects.toThrow(/^NotInitialized: .*/i); }); test('disallows fetch before init', async () => { const { opts } = testOpts(); const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - await expect(client.fetch({})).rejects.toThrow(/The task manager is initializing/i); + await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i); }); test('disallows remove before init', async () => { const { opts } = testOpts(); const client = new TaskManager(opts.kbnServer, opts.server, opts.config); - await expect(client.remove('23')).rejects.toThrow(/The task manager is initializing/i); + await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i); }); test('allows middleware registration before init', () => { From 66eddfe1a55dd08265257cf48906d1098fc4ae82 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Fri, 9 Nov 2018 11:27:20 -0500 Subject: [PATCH 06/12] Attempts to fix runner tests. --- .../plugins/task_manager/task_runner.test.ts | 114 ++++++++++-------- 1 file changed, 65 insertions(+), 49 deletions(-) diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts index 94d81b7d4c5c5..7bd26b82a4a1b 100644 --- a/x-pack/plugins/task_manager/task_runner.test.ts +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -7,7 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { minutesFromNow } from './lib/intervals'; -import { ConcreteTaskInstance, TaskDefinition } from './task'; +import { ConcreteTaskInstance, TaskDefinition, TaskDictionary } from './task'; import { TaskManagerRunner } from './task_runner'; describe('TaskManagerRunner', () => { @@ -52,12 +52,14 @@ describe('TaskManagerRunner', () => { params: { a: 'b' }, state: { hey: 'there' }, }, - definition: { - createTaskRunner: () => ({ - async run() { - throw new Error('Dangit!'); - }, - }), + definitions: { + testtype: { + createTaskRunner: () => ({ + async run() { + throw new Error('Dangit!'); + }, + }), + }, }, }); @@ -92,12 +94,14 @@ describe('TaskManagerRunner', () => { test('reschedules tasks that return a runAt', async () => { const runAt = minutesFromNow(_.random(1, 10)); const { runner, store } = testOpts({ - definition: { - createTaskRunner: () => ({ - async run() { - return { runAt }; - }, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, }, }); @@ -113,12 +117,14 @@ describe('TaskManagerRunner', () => { instance: { interval: '20m', }, - definition: { - createTaskRunner: () => ({ - async run() { - return { runAt }; - }, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + async run() { + return { runAt }; + }, + }), + }, }, }); @@ -135,12 +141,14 @@ describe('TaskManagerRunner', () => { id, interval: undefined, }, - definition: { - createTaskRunner: () => ({ - async run() { - return undefined; - }, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + async run() { + return undefined; + }, + }), + }, }, }); @@ -153,20 +161,22 @@ describe('TaskManagerRunner', () => { test('cancel cancels the task runner, if it is cancellable', async () => { let wasCancelled = false; const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - async run() { - await new Promise(r => setTimeout(r, 1000)); - }, - async cancel() { - wasCancelled = true; - }, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + async run() { + await new Promise(r => setTimeout(r, 1000)); + }, + async cancel() { + wasCancelled = true; + }, + }), + }, }, }); const promise = runner.run(); - await new Promise(r => setTimeout(r, 1)); + await new Promise(r => setInterval(r, 1)); await runner.cancel(); await promise; @@ -176,10 +186,12 @@ describe('TaskManagerRunner', () => { test('warns if cancel is called on a non-cancellable task', async () => { const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - run: async () => undefined, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, }, }); @@ -192,7 +204,7 @@ describe('TaskManagerRunner', () => { interface TestOpts { instance?: Partial; - definition?: Partial; + definitions?: any; } function testOpts(opts: TestOpts) { @@ -229,13 +241,15 @@ describe('TaskManagerRunner', () => { }, opts.instance || {} ), - definition: Object.assign( + definitions: Object.assign( { - type: 'bar', - title: 'Bar!', - createTaskRunner, + bar: { + type: 'bar', + title: 'Bar!', + createTaskRunner, + }, }, - opts.definition || {} + opts.definitions || {} ), }); @@ -250,10 +264,12 @@ describe('TaskManagerRunner', () => { async function testReturn(result: any, shouldBeValid: boolean) { const { runner, logger } = testOpts({ - definition: { - createTaskRunner: () => ({ - run: async () => result, - }), + definitions: { + testType: { + createTaskRunner: () => ({ + run: async () => result, + }), + }, }, }); From ec1e068e9b9da34a53c3b819cf20e840b6e9b41d Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Fri, 9 Nov 2018 16:50:55 -0500 Subject: [PATCH 07/12] Fixes unit test contract with APIs. --- .../plugins/task_manager/task_runner.test.ts | 95 ++++++++++--------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts index 7bd26b82a4a1b..33f77bbda91b2 100644 --- a/x-pack/plugins/task_manager/task_runner.test.ts +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -25,21 +25,37 @@ describe('TaskManagerRunner', () => { }); test('warns if the task returns an unexpected result', async () => { - await allowsReturnType(undefined); - await allowsReturnType({}); - await allowsReturnType({ - runAt: new Date(), - }); - await allowsReturnType({ - error: new Error('Dang it!'), - }); - await allowsReturnType({ - state: { shazm: true }, - }); - await disallowsReturnType('hm....'); - await disallowsReturnType({ - whatIsThis: '?!!?', - }); + await allowsReturnType(undefined, __filename, 28); + await allowsReturnType({}, __filename, 29); + await allowsReturnType( + { + runAt: new Date(), + }, + __filename, + 35 + ); + await allowsReturnType( + { + error: new Error('Dang it!'), + }, + __filename, + 42 + ); + await allowsReturnType( + { + state: { shazm: true }, + }, + __filename, + 49 + ); + await disallowsReturnType('hm....', __filename, 51); + await disallowsReturnType( + { + whatIsThis: '?!!?', + }, + __filename, + 57 + ); }); test('queues a reattempt if the task fails', async () => { @@ -95,7 +111,7 @@ describe('TaskManagerRunner', () => { const runAt = minutesFromNow(_.random(1, 10)); const { runner, store } = testOpts({ definitions: { - testType: { + bar: { createTaskRunner: () => ({ async run() { return { runAt }; @@ -118,7 +134,7 @@ describe('TaskManagerRunner', () => { interval: '20m', }, definitions: { - testType: { + bar: { createTaskRunner: () => ({ async run() { return { runAt }; @@ -142,7 +158,7 @@ describe('TaskManagerRunner', () => { interval: undefined, }, definitions: { - testType: { + bar: { createTaskRunner: () => ({ async run() { return undefined; @@ -162,7 +178,7 @@ describe('TaskManagerRunner', () => { let wasCancelled = false; const { runner, logger } = testOpts({ definitions: { - testType: { + bar: { createTaskRunner: () => ({ async run() { await new Promise(r => setTimeout(r, 1000)); @@ -241,16 +257,13 @@ describe('TaskManagerRunner', () => { }, opts.instance || {} ), - definitions: Object.assign( - { - bar: { - type: 'bar', - title: 'Bar!', - createTaskRunner, - }, + definitions: Object.assign(opts.definitions || {}, { + testbar: { + type: 'bar', + title: 'Bar!', + createTaskRunner, }, - opts.definitions || {} - ), + }), }); return { @@ -262,10 +275,10 @@ describe('TaskManagerRunner', () => { }; } - async function testReturn(result: any, shouldBeValid: boolean) { + async function testReturn(result: any, shouldBeValid: boolean, file: string, line: number) { const { runner, logger } = testOpts({ definitions: { - testType: { + bar: { createTaskRunner: () => ({ run: async () => result, }), @@ -275,24 +288,18 @@ describe('TaskManagerRunner', () => { await runner.run(); - try { - if (shouldBeValid) { - sinon.assert.notCalled(logger.warning); - } else { - sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i)); - } - } catch (err) { - sinon.assert.fail( - `Expected result ${JSON.stringify(result)} to be ${shouldBeValid ? 'valid' : 'invalid'}` - ); + if (shouldBeValid) { + sinon.assert.notCalled(logger.warning); + } else { + sinon.assert.calledWith(logger.warning, sinon.match(/invalid task result/i)); } } - function allowsReturnType(result: any) { - return testReturn(result, true); + function allowsReturnType(result: any, file: string, line: number) { + return testReturn(result, true, file, line); } - function disallowsReturnType(result: any) { - return testReturn(result, false); + function disallowsReturnType(result: any, file: string, line: number) { + return testReturn(result, false, file, line); } }); From 8fe6c089b6751fff23f81f427110cb31f8ea2bdc Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Tue, 13 Nov 2018 10:38:55 -0800 Subject: [PATCH 08/12] Removes unused type definitions. --- .../plugins/task_manager/task_runner.test.ts | 58 +++++++------------ 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts index 33f77bbda91b2..3afcf52c67ebb 100644 --- a/x-pack/plugins/task_manager/task_runner.test.ts +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -7,7 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { minutesFromNow } from './lib/intervals'; -import { ConcreteTaskInstance, TaskDefinition, TaskDictionary } from './task'; +import { ConcreteTaskInstance } from './task'; import { TaskManagerRunner } from './task_runner'; describe('TaskManagerRunner', () => { @@ -25,37 +25,21 @@ describe('TaskManagerRunner', () => { }); test('warns if the task returns an unexpected result', async () => { - await allowsReturnType(undefined, __filename, 28); - await allowsReturnType({}, __filename, 29); - await allowsReturnType( - { - runAt: new Date(), - }, - __filename, - 35 - ); - await allowsReturnType( - { - error: new Error('Dang it!'), - }, - __filename, - 42 - ); - await allowsReturnType( - { - state: { shazm: true }, - }, - __filename, - 49 - ); - await disallowsReturnType('hm....', __filename, 51); - await disallowsReturnType( - { - whatIsThis: '?!!?', - }, - __filename, - 57 - ); + await allowsReturnType(undefined); + await allowsReturnType({}); + await allowsReturnType({ + runAt: new Date(), + }); + await allowsReturnType({ + error: new Error('Dang it!'), + }); + await allowsReturnType({ + state: { shazm: true }, + }); + await disallowsReturnType('hm....'); + await disallowsReturnType({ + whatIsThis: '?!!?', + }); }); test('queues a reattempt if the task fails', async () => { @@ -275,7 +259,7 @@ describe('TaskManagerRunner', () => { }; } - async function testReturn(result: any, shouldBeValid: boolean, file: string, line: number) { + async function testReturn(result: any, shouldBeValid: boolean) { const { runner, logger } = testOpts({ definitions: { bar: { @@ -295,11 +279,11 @@ describe('TaskManagerRunner', () => { } } - function allowsReturnType(result: any, file: string, line: number) { - return testReturn(result, true, file, line); + function allowsReturnType(result: any) { + return testReturn(result, true); } - function disallowsReturnType(result: any, file: string, line: number) { - return testReturn(result, false, file, line); + function disallowsReturnType(result: any) { + return testReturn(result, false); } }); From 15a00b1e81ce72425a28439961e7db1d6b803ada Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Wed, 14 Nov 2018 15:07:25 -0800 Subject: [PATCH 09/12] Removes unused package json. --- x-pack/plugins/task_manager/package.json | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 x-pack/plugins/task_manager/package.json diff --git a/x-pack/plugins/task_manager/package.json b/x-pack/plugins/task_manager/package.json deleted file mode 100644 index 43b706b730bb7..0000000000000 --- a/x-pack/plugins/task_manager/package.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "name": "task_manager", - "version": "kibana", - "config": { - "@elastic/eslint-import-resolver-kibana": { - "projectRoot": false - } - } -} \ No newline at end of file From 02089c60ed2e4acc500d43f9d300c2ae81531c11 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Wed, 14 Nov 2018 15:40:42 -0800 Subject: [PATCH 10/12] Removes unused import type. --- x-pack/plugins/task_manager/task_runner.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugins/task_manager/task_runner.ts b/x-pack/plugins/task_manager/task_runner.ts index aa545ab8f780e..f205a1b2abf8b 100644 --- a/x-pack/plugins/task_manager/task_runner.ts +++ b/x-pack/plugins/task_manager/task_runner.ts @@ -20,7 +20,6 @@ import { ConcreteTaskInstance, RunResult, SanitizedTaskDefinition, - TaskDefinition, TaskDictionary, validateRunResult, } from './task'; From 3c2883aa4f0a76b056f6fa1e263ca5f083290ee9 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Thu, 15 Nov 2018 14:08:39 -0800 Subject: [PATCH 11/12] Removes unnecessary async applied to a function. --- x-pack/plugins/task_manager/task_manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts index dd291e0462085..bc7af883f4eb3 100644 --- a/x-pack/plugins/task_manager/task_manager.ts +++ b/x-pack/plugins/task_manager/task_manager.ts @@ -107,7 +107,7 @@ export class TaskManager { * Method for allowing consumers to register task definitions into the system. * @param taskDefinitions - The Kibana task definitions dictionary */ - public async registerTaskDefinitions(taskDefinitions: TaskDictionary) { + public registerTaskDefinitions(taskDefinitions: TaskDictionary) { this.assertUninitialized('register task definitions'); const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]); if (duplicate) { From 76f208d299ee5e50eeb7730c950e35c78f269914 Mon Sep 17 00:00:00 2001 From: Nicholas Dziedzic Date: Mon, 19 Nov 2018 10:56:50 -0500 Subject: [PATCH 12/12] Returns diferently if task store has already initialized. --- x-pack/plugins/task_manager/task_store.ts | 77 ++++++++++++----------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts index a8096c8de5b17..63e329d5202df 100644 --- a/x-pack/plugins/task_manager/task_store.ts +++ b/x-pack/plugins/task_manager/task_store.ts @@ -97,47 +97,50 @@ export class TaskStore { * Initializes the store, ensuring the task manager index is created and up to date. */ public async init() { - if (!this.wasInitialized) { - const properties = { - type: { type: 'keyword' }, - task: { - properties: { - taskType: { type: 'keyword' }, - runAt: { type: 'date' }, - interval: { type: 'text' }, - attempts: { type: 'integer' }, - status: { type: 'keyword' }, - params: { type: 'text' }, - state: { type: 'text' }, - user: { type: 'keyword' }, - scope: { type: 'keyword' }, - }, + if (this.wasInitialized) { + return; + } + + const properties = { + type: { type: 'keyword' }, + task: { + properties: { + taskType: { type: 'keyword' }, + runAt: { type: 'date' }, + interval: { type: 'text' }, + attempts: { type: 'integer' }, + status: { type: 'keyword' }, + params: { type: 'text' }, + state: { type: 'text' }, + user: { type: 'keyword' }, + scope: { type: 'keyword' }, }, - }; - - try { - const templateResult = await this.callCluster('indices.putTemplate', { - name: this.index, - body: { - index_patterns: [this.index], - mappings: { - _doc: { - dynamic: 'strict', - properties, - }, - }, - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', + }, + }; + + try { + const templateResult = await this.callCluster('indices.putTemplate', { + name: this.index, + body: { + index_patterns: [this.index], + mappings: { + _doc: { + dynamic: 'strict', + properties, }, }, - }); - this.wasInitialized = true; - return templateResult; - } catch (err) { - throw err; - } + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + }, + }); + this.wasInitialized = true; + return templateResult; + } catch (err) { + throw err; } + return; }