Skip to content

Commit

Permalink
[Metrics UI] Drop partial buckets from ALL Metrics UI queries (elasti…
Browse files Browse the repository at this point in the history
…c#104784)

* [Metrics UI] Change dropLastBucket to dropPartialBuckets

- Change offset calculation to millisecond percission
- Change dropLastBucket to dropPartialBuckets
- Impliment partial bucket filter
- Adding partial bucket filter to metric threshold alerts

* Cleaning up getElasticsearchMetricQuery

* Change timestamp to from_as_string to align to how date_histgram works

* Fixing tests to be more realistic

* fixing types; removing extra imports

* Fixing new mock data to work with previews

* Removing value checks since they don't really provide much value

* Removing test for refactored functinality

* Change value to match millisecond resolution

* Fixing values for new partial bucket scheme

* removing unused var

* Fixing lookback since drops more than last buckets

* Changing results count

* fixing more tests

* Removing empty describe

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
# Conflicts:
#	x-pack/plugins/infra/server/lib/alerting/metric_threshold/lib/metric_query.ts
  • Loading branch information
simianhacker authored and Zacqary committed Aug 6, 2021
1 parent b2615e2 commit 17d4653
Show file tree
Hide file tree
Showing 21 changed files with 432 additions and 144 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/infra/common/http_api/metrics_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const MetricsAPIRequestRT = rt.intersection([
afterKey: rt.union([rt.null, afterKeyObjectRT]),
limit: rt.union([rt.number, rt.null, rt.undefined]),
filters: rt.array(rt.object),
dropLastBucket: rt.boolean,
dropPartialBuckets: rt.boolean,
alignDataToEnd: rt.boolean,
}),
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
isTooManyBucketsPreviewException,
TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
} from '../../../../../common/alerting/metrics';
import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
import { roundTimestamp } from '../../../../utils/round_timestamp';
import { InfraSource } from '../../../../../common/source_configuration/source_configuration';
import { InfraDatabaseSearchResponse } from '../../../adapters/framework/adapter_types';
import { createAfterKeyHandler } from '../../../../utils/create_afterkey_handler';
Expand All @@ -26,6 +28,7 @@ interface Aggregation {
aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
doc_count: number;
to_as_string: string;
from_as_string: string;
key_as_string: string;
}>;
};
Expand Down Expand Up @@ -92,6 +95,8 @@ export const evaluateAlert = <Params extends EvaluatedAlertParams = EvaluatedAle
);
};

const MINIMUM_BUCKETS = 5;

const getMetric: (
esClient: ElasticsearchClient,
params: MetricExpressionParams,
Expand All @@ -109,14 +114,29 @@ const getMetric: (
filterQuery,
timeframe
) {
const { aggType } = params;
const { aggType, timeSize, timeUnit } = params;
const hasGroupBy = groupBy && groupBy.length;

const interval = `${timeSize}${timeUnit}`;
const intervalAsSeconds = getIntervalInSeconds(interval);
const intervalAsMS = intervalAsSeconds * 1000;

const to = roundTimestamp(timeframe ? timeframe.end : Date.now(), timeUnit);
// We need enough data for 5 buckets worth of data. We also need
// to convert the intervalAsSeconds to milliseconds.
const minimumFrom = to - intervalAsMS * MINIMUM_BUCKETS;

const from = roundTimestamp(
timeframe && timeframe.start <= minimumFrom ? timeframe.start : minimumFrom,
timeUnit
);

const searchBody = getElasticsearchMetricQuery(
params,
timefield,
{ start: from, end: to },
hasGroupBy ? groupBy : undefined,
filterQuery,
timeframe
filterQuery
);

try {
Expand All @@ -140,7 +160,11 @@ const getMetric: (
...result,
[Object.values(bucket.key)
.map((value) => value)
.join(', ')]: getValuesFromAggregations(bucket, aggType),
.join(', ')]: getValuesFromAggregations(bucket, aggType, {
from,
to,
bucketSizeInMillis: intervalAsMS,
}),
}),
{}
);
Expand All @@ -153,7 +177,8 @@ const getMetric: (
return {
[UNGROUPED_FACTORY_KEY]: getValuesFromAggregations(
(result.aggregations! as unknown) as Aggregation,
aggType
aggType,
{ from, to, bucketSizeInMillis: intervalAsMS }
),
};
} catch (e) {
Expand All @@ -173,32 +198,56 @@ const getMetric: (
}
};

interface DropPartialBucketOptions {
from: number;
to: number;
bucketSizeInMillis: number;
}

const dropPartialBuckets = ({ from, to, bucketSizeInMillis }: DropPartialBucketOptions) => (
row: {
key: string;
value: number;
} | null
) => {
if (row == null) return null;
const timestamp = new Date(row.key).valueOf();
return timestamp >= from && timestamp + bucketSizeInMillis <= to;
};

const getValuesFromAggregations = (
aggregations: Aggregation,
aggType: MetricExpressionParams['aggType']
aggType: MetricExpressionParams['aggType'],
dropPartialBucketsOptions: DropPartialBucketOptions
) => {
try {
const { buckets } = aggregations.aggregatedIntervals;
if (!buckets.length) return null; // No Data state

if (aggType === Aggregators.COUNT) {
return buckets.map((bucket) => ({
key: bucket.to_as_string,
value: bucket.doc_count,
}));
return buckets
.map((bucket) => ({
key: bucket.from_as_string,
value: bucket.doc_count,
}))
.filter(dropPartialBuckets(dropPartialBucketsOptions));
}
if (aggType === Aggregators.P95 || aggType === Aggregators.P99) {
return buckets.map((bucket) => {
const values = bucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return { key: bucket.to_as_string, value: firstValue.value };
});
return buckets
.map((bucket) => {
const values = bucket.aggregatedValue?.values || [];
const firstValue = first(values);
if (!firstValue) return null;
return { key: bucket.from_as_string, value: firstValue.value };
})
.filter(dropPartialBuckets(dropPartialBucketsOptions));
}
return buckets.map((bucket) => ({
key: bucket.key_as_string ?? bucket.to_as_string,
value: bucket.aggregatedValue?.value ?? null,
}));
return buckets
.map((bucket) => ({
key: bucket.key_as_string ?? bucket.from_as_string,
value: bucket.aggregatedValue?.value ?? null,
}))
.filter(dropPartialBuckets(dropPartialBucketsOptions));
} catch (e) {
return NaN; // Error state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { MetricExpressionParams } from '../types';
import { getElasticsearchMetricQuery } from './metric_query';
import moment from 'moment';

describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const expressionParams = {
Expand All @@ -18,9 +19,13 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {

const timefield = '@timestamp';
const groupBy = 'host.doggoname';
const timeframe = {
start: moment().subtract(5, 'minutes').valueOf(),
end: moment().valueOf(),
};

describe('when passed no filterQuery', () => {
const searchBody = getElasticsearchMetricQuery(expressionParams, timefield, groupBy);
const searchBody = getElasticsearchMetricQuery(expressionParams, timefield, timeframe, groupBy);
test('includes a range filter', () => {
expect(
searchBody.query.bool.filter.find((filter) => filter.hasOwnProperty('range'))
Expand All @@ -43,6 +48,7 @@ describe("The Metric Threshold Alert's getElasticsearchMetricQuery", () => {
const searchBody = getElasticsearchMetricQuery(
expressionParams,
timefield,
timeframe,
groupBy,
filterQuery
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import moment from 'moment';
import { networkTraffic } from '../../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import { MetricExpressionParams, Aggregators } from '../types';
import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
import { roundTimestamp } from '../../../../utils/round_timestamp';
import { createPercentileAggregation } from './create_percentile_aggregation';
import { calculateDateHistogramOffset } from '../../../metrics/lib/calculate_date_histogram_offset';

const MINIMUM_BUCKETS = 5;
const COMPOSITE_RESULTS_PER_PAGE = 100;

const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string, any> | null = (
Expand All @@ -25,9 +23,9 @@ const getParsedFilterQuery: (filterQuery: string | undefined) => Record<string,
export const getElasticsearchMetricQuery = (
{ metric, aggType, timeUnit, timeSize }: MetricExpressionParams,
timefield: string,
timeframe: { start: number; end: number },
groupBy?: string | string[],
filterQuery?: string,
timeframe?: { start: number; end: number }
filterQuery?: string
) => {
if (aggType === Aggregators.COUNT && metric) {
throw new Error('Cannot aggregate document count with a metric');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,12 @@ describe('The metric threshold alert type', () => {
expect(mostRecentAction(instanceID)).toBe(undefined);
});
test('reports expected values to the action context', async () => {
const now = 1577858400000;
await execute(Comparator.GT, [0.75]);
const { action } = mostRecentAction(instanceID);
expect(action.group).toBe('*');
expect(action.reason).toContain('current value is 1');
expect(action.reason).toContain('threshold of 0.75');
expect(action.reason).toContain('test.metric.1');
expect(action.timestamp).toBe(new Date(now).toISOString());
});
});

Expand Down Expand Up @@ -428,15 +426,13 @@ describe('The metric threshold alert type', () => {
},
});
test('reports values converted from decimals to percentages to the action context', async () => {
const now = 1577858400000;
await execute();
const { action } = mostRecentAction(instanceID);
expect(action.group).toBe('*');
expect(action.reason).toContain('current value is 100%');
expect(action.reason).toContain('threshold of 75%');
expect(action.threshold.condition0[0]).toBe('75%');
expect(action.value.condition0).toBe('100%');
expect(action.timestamp).toBe(new Date(now).toISOString());
});
});
});
Expand All @@ -460,7 +456,8 @@ const executor = createMetricThresholdExecutor(mockLibs);

const services: AlertServicesMock = alertsMock.createAlertServices();
services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: any): any => {
if (params.index === 'alternatebeat-*') return mocks.changedSourceIdResponse;
const from = params?.body.query.bool.filter[0]?.range['@timestamp'].gte;
if (params.index === 'alternatebeat-*') return mocks.changedSourceIdResponse(from);
const metric = params?.body.query.bool.filter[1]?.exists.field;
if (params?.body.aggs.groupings) {
if (params?.body.aggs.groupings.composite.after) {
Expand All @@ -470,25 +467,27 @@ services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: a
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateCompositeResponse
mocks.alternateCompositeResponse(from)
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicCompositeResponse
mocks.basicCompositeResponse(from)
);
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateMetricResponse
mocks.alternateMetricResponse(from)
);
} else if (metric === 'test.metric.3') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
params?.body.aggs.aggregatedIntervals.aggregations.aggregatedValue_max
params?.body.aggs.aggregatedIntervals.aggregations.aggregatedValueMax
? mocks.emptyRateResponse
: mocks.emptyMetricResponse
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(mocks.basicMetricResponse);
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicMetricResponse(from)
);
});
services.savedObjectsClient.get.mockImplementation(async (type: string, sourceId: string) => {
if (sourceId === 'alternate')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ describe('Previewing the metric threshold alert type', () => {
const services: AlertServicesMock = alertsMock.createAlertServices();

services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: any): any => {
const from = params?.body.query.bool.filter[0]?.range['@timestamp'].gte;
const metric = params?.body.query.bool.filter[1]?.exists.field;
if (params?.body.aggs.groupings) {
if (params?.body.aggs.groupings.composite.after) {
Expand All @@ -175,21 +176,21 @@ services.scopedClusterClient.asCurrentUser.search.mockImplementation((params?: a
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicCompositePreviewResponse
mocks.basicCompositePreviewResponse(from)
);
}
if (metric === 'test.metric.2') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.alternateMetricPreviewResponse
mocks.alternateMetricPreviewResponse(from)
);
}
if (metric === 'test.metric.3') {
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.repeatingMetricPreviewResponse
mocks.repeatingMetricPreviewResponse(from)
);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise(
mocks.basicMetricPreviewResponse
mocks.basicMetricPreviewResponse(from)
);
});

Expand Down
Loading

0 comments on commit 17d4653

Please sign in to comment.