From 8417b98d20c47328441459b584fb4f5e571225b5 Mon Sep 17 00:00:00 2001 From: Zacqary Adam Xeper Date: Tue, 10 Mar 2020 10:45:57 -0500 Subject: [PATCH] [Metrics Alerts] Add support for search query and groupBy on alerts (#59388) * Add filterQuery to metric alert params * Add groupBy alert support * Fix typings * Fix malformed query * Fix filterQuery merge * Fix groupBy afterkey insertion, add group name to alert action * Convert iife to getter * Fix type check * Fix type check again * Remove unnecessary order param --- .../register_metric_threshold_alert_type.ts | 172 ++++++++++++++---- .../lib/alerting/metric_threshold/types.ts | 1 + .../infra/server/lib/snapshot/snapshot.ts | 9 +- .../server/utils/get_all_composite_data.ts | 14 +- 4 files changed, 150 insertions(+), 46 deletions(-) diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/register_metric_threshold_alert_type.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/register_metric_threshold_alert_type.ts index 8785957dbd98e..399f09bd3e776 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/register_metric_threshold_alert_type.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/register_metric_threshold_alert_type.ts @@ -4,8 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ import uuid from 'uuid'; +import { mapValues } from 'lodash'; import { i18n } from '@kbn/i18n'; import { schema } from '@kbn/config-schema'; +import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types'; +import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler'; +import { getAllCompositeData } from '../../../utils/get_all_composite_data'; import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic'; import { MetricExpressionParams, @@ -15,6 +19,16 @@ import { } from './types'; import { AlertServices, PluginSetupContract } from '../../../../../alerting/server'; +interface Aggregation { + aggregatedIntervals: { buckets: Array<{ aggregatedValue: { value: number } }> }; +} + +interface CompositeAggregationsResponse { + groupings: { + buckets: Aggregation[]; + }; +} + const FIRED_ACTIONS = { id: 'metrics.threshold.fired', name: i18n.translate('xpack.infra.metrics.alerting.threshold.fired', { @@ -22,11 +36,41 @@ const FIRED_ACTIONS = { }), }; -async function getMetric( - { callCluster }: AlertServices, - { metric, aggType, timeUnit, timeSize, indexPattern }: MetricExpressionParams +const getCurrentValueFromAggregations = (aggregations: Aggregation) => { + const { buckets } = aggregations.aggregatedIntervals; + const { value } = buckets[buckets.length - 1].aggregatedValue; + return value; +}; + +const getParsedFilterQuery: ( + filterQuery: string | undefined +) => Record = filterQuery => { + if (!filterQuery) return {}; + try { + return JSON.parse(filterQuery).bool; + } catch (e) { + return { + query_string: { + query: filterQuery, + analyze_wildcard: true, + }, + }; + } +}; + +const getMetric: ( + services: AlertServices, + params: MetricExpressionParams, + groupBy: string | undefined, + filterQuery: string | undefined +) => Promise> = async function( + { callCluster }, + { metric, aggType, timeUnit, timeSize, indexPattern }, + groupBy, + filterQuery ) { const interval = `${timeSize}${timeUnit}`; + const aggregations = aggType === 'rate' ? networkTraffic('aggregatedValue', metric) @@ -38,6 +82,38 @@ async function getMetric( }, }; + const baseAggs = { + aggregatedIntervals: { + date_histogram: { + field: '@timestamp', + fixed_interval: interval, + }, + aggregations, + }, + }; + + const aggs = groupBy + ? { + groupings: { + composite: { + size: 10, + sources: [ + { + groupBy: { + terms: { + field: groupBy, + }, + }, + }, + ], + }, + aggs: baseAggs, + }, + } + : baseAggs; + + const parsedFilterQuery = getParsedFilterQuery(filterQuery); + const searchBody = { query: { bool: { @@ -48,34 +124,49 @@ async function getMetric( gte: `now-${interval}`, }, }, + }, + { exists: { field: metric, }, }, ], + ...parsedFilterQuery, }, }, size: 0, - aggs: { - aggregatedIntervals: { - date_histogram: { - field: '@timestamp', - fixed_interval: interval, - }, - aggregations, - }, - }, + aggs, }; + if (groupBy) { + const bucketSelector = ( + response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse> + ) => response.aggregations?.groupings?.buckets || []; + const afterKeyHandler = createAfterKeyHandler( + 'aggs.groupings.composite.after', + response => response.aggregations?.groupings?.after_key + ); + const compositeBuckets = (await getAllCompositeData( + body => callCluster('search', { body, index: indexPattern }), + searchBody, + bucketSelector, + afterKeyHandler + )) as Array; + return compositeBuckets.reduce( + (result, bucket) => ({ + ...result, + [bucket.key.groupBy]: getCurrentValueFromAggregations(bucket), + }), + {} + ); + } + const result = await callCluster('search', { body: searchBody, index: indexPattern, }); - - const { buckets } = result.aggregations.aggregatedIntervals; - const { value } = buckets[buckets.length - 1].aggregatedValue; - return value; -} + return { '*': getCurrentValueFromAggregations(result.aggregations) }; +}; const comparatorMap = { [Comparator.BETWEEN]: (value: number, [a, b]: number[]) => @@ -112,39 +203,54 @@ export async function registerMetricThresholdAlertType(alertingPlugin: PluginSet indexPattern: schema.string(), }) ), + groupBy: schema.maybe(schema.string()), + filterQuery: schema.maybe(schema.string()), }), }, defaultActionGroupId: FIRED_ACTIONS.id, actionGroups: [FIRED_ACTIONS], async executor({ services, params }) { - const { criteria } = params as { criteria: MetricExpressionParams[] }; - const alertInstance = services.alertInstanceFactory(alertUUID); + const { criteria, groupBy, filterQuery } = params as { + criteria: MetricExpressionParams[]; + groupBy: string | undefined; + filterQuery: string | undefined; + }; const alertResults = await Promise.all( - criteria.map(({ threshold, comparator }) => + criteria.map(criterion => (async () => { - const currentValue = await getMetric(services, params as MetricExpressionParams); - if (typeof currentValue === 'undefined') + const currentValues = await getMetric(services, criterion, groupBy, filterQuery); + if (typeof currentValues === 'undefined') throw new Error('Could not get current value of metric'); - + const { threshold, comparator } = criterion; const comparisonFunction = comparatorMap[comparator]; - return { shouldFire: comparisonFunction(currentValue, threshold), currentValue }; + + return mapValues(currentValues, value => ({ + shouldFire: comparisonFunction(value, threshold), + currentValue: value, + })); })() ) ); - const shouldAlertFire = alertResults.every(({ shouldFire }) => shouldFire); + const groups = Object.keys(alertResults[0]); + for (const group of groups) { + const alertInstance = services.alertInstanceFactory(`${alertUUID}-${group}`); + + const shouldAlertFire = alertResults.every(result => result[group].shouldFire); - if (shouldAlertFire) { - alertInstance.scheduleActions(FIRED_ACTIONS.id, { - value: alertResults.map(({ currentValue }) => currentValue), + if (shouldAlertFire) { + alertInstance.scheduleActions(FIRED_ACTIONS.id, { + group, + value: alertResults.map(result => result[group].currentValue), + }); + } + + // Future use: ability to fetch display current alert state + alertInstance.replaceState({ + alertState: shouldAlertFire ? AlertStates.ALERT : AlertStates.OK, }); } - - // Future use: ability to fetch display current alert state - alertInstance.replaceState({ - alertState: shouldAlertFire ? AlertStates.ALERT : AlertStates.OK, - }); }, }); } diff --git a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/types.ts b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/types.ts index 9bb0d8963ac66..1c3d0cea3dc84 100644 --- a/x-pack/plugins/infra/server/lib/alerting/metric_threshold/types.ts +++ b/x-pack/plugins/infra/server/lib/alerting/metric_threshold/types.ts @@ -31,4 +31,5 @@ export interface MetricExpressionParams { indexPattern: string; threshold: number[]; comparator: Comparator; + filterQuery: string; } diff --git a/x-pack/plugins/infra/server/lib/snapshot/snapshot.ts b/x-pack/plugins/infra/server/lib/snapshot/snapshot.ts index ef453be76d8d7..8e5f8e6716f3c 100644 --- a/x-pack/plugins/infra/server/lib/snapshot/snapshot.ts +++ b/x-pack/plugins/infra/server/lib/snapshot/snapshot.ts @@ -77,6 +77,11 @@ const handleAfterKey = createAfterKeyHandler( input => input?.aggregations?.nodes?.after_key ); +const callClusterFactory = (framework: KibanaFramework, requestContext: RequestHandlerContext) => ( + opts: any +) => + framework.callWithRequest<{}, InfraSnapshotAggregationResponse>(requestContext, 'search', opts); + const requestGroupedNodes = async ( requestContext: RequestHandlerContext, options: InfraSnapshotRequestOptions, @@ -119,7 +124,7 @@ const requestGroupedNodes = async ( return await getAllCompositeData< InfraSnapshotAggregationResponse, InfraSnapshotNodeGroupByBucket - >(framework, requestContext, query, bucketSelector, handleAfterKey); + >(callClusterFactory(framework, requestContext), query, bucketSelector, handleAfterKey); }; const requestNodeMetrics = async ( @@ -170,7 +175,7 @@ const requestNodeMetrics = async ( return await getAllCompositeData< InfraSnapshotAggregationResponse, InfraSnapshotNodeMetricsBucket - >(framework, requestContext, query, bucketSelector, handleAfterKey); + >(callClusterFactory(framework, requestContext), query, bucketSelector, handleAfterKey); }; // buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[] diff --git a/x-pack/plugins/infra/server/utils/get_all_composite_data.ts b/x-pack/plugins/infra/server/utils/get_all_composite_data.ts index c7ff1b077f685..093dd266ea915 100644 --- a/x-pack/plugins/infra/server/utils/get_all_composite_data.ts +++ b/x-pack/plugins/infra/server/utils/get_all_composite_data.ts @@ -4,8 +4,6 @@ * you may not use this file except in compliance with the Elastic License. */ -import { RequestHandlerContext } from 'src/core/server'; -import { KibanaFramework } from '../lib/adapters/framework/kibana_framework_adapter'; import { InfraDatabaseSearchResponse } from '../lib/adapters/framework'; export const getAllCompositeData = async < @@ -13,18 +11,13 @@ export const getAllCompositeData = async < Bucket = {}, Options extends object = {} >( - framework: KibanaFramework, - requestContext: RequestHandlerContext, + callCluster: (options: Options) => Promise>, options: Options, bucketSelector: (response: InfraDatabaseSearchResponse<{}, Aggregation>) => Bucket[], onAfterKey: (options: Options, response: InfraDatabaseSearchResponse<{}, Aggregation>) => Options, previousBuckets: Bucket[] = [] ): Promise => { - const response = await framework.callWithRequest<{}, Aggregation>( - requestContext, - 'search', - options - ); + const response = await callCluster(options); // Nothing available, return the previous buckets. if (response.hits.total.value === 0) { @@ -46,8 +39,7 @@ export const getAllCompositeData = async < // There is possibly more data, concat previous and current buckets and call ourselves recursively. const newOptions = onAfterKey(options, response); return getAllCompositeData( - framework, - requestContext, + callCluster, newOptions, bucketSelector, onAfterKey,