From 13b73a17b33ed2ec02474fbef22a5ec35aa58b74 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 6 Jan 2021 14:37:56 +0000 Subject: [PATCH] added more polling results to stats --- .../plugins/task_manager/server/MONITORING.md | 16 ++++- .../task_manager/server/lib/fill_pool.test.ts | 11 ++-- .../task_manager/server/lib/fill_pool.ts | 62 ++++++++++++------- .../server/monitoring/task_run_statistics.ts | 6 ++ .../task_manager/server/polling_lifecycle.ts | 9 +-- .../plugins/task_manager/server/task_pool.ts | 6 ++ .../test_suites/task_manager/health_route.ts | 3 + 7 files changed, 78 insertions(+), 35 deletions(-) diff --git a/x-pack/plugins/task_manager/server/MONITORING.md b/x-pack/plugins/task_manager/server/MONITORING.md index 4960086411e9..3595b8631748 100644 --- a/x-pack/plugins/task_manager/server/MONITORING.md +++ b/x-pack/plugins/task_manager/server/MONITORING.md @@ -179,9 +179,21 @@ For example, if you _curl_ the `/api/task_manager/_health` endpoint, you might g /* What is the frequency of polling cycle result? Here we see 94% of "NoTasksClaimed" and 6% "PoolFilled" */ "result_frequency_percent_as_number": { + /* This tells us that the polling cycle didnt claim any new tasks */ "NoTasksClaimed": 94, - "RanOutOfCapacity": 0, /* This is a legacy result, we might want to rename - it tells us when a polling cycle resulted in claiming more tasks than we had workers for, butt he name doesn't make much sense outside of the context of the code */ - "PoolFilled": 6 + /* This is a legacy result we are renaming in 8.0.0 - + it tells us when a polling cycle resulted in claiming more tasks + than we had workers for, butt he name doesn't make much sense outside of the context of the code */ + "RanOutOfCapacity": 0, + /* This is a legacy result we are renaming in 8.0.0 - + it tells us when a polling cycle resulted in tasks being claimed but less the the available workers */ + "PoolFilled": 6, + /* This tells us when a polling cycle resulted in no tasks being claimed due to there being no available workers */ + "NoAvailableWorkers": 0, + /* This tells us when a polling cycle resulted in tasks being claimed at 100% capacity of the available workers */ + "RunningAtCapacity": 0, + /* This tells us when the poller failed to claim */ + "Failed": 0 } }, /* on average, the tasks in this deployment run 1.7s after their scheduled time */ diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts index ebb72c3ed36d..a2c1eb514aeb 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.test.ts @@ -8,6 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { fillPool } from './fill_pool'; import { TaskPoolRunResult } from '../task_pool'; +import { asOk } from './result_type'; describe('fillPool', () => { test('stops filling when pool runs all claimed tasks, even if there is more capacity', async () => { @@ -16,7 +17,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks); const converter = _.identity; @@ -31,7 +32,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = _.identity; @@ -46,7 +47,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => x.toString(); @@ -80,7 +81,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); await fillPool(fetchAvailableTasks, converter, run); } catch (err) { @@ -95,7 +96,7 @@ describe('fillPool', () => { [4, 5], ]; let index = 0; - const fetchAvailableTasks = async () => tasks[index++] || []; + const fetchAvailableTasks = async () => asOk(tasks[index++] || []); const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity); const converter = (x: number) => { throw new Error(`can not convert ${x}`); diff --git a/x-pack/plugins/task_manager/server/lib/fill_pool.ts b/x-pack/plugins/task_manager/server/lib/fill_pool.ts index 9e4894587203..5ab173755662 100644 --- a/x-pack/plugins/task_manager/server/lib/fill_pool.ts +++ b/x-pack/plugins/task_manager/server/lib/fill_pool.ts @@ -6,15 +6,19 @@ import { performance } from 'perf_hooks'; import { TaskPoolRunResult } from '../task_pool'; +import { Result, map } from './result_type'; export enum FillPoolResult { + Failed = 'Failed', + NoAvailableWorkers = 'NoAvailableWorkers', NoTasksClaimed = 'NoTasksClaimed', + RunningAtCapacity = 'RunningAtCapacity', RanOutOfCapacity = 'RanOutOfCapacity', PoolFilled = 'PoolFilled', } type BatchRun = (tasks: T[]) => Promise; -type Fetcher = () => Promise; +type Fetcher = () => Promise>; type Converter = (t: T1) => T2; /** @@ -30,33 +34,43 @@ type Converter = (t: T1) => T2; * @param converter - a function that converts task records to the appropriate task runner */ export async function fillPool( - fetchAvailableTasks: Fetcher, + fetchAvailableTasks: Fetcher, converter: Converter, run: BatchRun ): Promise { performance.mark('fillPool.start'); - const instances = await fetchAvailableTasks(); + return map>( + await fetchAvailableTasks(), + async (instances) => { + if (!instances.length) { + performance.mark('fillPool.bailNoTasks'); + performance.measure( + 'fillPool.activityDurationUntilNoTasks', + 'fillPool.start', + 'fillPool.bailNoTasks' + ); + return FillPoolResult.NoTasksClaimed; + } - if (!instances.length) { - performance.mark('fillPool.bailNoTasks'); - performance.measure( - 'fillPool.activityDurationUntilNoTasks', - 'fillPool.start', - 'fillPool.bailNoTasks' - ); - return FillPoolResult.NoTasksClaimed; - } - const tasks = instances.map(converter); + const tasks = instances.map(converter); - if ((await run(tasks)) === TaskPoolRunResult.RanOutOfCapacity) { - performance.mark('fillPool.bailExhaustedCapacity'); - performance.measure( - 'fillPool.activityDurationUntilExhaustedCapacity', - 'fillPool.start', - 'fillPool.bailExhaustedCapacity' - ); - return FillPoolResult.RanOutOfCapacity; - } - performance.mark('fillPool.cycle'); - return FillPoolResult.PoolFilled; + switch (await run(tasks)) { + case TaskPoolRunResult.RanOutOfCapacity: + performance.mark('fillPool.bailExhaustedCapacity'); + performance.measure( + 'fillPool.activityDurationUntilExhaustedCapacity', + 'fillPool.start', + 'fillPool.bailExhaustedCapacity' + ); + return FillPoolResult.RanOutOfCapacity; + case TaskPoolRunResult.RunningAtCapacity: + performance.mark('fillPool.cycle'); + return FillPoolResult.RunningAtCapacity; + default: + performance.mark('fillPool.cycle'); + return FillPoolResult.PoolFilled; + } + }, + async (result) => result + ); } diff --git a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts index 3e0d517172d0..c1851789a769 100644 --- a/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/task_run_statistics.ts @@ -52,8 +52,11 @@ export interface TaskRunStat extends JsonObject { interface FillPoolRawStat extends JsonObject { last_successful_poll: string; result_frequency_percent_as_number: { + [FillPoolResult.Failed]: number; + [FillPoolResult.NoAvailableWorkers]: number; [FillPoolResult.NoTasksClaimed]: number; [FillPoolResult.RanOutOfCapacity]: number; + [FillPoolResult.RunningAtCapacity]: number; [FillPoolResult.PoolFilled]: number; }; } @@ -163,8 +166,11 @@ const DEFAULT_TASK_RUN_FREQUENCIES = { [TaskRunResult.Failed]: 0, }; const DEFAULT_POLLING_FREQUENCIES = { + [FillPoolResult.Failed]: 0, + [FillPoolResult.NoAvailableWorkers]: 0, [FillPoolResult.NoTasksClaimed]: 0, [FillPoolResult.RanOutOfCapacity]: 0, + [FillPoolResult.RunningAtCapacity]: 0, [FillPoolResult.PoolFilled]: 0, }; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 1a8108e34078..1876c52b0029 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -12,7 +12,7 @@ import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; import { tap } from 'rxjs/operators'; import { Logger } from '../../../../src/core/server'; -import { Result, asErr, mapErr } from './lib/result_type'; +import { Result, asErr, mapErr, asOk } from './lib/result_type'; import { ManagedConfiguration } from './lib/create_managed_configuration'; import { TaskManagerConfig } from './config'; @@ -232,7 +232,7 @@ export async function claimAvailableTasks( claim: (opts: OwnershipClaimingOpts) => Promise, availableWorkers: number, logger: Logger -) { +): Promise> { if (availableWorkers > 0) { performance.mark('claimAvailableTasks_start'); @@ -260,12 +260,13 @@ export async function claimAvailableTasks( } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } - return docs; + return asOk(docs); } catch (ex) { if (identifyEsError(ex).includes('cannot execute [inline] scripts')) { logger.warn( `Task Manager cannot operate when inline scripts are disabled in Elasticsearch` ); + return asErr(FillPoolResult.Failed); } else { throw ex; } @@ -275,6 +276,6 @@ export async function claimAvailableTasks( logger.debug( `[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers.` ); + return asErr(FillPoolResult.NoAvailableWorkers); } - return []; } diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index acd2ea59ad30..6946cd613e0a 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -22,7 +22,11 @@ interface Opts { } export enum TaskPoolRunResult { + // This means we're running all the tasks we claimed RunningAllClaimedTasks = 'RunningAllClaimedTasks', + // This means we're running all the tasks we claimed and we're at capacity + RunningAtCapacity = 'RunningAtCapacity', + // This means we're prematurely out of capacity and have accidentally claimed more tasks than we had capacity for RanOutOfCapacity = 'RanOutOfCapacity', } @@ -123,6 +127,8 @@ export class TaskPool { return this.attemptToRun(leftOverTasks); } return TaskPoolRunResult.RanOutOfCapacity; + } else if (!this.availableWorkers) { + return TaskPoolRunResult.RunningAtCapacity; } return TaskPoolRunResult.RunningAllClaimedTasks; } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts index 9b02b5857367..eb8e35fd871f 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/health_route.ts @@ -178,6 +178,9 @@ export default function ({ getService }: FtrProviderContext) { expect(typeof polling.result_frequency_percent_as_number.NoTasksClaimed).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.RanOutOfCapacity).to.eql('number'); expect(typeof polling.result_frequency_percent_as_number.PoolFilled).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.NoAvailableWorkers).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.RunningAtCapacity).to.eql('number'); + expect(typeof polling.result_frequency_percent_as_number.Failed).to.eql('number'); expect(typeof drift.p50).to.eql('number'); expect(typeof drift.p90).to.eql('number');