Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test alerting demo #25136

Merged
merged 18 commits into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export function taskManager(kibana) {
.default({})
}).default();
},
preInit(server) {
init(server) {
const config = server.config();
const taskManager = new TaskManager(this.kbnServer, server, config);
server.decorate('server', 'taskManager', taskManager);
Expand Down
9 changes: 0 additions & 9 deletions x-pack/plugins/task_manager/package.json

This file was deleted.

9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ export interface TaskDefinition {
* and an optional cancel function which cancels the task.
*/
createTaskRunner: TaskRunCreatorFunction;

/**
* Static tasks are defined by dependant plugins before all plugins have
* initlialized. The task manager handles making sure they are created.
*/
static?: TaskInstance[];
}

/**
Expand All @@ -128,6 +134,9 @@ export const validateTaskDefinition = Joi.object({
timeOut: Joi.string().default('5m'),
numWorkers: Joi.number().default(1),
createTaskRunner: Joi.func().required(),
static: Joi.array()
.items(Joi.object())
.optional(),
}).default();

/**
Expand Down
37 changes: 4 additions & 33 deletions x-pack/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { bindToElasticSearchStatus, TaskManager } from './task_manager';
import { TaskManager } from './task_manager';

describe('TaskManager', () => {
let clock: sinon.SinonFakeTimers;
Expand All @@ -26,55 +26,26 @@ describe('TaskManager', () => {

afterEach(() => clock.restore());

test('starts / stops the poller when es goes green / red', async () => {
const handlers: any = {};
const es = {
status: {
on: (color: string, handler: any) => (handlers[color] = () => Promise.resolve(handler())),
},
};
const start = sinon.spy(async () => undefined);
const stop = sinon.spy(async () => undefined);
const init = sinon.spy(async () => undefined);

bindToElasticSearchStatus(es, { info: _.noop, debug: _.noop }, { stop, start }, { init });

await handlers.green();
sinon.assert.calledOnce(init);
sinon.assert.calledOnce(start);
sinon.assert.notCalled(stop);

await handlers.red();
sinon.assert.calledOnce(init);
sinon.assert.calledOnce(start);
sinon.assert.calledOnce(stop);

await handlers.green();
sinon.assert.calledTwice(init);
sinon.assert.calledTwice(start);
sinon.assert.calledOnce(stop);
});

test('disallows schedule before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
const task = {
taskType: 'foo',
params: {},
};
await expect(client.schedule(task)).rejects.toThrow(/The task manager is initializing/i);
await expect(client.schedule(task)).rejects.toThrow(/^NotInitialized: .*/i);
});

test('disallows fetch before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
await expect(client.fetch({})).rejects.toThrow(/The task manager is initializing/i);
await expect(client.fetch({})).rejects.toThrow(/^NotInitialized: .*/i);
});

test('disallows remove before init', async () => {
const { opts } = testOpts();
const client = new TaskManager(opts.kbnServer, opts.server, opts.config);
await expect(client.remove('23')).rejects.toThrow(/The task manager is initializing/i);
await expect(client.remove('23')).rejects.toThrow(/^NotInitialized: .*/i);
});

test('allows middleware registration before init', () => {
Expand Down
138 changes: 68 additions & 70 deletions x-pack/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { fillPool } from './lib/fill_pool';
import { TaskManagerLogger } from './lib/logger';
import { Logger, TaskManagerLogger } from './lib/logger';
import { addMiddlewareToChain, BeforeSaveMiddlewareParams, Middleware } from './lib/middleware';
import { sanitizeTaskDefinitions } from './lib/sanitize_task_definitions';
import { ConcreteTaskInstance, RunContext, TaskInstance } from './task';
Expand Down Expand Up @@ -33,8 +33,9 @@ export class TaskManager {
private maxWorkers: number;
private overrideNumWorkers: { [taskType: string]: number };
private definitions: TaskDictionary<SanitizedTaskDefinition>;
private store?: TaskStore;
private poller?: TaskPoller;
private store: TaskStore;
private poller: TaskPoller;
private logger: Logger;
private middleware = {
beforeSave: async (saveOpts: BeforeSaveMiddlewareParams) => saveOpts,
beforeRun: async (runOpts: RunContext) => runOpts,
Expand All @@ -52,59 +53,78 @@ export class TaskManager {

const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));

kbnServer.afterPluginsInit(() => {
const store = new TaskStore({
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
index: config.get('xpack.task_manager.index'),
maxAttempts: config.get('xpack.task_manager.max_attempts'),
supportedTypes: Object.keys(this.definitions),
});
const pool = new TaskPool({
logger,
maxWorkers: this.maxWorkers,
});
const createRunner = (instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
kbnServer,
instance,
store,
definition: this.definitions[instance.taskType],
beforeRun: this.middleware.beforeRun,
});
const poller = new TaskPoller({
const store = new TaskStore({
callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser,
index: config.get('xpack.task_manager.index'),
maxAttempts: config.get('xpack.task_manager.max_attempts'),
supportedTypes: Object.keys(this.definitions),
});
const pool = new TaskPool({
logger,
maxWorkers: this.maxWorkers,
});
const createRunner = (instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
pollInterval: config.get('xpack.task_manager.poll_interval'),
work() {
return fillPool(pool.run, store.fetchAvailableTasks, createRunner);
},
kbnServer,
instance,
store,
definitions: this.definitions,
beforeRun: this.middleware.beforeRun,
});
const poller = new TaskPoller({
logger,
pollInterval: config.get('xpack.task_manager.poll_interval'),
work() {
return fillPool(pool.run, store.fetchAvailableTasks, createRunner);
},
});

bindToElasticSearchStatus(server.plugins.elasticsearch, logger, poller, store);
this.logger = logger;
this.store = store;
this.poller = poller;

this.store = store;
this.poller = poller;
kbnServer.afterPluginsInit(async () => {
this.isInitialized = true;
store.addSupportedTypes(Object.keys(this.definitions));
await store.init();
await poller.start();
// schedule all the static tasks
Object.keys(this.definitions).forEach((k: string) => {
const taskDef = this.definitions[k];

if (taskDef.static) {
taskDef.static.forEach(async (t: TaskInstance) => {
t.taskType = k;
await this.schedule(t);
});
}
});
});
}

/**
* Method for allowing consumers to register task definitions into the system.
* @param taskDefinitions - The Kibana task definitions dictionary
*/
public registerTaskDefinitions(taskDefinitions: TaskDictionary<TaskDefinition>) {
public async registerTaskDefinitions(taskDefinitions: TaskDictionary<TaskDefinition>) {
njd5475 marked this conversation as resolved.
Show resolved Hide resolved
this.assertUninitialized('register task definitions');
const duplicate = Object.keys(taskDefinitions).find(k => !!this.definitions[k]);
if (duplicate) {
throw new Error(`Task ${duplicate} is already defined!`);
}

const sanitized = sanitizeTaskDefinitions(
taskDefinitions,
this.maxWorkers,
this.overrideNumWorkers
);
Object.assign(this.definitions, sanitized);
try {
const sanitized = sanitizeTaskDefinitions(
taskDefinitions,
this.maxWorkers,
this.overrideNumWorkers
);

Object.assign(this.definitions, sanitized);
} catch (e) {
this.logger.error('Could not sanitize task definitions');
}
}

/**
Expand All @@ -124,13 +144,13 @@ export class TaskManager {
* @param task - The task being scheduled.
*/
public async schedule(taskInstance: TaskInstance, options?: any) {
this.assertInitialized();
this.assertInitialized('only static tasks can be scheduled before task manager is initialized');
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...options,
taskInstance,
});
const result = await this.store!.schedule(modifiedTask);
this.poller!.attemptWork();
const result = await this.store.schedule(modifiedTask);
this.poller.attemptWork();
return result;
}

Expand All @@ -140,8 +160,8 @@ export class TaskManager {
* @param opts - The query options used to filter tasks
*/
public async fetch(opts: FetchOpts) {
this.assertInitialized();
return this.store!.fetch(opts);
this.assertInitialized('Tasks cannot be fetched before task manager is initialized!');
return this.store.fetch(opts);
}

/**
Expand All @@ -151,41 +171,19 @@ export class TaskManager {
* @returns {Promise<void>}
*/
public async remove(id: string) {
this.assertInitialized();
return this.store!.remove(id);
this.assertInitialized('Tasks cannot be removed before task manager is initialized!');
return this.store.remove(id);
}

private assertUninitialized(message: string) {
if (this.isInitialized) {
throw new Error(`Cannot ${message} after the task manager is initialized.`);
throw new Error(`Cannot ${message} after the task manager is initialized!`);
}
}

private assertInitialized() {
private assertInitialized(message: string) {
if (!this.isInitialized) {
throw new Error('The task manager is initializing.');
throw new Error(`NotInitialized: ${message}`);
}
}
}

// This is exported for test purposes. It is responsible for starting / stopping
// the poller based on the elasticsearch plugin status.
export function bindToElasticSearchStatus(
elasticsearch: any,
logger: { debug: (s: string) => any; info: (s: string) => any },
poller: { stop: () => any; start: () => Promise<any> },
store: { init: () => Promise<any> }
) {
elasticsearch.status.on('red', () => {
logger.debug('Lost connection to Elasticsearch, stopping the poller.');
poller.stop();
});

elasticsearch.status.on('green', async () => {
logger.debug('Initializing store');
await store.init();
logger.debug('Starting poller');
await poller.start();
logger.info('Connected to Elasticsearch, and watching for tasks');
});
}
Loading