Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify task manager getRetryDelay to be more custom #42064

Merged
merged 14 commits into from
Jul 29, 2019
11 changes: 7 additions & 4 deletions x-pack/legacy/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ export interface TaskDefinition {
maxAttempts?: number;

/**
* Function that returns the delay in seconds to wait before attempting the
* failed task again.
* Function that customizes how the task should behave when the task fails. This
* function can return `true`, `false` or a Date. True will tell task manager
* to retry using default delay logic. False will tell task manager to stop retrying
* this task. A Date tells task manager when the task should be retried.
* This function isn't used for interval type tasks, those retry at the next interval.
*/
getRetryDelay?: (attempts: number, error: object) => number;
getRetry?: (attempts: number, error: object) => boolean | Date;

/**
* The number of workers / slots a running instance of this task occupies.
Expand Down Expand Up @@ -145,7 +148,7 @@ export const validateTaskDefinition = Joi.object({
.min(1)
.default(1),
createTaskRunner: Joi.func().required(),
getRetryDelay: Joi.func().optional(),
getRetry: Joi.func().optional(),
}).default();

/**
Expand Down
236 changes: 219 additions & 17 deletions x-pack/legacy/plugins/task_manager/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { minutesFromNow, secondsFromNow } from './lib/intervals';
import { minutesFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import { TaskManagerRunner } from './task_runner';

Expand Down Expand Up @@ -255,20 +255,116 @@ describe('TaskManagerRunner', () => {
);
});

test('uses getRetryDelay function on error when defined', async () => {
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
// Failure path, getRetry returns a Date: the task's run() throws, and the
// instance written to the store must carry that exact Date as its runAt.
test('uses getRetry function (returning date) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
// A retry time 15-100 seconds after the current clock reading.
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(nextRetry);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
// Always fail so the runner goes through its error handling.
async run() {
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry receives the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

expect(instance.runAt.getTime()).toEqual(nextRetry.getTime());
});

// Failure path, getRetry returns true: the task's run() throws, and the
// stored runAt is expected to be attempts * 5 minutes from now.
test('uses getRetry function (returning true) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(true);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
// Always fail so the runner goes through its error handling.
async run() {
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry receives the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

// Expected delay: 5 minutes per attempt.
// NOTE(review): exact equality against Date.now() presumably relies on a
// frozen clock set up outside this view — confirm.
const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
});

// Failure path, getRetry returns false: the task's run() throws, and the
// instance written to the store is expected to have status 'failed'.
test('uses getRetry function (returning false) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(false);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
// Always fail so the runner goes through its error handling.
async run() {
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry receives the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

expect(instance.status).toBe('failed');
});

test('bypasses getRetry function (returning false) on error of a recurring task', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const getRetryStub = sinon.stub().returns(false);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '1m',
startedAt: new Date(),
},
definitions: {
bar: {
getRetryDelay: getRetryDelayStub,
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
throw error;
Expand All @@ -281,18 +377,20 @@ describe('TaskManagerRunner', () => {
await runner.run();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts, error);
sinon.assert.notCalled(getRetryStub);
const instance = store.update.args[0][0];

expect(instance.runAt.getTime()).toEqual(secondsFromNow(retryDelay).getTime());
const nextIntervalDelay = 60000; // 1m
const expectedRunAt = new Date(Date.now() + nextIntervalDelay);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
});

test('uses getRetryDelay to set retryAt when defined', async () => {
test('uses getRetry (returning date) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
const initialAttempts = _.random(1, 3);
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const timeoutMinutes = 1;
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const getRetryStub = sinon.stub().returns(nextRetry);
const { runner, store } = testOpts({
instance: {
id,
Expand All @@ -302,7 +400,7 @@ describe('TaskManagerRunner', () => {
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetryDelay: getRetryDelayStub,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
Expand All @@ -313,14 +411,114 @@ describe('TaskManagerRunner', () => {
await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts + 1);
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
const instance = store.update.args[0][0];

expect(instance.retryAt.getTime()).toEqual(
secondsFromNow(retryDelay).getTime() + timeoutMinutes * 60 * 1000
new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime()
);
});

// Claim path, getRetry returns true: on claimOwnership() of a task with no
// interval, retryAt is expected to be (attempts + 1) * 5 minutes plus the
// definition's timeout from now.
test('uses getRetry (returning true) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(true);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
// getRetry is asked about the upcoming attempt, hence attempts + 1.
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

// 5 minutes per attempt, counted with the upcoming attempt included.
const attemptDelay = (initialAttempts + 1) * 5 * 60 * 1000;
const timeoutDelay = timeoutMinutes * 60 * 1000;
// NOTE(review): exact equality against Date.now() presumably relies on a
// frozen clock set up outside this view — confirm.
expect(instance.retryAt.getTime()).toEqual(
new Date(Date.now() + attemptDelay + timeoutDelay).getTime()
);
});

// Claim path, getRetry returns false: on claimOwnership() of a task with no
// interval, the stored instance is expected to have a null retryAt while its
// status is 'running'.
test('uses getRetry (returning false) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(false);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
// getRetry is asked about the upcoming attempt, hence attempts + 1.
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

expect(instance.retryAt).toBeNull();
expect(instance.status).toBe('running');
});

// Claim path for a task with an interval: getRetry must not be called, and
// retryAt is expected to be the definition's timeout from now.
test('bypasses getRetry (returning false) of a recurring task to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(false);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
// Having an interval is what makes this instance a recurring task.
interval: '1m',
startedAt: new Date(),
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
// The stub returns false, but it should never be consulted for this task.
sinon.assert.notCalled(getRetryStub);
// First argument of the first store.update call is the updated instance.
const instance = store.update.args[0][0];

const timeoutDelay = timeoutMinutes * 60 * 1000;
// NOTE(review): exact equality against Date.now() presumably relies on a
// frozen clock set up outside this view — confirm.
expect(instance.retryAt.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime());
});

test('Fails non-recurring task when maxAttempts reached', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
Expand Down Expand Up @@ -355,11 +553,13 @@ describe('TaskManagerRunner', () => {
test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
const intervalSeconds = 10;
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '10s',
interval: `${intervalSeconds}s`,
startedAt: new Date(),
},
definitions: {
bar: {
Expand All @@ -379,7 +579,9 @@ describe('TaskManagerRunner', () => {
const instance = store.update.args[0][0];
expect(instance.attempts).toEqual(3);
expect(instance.status).toEqual('idle');
expect(instance.runAt.getTime()).toEqual(minutesFromNow(15).getTime());
expect(instance.runAt.getTime()).toEqual(
new Date(Date.now() + intervalSeconds * 1000).getTime()
);
});

interface TestOpts {
Expand Down
Loading