Skip to content

Commit

Permalink
[Discover] Cancel long running requests in Discover alert (#130077)
Browse files Browse the repository at this point in the history
* [Discover] improve long running requests for search source within alert rule

* [Discover] add tests

* [Discover] fix linting

* [Discover] fix unit test

* [Discover] add getMetrics test

* [Discover] fix unit test

* [Discover] merge search clients metrics

* [Discover] wrap searchSourceClient

* [Discover] add unit tests

* [Discover] replace searchSourceUtils with searchSourceClient in tests

* [Discover] apply suggestions
  • Loading branch information
dimaanj authored May 19, 2022
1 parent 5ecde4b commit fdf2086
Show file tree
Hide file tree
Showing 14 changed files with 622 additions and 42 deletions.
15 changes: 15 additions & 0 deletions x-pack/plugins/alerting/server/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import * as t from 'io-ts';
import { either } from 'fp-ts/lib/Either';
import { Rule } from '../types';
import { RuleRunMetrics } from './rule_run_metrics_store';

// represents a Date from an ISO string
export const DateFromString = new t.Type<Date, string, unknown>(
'DateFromString',
Expand All @@ -24,3 +27,15 @@ export const DateFromString = new t.Type<Date, string, unknown>(
),
(valueToEncode) => valueToEncode.toISOString()
);

export type RuleInfo = Pick<Rule, 'name' | 'alertTypeId' | 'id'> & { spaceId: string };

export interface LogSearchMetricsOpts {
esSearchDuration: number;
totalSearchDuration: number;
}

export type SearchMetrics = Pick<
RuleRunMetrics,
'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs'
>;
11 changes: 2 additions & 9 deletions x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,8 @@ import type {
SearchRequest as SearchRequestWithBody,
AggregationsAggregate,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server';
import { Rule } from '../types';
import { RuleRunMetrics } from './rule_run_metrics_store';

type RuleInfo = Pick<Rule, 'name' | 'alertTypeId' | 'id'> & { spaceId: string };
type SearchMetrics = Pick<
RuleRunMetrics,
'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs'
>;
import type { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server';
import { SearchMetrics, RuleInfo } from './types';

interface WrapScopedClusterClientFactoryOpts {
scopedClusterClient: IScopedClusterClient;
Expand Down
157 changes: 157 additions & 0 deletions x-pack/plugins/alerting/server/lib/wrap_search_source_client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { loggingSystemMock } from '@kbn/core/server/mocks';
import { ISearchStartSearchSource } from '@kbn/data-plugin/common';
import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks';
import { of, throwError } from 'rxjs';
import { wrapSearchSourceClient } from './wrap_search_source_client';

const logger = loggingSystemMock.create().get();

const rule = {
name: 'test-rule',
alertTypeId: '.test-rule-type',
id: 'abcdefg',
spaceId: 'my-space',
};

const createSearchSourceClientMock = () => {
const searchSourceMock = createSearchSourceMock();
searchSourceMock.fetch$ = jest.fn().mockImplementation(() => of({ rawResponse: { took: 5 } }));

return {
searchSourceMock,
searchSourceClientMock: {
create: jest.fn().mockReturnValue(searchSourceMock),
createEmpty: jest.fn().mockReturnValue(searchSourceMock),
} as unknown as ISearchStartSearchSource,
};
};

describe('wrapSearchSourceClient', () => {
beforeAll(() => {
jest.useFakeTimers();
});

afterAll(() => {
jest.useRealTimers();
});

afterEach(() => {
jest.resetAllMocks();
});

test('searches with provided abort controller', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();

const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.createEmpty();
await wrappedSearchSource.fetch();

expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
abortSignal: abortController.signal,
});
});

test('uses search options when specified', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();

const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
await wrappedSearchSource.fetch({ isStored: true });

expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
isStored: true,
abortSignal: abortController.signal,
});
});

test('keeps track of number of queries', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockImplementation(() => of({ rawResponse: { took: 333 } }));

const { searchSourceClient, getMetrics } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
await wrappedSearchSource.fetch();
await wrappedSearchSource.fetch();
await wrappedSearchSource.fetch();

expect(searchSourceMock.fetch$).toHaveBeenCalledWith({
abortSignal: abortController.signal,
});

const stats = getMetrics();
expect(stats.numSearches).toEqual(3);
expect(stats.esSearchDurationMs).toEqual(999);

expect(logger.debug).toHaveBeenCalledWith(
`executing query for rule .test-rule-type:abcdefg in space my-space - with options {}`
);
});

test('re-throws error when search throws error', async () => {
const abortController = new AbortController();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('something went wrong!')));

const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
const fetch = wrappedSearchSource.fetch();

await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot('"something went wrong!"');
});

test('throws error when search throws abort error', async () => {
const abortController = new AbortController();
abortController.abort();
const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock();
searchSourceMock.fetch$ = jest
.fn()
.mockReturnValue(throwError(new Error('Request has been aborted by the user')));

const { searchSourceClient } = wrapSearchSourceClient({
logger,
rule,
searchSourceClient: searchSourceClientMock,
abortController,
});
const wrappedSearchSource = await searchSourceClient.create();
const fetch = wrappedSearchSource.fetch();

await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot(
'"Search has been aborted due to cancelled execution"'
);
});
});
174 changes: 174 additions & 0 deletions x-pack/plugins/alerting/server/lib/wrap_search_source_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { Logger } from '@kbn/core/server';
import {
ISearchOptions,
ISearchSource,
ISearchStartSearchSource,
SearchSource,
SerializedSearchSourceFields,
} from '@kbn/data-plugin/common';
import { catchError, tap, throwError } from 'rxjs';
import { LogSearchMetricsOpts, RuleInfo, SearchMetrics } from './types';

interface Props {
logger: Logger;
rule: RuleInfo;
abortController: AbortController;
searchSourceClient: ISearchStartSearchSource;
}

interface WrapParams<T extends ISearchSource | SearchSource> {
logger: Logger;
rule: RuleInfo;
abortController: AbortController;
pureSearchSource: T;
logMetrics: (metrics: LogSearchMetricsOpts) => void;
}

export function wrapSearchSourceClient({
logger,
rule,
abortController,
searchSourceClient: pureSearchSourceClient,
}: Props) {
let numSearches: number = 0;
let esSearchDurationMs: number = 0;
let totalSearchDurationMs: number = 0;

function logMetrics(metrics: LogSearchMetricsOpts) {
numSearches++;
esSearchDurationMs += metrics.esSearchDuration;
totalSearchDurationMs += metrics.totalSearchDuration;
}

const wrapParams = {
logMetrics,
logger,
rule,
abortController,
};

const wrappedSearchSourceClient: ISearchStartSearchSource = Object.create(pureSearchSourceClient);

wrappedSearchSourceClient.createEmpty = () => {
const pureSearchSource = pureSearchSourceClient.createEmpty();

return wrapSearchSource({
...wrapParams,
pureSearchSource,
});
};

wrappedSearchSourceClient.create = async (fields?: SerializedSearchSourceFields) => {
const pureSearchSource = await pureSearchSourceClient.create(fields);

return wrapSearchSource({
...wrapParams,
pureSearchSource,
});
};

return {
searchSourceClient: wrappedSearchSourceClient,
getMetrics: (): SearchMetrics => ({
esSearchDurationMs,
totalSearchDurationMs,
numSearches,
}),
};
}

function wrapSearchSource<T extends ISearchSource | SearchSource>({
pureSearchSource,
...wrapParams
}: WrapParams<T>): T {
const wrappedSearchSource = Object.create(pureSearchSource);

wrappedSearchSource.createChild = wrapCreateChild({ ...wrapParams, pureSearchSource });
wrappedSearchSource.createCopy = wrapCreateCopy({ ...wrapParams, pureSearchSource });
wrappedSearchSource.create = wrapCreate({ ...wrapParams, pureSearchSource });
wrappedSearchSource.fetch$ = wrapFetch$({ ...wrapParams, pureSearchSource });

return wrappedSearchSource;
}

function wrapCreate({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureCreatedSearchSource = pureSearchSource.create();

return wrapSearchSource({
...wrapParams,
pureSearchSource: pureCreatedSearchSource,
});
};
}

function wrapCreateChild({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function (options?: {}) {
const pureSearchSourceChild = pureSearchSource.createChild(options);

return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
});
};
}

function wrapCreateCopy({ pureSearchSource, ...wrapParams }: WrapParams<ISearchSource>) {
return function () {
const pureSearchSourceChild = pureSearchSource.createCopy();

return wrapSearchSource({
...wrapParams,
pureSearchSource: pureSearchSourceChild,
}) as SearchSource;
};
}

function wrapFetch$({
logger,
rule,
abortController,
pureSearchSource,
logMetrics,
}: WrapParams<ISearchSource>) {
return (options?: ISearchOptions) => {
const searchOptions = options ?? {};
const start = Date.now();

logger.debug(
`executing query for rule ${rule.alertTypeId}:${rule.id} in space ${
rule.spaceId
} - with options ${JSON.stringify(searchOptions)}`
);

return pureSearchSource
.fetch$({
...searchOptions,
abortSignal: abortController.signal,
})
.pipe(
catchError((error) => {
if (abortController.signal.aborted) {
return throwError(
() => new Error('Search has been aborted due to cancelled execution')
);
}
return throwError(() => error);
}),
tap((result) => {
const durationMs = Date.now() - start;
logMetrics({
esSearchDuration: result.rawResponse.took ?? 0,
totalSearchDuration: durationMs,
});
})
);
};
}
Loading

0 comments on commit fdf2086

Please sign in to comment.