diff --git a/x-pack/plugins/alerting/server/lib/types.ts b/x-pack/plugins/alerting/server/lib/types.ts index 701ac32e6974e..173ba1119a72a 100644 --- a/x-pack/plugins/alerting/server/lib/types.ts +++ b/x-pack/plugins/alerting/server/lib/types.ts @@ -7,6 +7,9 @@ import * as t from 'io-ts'; import { either } from 'fp-ts/lib/Either'; +import { Rule } from '../types'; +import { RuleRunMetrics } from './rule_run_metrics_store'; + // represents a Date from an ISO string export const DateFromString = new t.Type( 'DateFromString', @@ -24,3 +27,15 @@ export const DateFromString = new t.Type( ), (valueToEncode) => valueToEncode.toISOString() ); + +export type RuleInfo = Pick & { spaceId: string }; + +export interface LogSearchMetricsOpts { + esSearchDuration: number; + totalSearchDuration: number; +} + +export type SearchMetrics = Pick< + RuleRunMetrics, + 'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs' +>; diff --git a/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts b/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts index 28c5301e9a8b9..e1156d177116c 100644 --- a/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts +++ b/x-pack/plugins/alerting/server/lib/wrap_scoped_cluster_client.ts @@ -20,15 +20,8 @@ import type { SearchRequest as SearchRequestWithBody, AggregationsAggregate, } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server'; -import { Rule } from '../types'; -import { RuleRunMetrics } from './rule_run_metrics_store'; - -type RuleInfo = Pick & { spaceId: string }; -type SearchMetrics = Pick< - RuleRunMetrics, - 'numSearches' | 'totalSearchDurationMs' | 'esSearchDurationMs' ->; +import type { IScopedClusterClient, ElasticsearchClient, Logger } from '@kbn/core/server'; +import { SearchMetrics, RuleInfo } from './types'; interface WrapScopedClusterClientFactoryOpts { scopedClusterClient: IScopedClusterClient; diff --git a/x-pack/plugins/alerting/server/lib/wrap_search_source_client.test.ts b/x-pack/plugins/alerting/server/lib/wrap_search_source_client.test.ts new file mode 100644 index 0000000000000..9c10e619e3ebb --- /dev/null +++ b/x-pack/plugins/alerting/server/lib/wrap_search_source_client.test.ts @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { ISearchStartSearchSource } from '@kbn/data-plugin/common'; +import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks'; +import { of, throwError } from 'rxjs'; +import { wrapSearchSourceClient } from './wrap_search_source_client'; + +const logger = loggingSystemMock.create().get(); + +const rule = { + name: 'test-rule', + alertTypeId: '.test-rule-type', + id: 'abcdefg', + spaceId: 'my-space', +}; + +const createSearchSourceClientMock = () => { + const searchSourceMock = createSearchSourceMock(); + searchSourceMock.fetch$ = jest.fn().mockImplementation(() => of({ rawResponse: { took: 5 } })); + + return { + searchSourceMock, + searchSourceClientMock: { + create: jest.fn().mockReturnValue(searchSourceMock), + createEmpty: jest.fn().mockReturnValue(searchSourceMock), + } as unknown as ISearchStartSearchSource, + }; +}; + +describe('wrapSearchSourceClient', () => { + beforeAll(() => { + jest.useFakeTimers(); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + test('searches with provided abort controller', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + + const { searchSourceClient } = wrapSearchSourceClient({ + logger, + rule, + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await searchSourceClient.createEmpty(); + await wrappedSearchSource.fetch(); + + expect(searchSourceMock.fetch$).toHaveBeenCalledWith({ + abortSignal: abortController.signal, + }); + }); + + test('uses search options when specified', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + + const { searchSourceClient } = wrapSearchSourceClient({ + logger, + rule, + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await searchSourceClient.create(); + await wrappedSearchSource.fetch({ isStored: true }); + + expect(searchSourceMock.fetch$).toHaveBeenCalledWith({ + isStored: true, + abortSignal: abortController.signal, + }); + }); + + test('keeps track of number of queries', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + searchSourceMock.fetch$ = jest + .fn() + .mockImplementation(() => of({ rawResponse: { took: 333 } })); + + const { searchSourceClient, getMetrics } = wrapSearchSourceClient({ + logger, + rule, + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await searchSourceClient.create(); + await wrappedSearchSource.fetch(); + await wrappedSearchSource.fetch(); + await wrappedSearchSource.fetch(); + + expect(searchSourceMock.fetch$).toHaveBeenCalledWith({ + abortSignal: abortController.signal, + }); + + const stats = getMetrics(); + expect(stats.numSearches).toEqual(3); + expect(stats.esSearchDurationMs).toEqual(999); + + expect(logger.debug).toHaveBeenCalledWith( + `executing query for rule .test-rule-type:abcdefg in space my-space - with options {}` + ); + }); + + test('re-throws error when search throws error', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + searchSourceMock.fetch$ = jest + .fn() + .mockReturnValue(throwError(new Error('something went wrong!'))); + + const { searchSourceClient } = wrapSearchSourceClient({ + logger, + rule, + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await searchSourceClient.create(); + const fetch = wrappedSearchSource.fetch(); + + await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot('"something went wrong!"'); + }); + + test('throws error when search throws abort error', async () => { + const abortController = new AbortController(); + abortController.abort(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + searchSourceMock.fetch$ = jest + .fn() + .mockReturnValue(throwError(new Error('Request has been aborted by the user'))); + + const { searchSourceClient } = wrapSearchSourceClient({ + logger, + rule, + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await searchSourceClient.create(); + const fetch = wrappedSearchSource.fetch(); + + await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot( + '"Search has been aborted due to cancelled execution"' + ); + }); +}); diff --git a/x-pack/plugins/alerting/server/lib/wrap_search_source_client.ts b/x-pack/plugins/alerting/server/lib/wrap_search_source_client.ts new file mode 100644 index 0000000000000..442f0c3e292bf --- /dev/null +++ b/x-pack/plugins/alerting/server/lib/wrap_search_source_client.ts @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Logger } from '@kbn/core/server'; +import { + ISearchOptions, + ISearchSource, + ISearchStartSearchSource, + SearchSource, + SerializedSearchSourceFields, +} from '@kbn/data-plugin/common'; +import { catchError, tap, throwError } from 'rxjs'; +import { LogSearchMetricsOpts, RuleInfo, SearchMetrics } from './types'; + +interface Props { + logger: Logger; + rule: RuleInfo; + abortController: AbortController; + searchSourceClient: ISearchStartSearchSource; +} + +interface WrapParams { + logger: Logger; + rule: RuleInfo; + abortController: AbortController; + pureSearchSource: T; + logMetrics: (metrics: LogSearchMetricsOpts) => void; +} + +export function wrapSearchSourceClient({ + logger, + rule, + abortController, + searchSourceClient: pureSearchSourceClient, +}: Props) { + let numSearches: number = 0; + let esSearchDurationMs: number = 0; + let totalSearchDurationMs: number = 0; + + function logMetrics(metrics: LogSearchMetricsOpts) { + numSearches++; + esSearchDurationMs += metrics.esSearchDuration; + totalSearchDurationMs += metrics.totalSearchDuration; + } + + const wrapParams = { + logMetrics, + logger, + rule, + abortController, + }; + + const wrappedSearchSourceClient: ISearchStartSearchSource = Object.create(pureSearchSourceClient); + + wrappedSearchSourceClient.createEmpty = () => { + const pureSearchSource = pureSearchSourceClient.createEmpty(); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource, + }); + }; + + wrappedSearchSourceClient.create = async (fields?: SerializedSearchSourceFields) => { + const pureSearchSource = await pureSearchSourceClient.create(fields); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource, + }); + }; + + return { + searchSourceClient: wrappedSearchSourceClient, + getMetrics: (): SearchMetrics => ({ + esSearchDurationMs, + totalSearchDurationMs, + numSearches, + }), + }; +} + +function wrapSearchSource({ + pureSearchSource, + ...wrapParams +}: WrapParams): T { + const wrappedSearchSource = Object.create(pureSearchSource); + + wrappedSearchSource.createChild = wrapCreateChild({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.createCopy = wrapCreateCopy({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.create = wrapCreate({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.fetch$ = wrapFetch$({ ...wrapParams, pureSearchSource }); + + return wrappedSearchSource; +} + +function wrapCreate({ pureSearchSource, ...wrapParams }: WrapParams) { + return function () { + const pureCreatedSearchSource = pureSearchSource.create(); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureCreatedSearchSource, + }); + }; +} + +function wrapCreateChild({ pureSearchSource, ...wrapParams }: WrapParams) { + return function (options?: {}) { + const pureSearchSourceChild = pureSearchSource.createChild(options); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureSearchSourceChild, + }); + }; +} + +function wrapCreateCopy({ pureSearchSource, ...wrapParams }: WrapParams) { + return function () { + const pureSearchSourceChild = pureSearchSource.createCopy(); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureSearchSourceChild, + }) as SearchSource; + }; +} + +function wrapFetch$({ + logger, + rule, + abortController, + pureSearchSource, + logMetrics, +}: WrapParams) { + return (options?: ISearchOptions) => { + const searchOptions = options ?? {}; + const start = Date.now(); + + logger.debug( + `executing query for rule ${rule.alertTypeId}:${rule.id} in space ${ + rule.spaceId + } - with options ${JSON.stringify(searchOptions)}` + ); + + return pureSearchSource + .fetch$({ + ...searchOptions, + abortSignal: abortController.signal, + }) + .pipe( + catchError((error) => { + if (abortController.signal.aborted) { + return throwError( + () => new Error('Search has been aborted due to cancelled execution') + ); + } + return throwError(() => error); + }), + tap((result) => { + const durationMs = Date.now() - start; + logMetrics({ + esSearchDuration: result.rawResponse.took ?? 0, + totalSearchDuration: durationMs, + }); + }) + ); + }; +} diff --git a/x-pack/plugins/alerting/server/mocks.ts b/x-pack/plugins/alerting/server/mocks.ts index f7525c2c5f570..fd554783111d2 100644 --- a/x-pack/plugins/alerting/server/mocks.ts +++ b/x-pack/plugins/alerting/server/mocks.ts @@ -9,9 +9,8 @@ import { elasticsearchServiceMock, savedObjectsClientMock, uiSettingsServiceMock, - httpServerMock, } from '@kbn/core/server/mocks'; -import { dataPluginMock } from '@kbn/data-plugin/server/mocks'; +import { searchSourceCommonMock } from '@kbn/data-plugin/common/search/search_source/mocks'; import { rulesClientMock } from './rules_client.mock'; import { PluginSetupContract, PluginStartContract } from './plugin'; import { Alert, AlertFactoryDoneUtils } from './alert'; @@ -113,11 +112,7 @@ const createRuleExecutorServicesMock = < shouldWriteAlerts: () => true, shouldStopExecution: () => true, search: createAbortableSearchServiceMock(), - searchSourceClient: Promise.resolve( - dataPluginMock - .createStartContract() - .search.searchSource.asScoped(httpServerMock.createKibanaRequest()) - ), + searchSourceClient: searchSourceCommonMock, }; }; export type RuleExecutorServicesMock = ReturnType; diff --git a/x-pack/plugins/alerting/server/task_runner/task_runner.ts b/x-pack/plugins/alerting/server/task_runner/task_runner.ts index 6cd6b73b9539e..f7f54a2eef810 100644 --- a/x-pack/plugins/alerting/server/task_runner/task_runner.ts +++ b/x-pack/plugins/alerting/server/task_runner/task_runner.ts @@ -17,7 +17,6 @@ import { TaskRunnerContext } from './task_runner_factory'; import { createExecutionHandler, ExecutionHandler } from './create_execution_handler'; import { Alert, createAlertFactory } from '../alert'; import { - createWrappedScopedClusterClientFactory, ElasticsearchError, ErrorWithReason, executionStatusFromError, @@ -68,9 +67,12 @@ import { RuleRunResult, RuleTaskStateAndMetrics, } from './types'; +import { createWrappedScopedClusterClientFactory } from '../lib/wrap_scoped_cluster_client'; import { IExecutionStatusAndMetrics } from '../lib/rule_execution_status'; import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store'; +import { wrapSearchSourceClient } from '../lib/wrap_search_source_client'; import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger'; +import { SearchMetrics } from '../lib/types'; const FALLBACK_RETRY_INTERVAL = '5m'; const CONNECTIVITY_RETRY_INTERVAL = '5m'; @@ -348,9 +350,7 @@ export class TaskRunner< const ruleLabel = `${this.ruleType.id}:${ruleId}: '${name}'`; - const scopedClusterClient = this.context.elasticsearch.client.asScoped(fakeRequest); - const wrappedScopedClusterClient = createWrappedScopedClusterClientFactory({ - scopedClusterClient, + const wrappedClientOptions = { rule: { name: rule.name, alertTypeId: rule.alertTypeId, @@ -359,6 +359,16 @@ export class TaskRunner< }, logger: this.logger, abortController: this.searchAbortController, + }; + const scopedClusterClient = this.context.elasticsearch.client.asScoped(fakeRequest); + const wrappedScopedClusterClient = createWrappedScopedClusterClientFactory({ + ...wrappedClientOptions, + scopedClusterClient, + }); + const searchSourceClient = await this.context.data.search.searchSource.asScoped(fakeRequest); + const wrappedSearchSourceClient = wrapSearchSourceClient({ + ...wrappedClientOptions, + searchSourceClient, }); let updatedRuleTypeState: void | Record; @@ -382,9 +392,9 @@ export class TaskRunner< executionId: this.executionId, services: { savedObjectsClient, + searchSourceClient: wrappedSearchSourceClient.searchSourceClient, uiSettingsClient: this.context.uiSettings.asScopedToClient(savedObjectsClient), scopedClusterClient: wrappedScopedClusterClient.client(), - searchSourceClient: this.context.data.search.searchSource.asScoped(fakeRequest), alertFactory: createAlertFactory< InstanceState, InstanceContext, @@ -437,9 +447,19 @@ export class TaskRunner< this.alertingEventLogger.setExecutionSucceeded(`rule executed: ${ruleLabel}`); + const scopedClusterClientMetrics = wrappedScopedClusterClient.getMetrics(); + const searchSourceClientMetrics = wrappedSearchSourceClient.getMetrics(); + const searchMetrics: SearchMetrics = { + numSearches: scopedClusterClientMetrics.numSearches + searchSourceClientMetrics.numSearches, + totalSearchDurationMs: + scopedClusterClientMetrics.totalSearchDurationMs + + searchSourceClientMetrics.totalSearchDurationMs, + esSearchDurationMs: + scopedClusterClientMetrics.esSearchDurationMs + + searchSourceClientMetrics.esSearchDurationMs, + }; const ruleRunMetricsStore = new RuleRunMetricsStore(); - const searchMetrics = wrappedScopedClusterClient.getMetrics(); ruleRunMetricsStore.setNumSearches(searchMetrics.numSearches); ruleRunMetricsStore.setTotalSearchDurationMs(searchMetrics.totalSearchDurationMs); ruleRunMetricsStore.setEsSearchDurationMs(searchMetrics.esSearchDurationMs); diff --git a/x-pack/plugins/alerting/server/types.ts b/x-pack/plugins/alerting/server/types.ts index 1c453df386e24..8f7cc936ca789 100644 --- a/x-pack/plugins/alerting/server/types.ts +++ b/x-pack/plugins/alerting/server/types.ts @@ -10,13 +10,15 @@ import type { CustomRequestHandlerContext, SavedObjectReference, IUiSettingsClient, +} from '@kbn/core/server'; +import { ISearchStartSearchSource } from '@kbn/data-plugin/common'; +import { LicenseType } from '@kbn/licensing-plugin/server'; +import { IScopedClusterClient, SavedObjectAttributes, SavedObjectsClientContract, } from '@kbn/core/server'; import type { PublicMethodsOf } from '@kbn/utility-types'; -import { ISearchStartSearchSource } from '@kbn/data-plugin/common'; -import { LicenseType } from '@kbn/licensing-plugin/server'; import { AlertFactoryDoneUtils, PublicAlert } from './alert'; import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry'; import { PluginSetupContract, PluginStartContract } from './plugin'; @@ -71,7 +73,7 @@ export interface RuleExecutorServices< InstanceContext extends AlertInstanceContext = AlertInstanceContext, ActionGroupIds extends string = never > { - searchSourceClient: Promise; + searchSourceClient: ISearchStartSearchSource; savedObjectsClient: SavedObjectsClientContract; uiSettingsClient: IUiSettingsClient; scopedClusterClient: IScopedClusterClient; diff --git a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts index 7894478aedf22..9387a9ce8c0ed 100644 --- a/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts +++ b/x-pack/plugins/rule_registry/server/utils/create_lifecycle_rule_type.test.ts @@ -118,7 +118,7 @@ function createRule(shouldWriteAlerts: boolean = true) { shouldWriteAlerts: () => shouldWriteAlerts, shouldStopExecution: () => false, search: {} as any, - searchSourceClient: Promise.resolve({} as ISearchStartSearchSource), + searchSourceClient: {} as ISearchStartSearchSource, }, spaceId: 'spaceId', state, diff --git a/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts b/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts index 05c069d80ed3e..b2c25973f7cc4 100644 --- a/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts +++ b/x-pack/plugins/rule_registry/server/utils/rule_executor_test_utils.ts @@ -7,7 +7,6 @@ import { elasticsearchServiceMock, savedObjectsClientMock, - httpServerMock, uiSettingsServiceMock, } from '@kbn/core/server/mocks'; import { @@ -18,7 +17,7 @@ import { RuleTypeState, } from '@kbn/alerting-plugin/server'; import { alertsMock } from '@kbn/alerting-plugin/server/mocks'; -import { dataPluginMock } from '@kbn/data-plugin/server/mocks'; +import { searchSourceCommonMock } from '@kbn/data-plugin/common/search/search_source/mocks'; export const createDefaultAlertExecutorOptions = < Params extends RuleTypeParams = never, @@ -77,11 +76,7 @@ export const createDefaultAlertExecutorOptions = < scopedClusterClient: elasticsearchServiceMock.createScopedClusterClient(), shouldWriteAlerts: () => shouldWriteAlerts, shouldStopExecution: () => false, - searchSourceClient: Promise.resolve( - dataPluginMock - .createStartContract() - .search.searchSource.asScoped(httpServerMock.createKibanaRequest()) - ), + searchSourceClient: searchSourceCommonMock, }, state, updatedBy: null, diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts index 00fc13315ff36..de60e82e336ef 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/preview_rules_route.ts @@ -54,6 +54,7 @@ import { import { createSecurityRuleTypeWrapper } from '../../rule_types/create_security_rule_type_wrapper'; import { RULE_PREVIEW_INVOCATION_COUNT } from '../../../../../common/detection_engine/constants'; import { RuleExecutionContext, StatusChangeArgs } from '../../rule_execution_log'; +import { wrapSearchSourceClient } from './utils/wrap_search_source_client'; const PREVIEW_TIMEOUT_SECONDS = 60; @@ -86,7 +87,7 @@ export const previewRulesRoute = async ( } try { const [, { data, security: securityService }] = await getStartServices(); - const searchSourceClient = data.search.searchSource.asScoped(request); + const searchSourceClient = await data.search.searchSource.asScoped(request); const savedObjectsClient = coreContext.savedObjects.client; const siemClient = (await context.securitySolution).getAppClient(); @@ -242,7 +243,10 @@ export const previewRulesRoute = async ( abortController, scopedClusterClient: coreContext.elasticsearch.client, }), - searchSourceClient, + searchSourceClient: wrapSearchSourceClient({ + abortController, + searchSourceClient, + }), uiSettingsClient: coreContext.uiSettings.client, }, spaceId, diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.test.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.test.ts new file mode 100644 index 0000000000000..c8fff85476957 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.test.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ISearchStartSearchSource } from '@kbn/data-plugin/common'; +import { createSearchSourceMock } from '@kbn/data-plugin/common/search/search_source/mocks'; +import { of, throwError } from 'rxjs'; +import { wrapSearchSourceClient } from './wrap_search_source_client'; + +const createSearchSourceClientMock = () => { + const searchSourceMock = createSearchSourceMock(); + searchSourceMock.fetch$ = jest.fn().mockImplementation(() => of({})); + + return { + searchSourceMock, + searchSourceClientMock: { + create: jest.fn().mockReturnValue(searchSourceMock), + createEmpty: jest.fn().mockReturnValue(searchSourceMock), + } as unknown as ISearchStartSearchSource, + }; +}; + +describe('wrapSearchSourceClient', () => { + beforeAll(() => { + jest.useFakeTimers(); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + afterEach(() => { + jest.resetAllMocks(); + }); + + test('searches with provided abort controller', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + + const wrappedSearchClient = wrapSearchSourceClient({ + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await wrappedSearchClient.createEmpty(); + await wrappedSearchSource.fetch(); + + expect(searchSourceMock.fetch$).toHaveBeenCalledWith({ + abortSignal: abortController.signal, + }); + }); + + test('uses search options when specified', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + + const wrappedSearchClient = wrapSearchSourceClient({ + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await wrappedSearchClient.create(); + await wrappedSearchSource.fetch({ isStored: true }); + + expect(searchSourceMock.fetch$).toHaveBeenCalledWith({ + isStored: true, + abortSignal: abortController.signal, + }); + }); + + test('re-throws error when search throws error', async () => { + const abortController = new AbortController(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + searchSourceMock.fetch$ = jest + .fn() + .mockReturnValue(throwError(new Error('something went wrong!'))); + + const wrappedSearchClient = wrapSearchSourceClient({ + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await wrappedSearchClient.create(); + const fetch = wrappedSearchSource.fetch(); + + await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot('"something went wrong!"'); + }); + + test('throws error when search throws abort error', async () => { + const abortController = new AbortController(); + abortController.abort(); + const { searchSourceMock, searchSourceClientMock } = createSearchSourceClientMock(); + searchSourceMock.fetch$ = jest + .fn() + .mockReturnValue(throwError(new Error('Request has been aborted by the user'))); + + const wrappedSearchClient = wrapSearchSourceClient({ + searchSourceClient: searchSourceClientMock, + abortController, + }); + const wrappedSearchSource = await wrappedSearchClient.create(); + const fetch = wrappedSearchSource.fetch(); + + await expect(fetch).rejects.toThrowErrorMatchingInlineSnapshot( + '"Search has been aborted due to cancelled execution"' + ); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.ts new file mode 100644 index 0000000000000..619a4dee788f7 --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/wrap_search_source_client.ts @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ISearchOptions, + ISearchSource, + ISearchStartSearchSource, + SearchSource, + SerializedSearchSourceFields, +} from '@kbn/data-plugin/common'; +import { catchError, throwError } from 'rxjs'; + +interface Props { + abortController: AbortController; + searchSourceClient: ISearchStartSearchSource; +} + +interface WrapParams { + abortController: AbortController; + pureSearchSource: T; +} + +export function wrapSearchSourceClient({ + abortController, + searchSourceClient: pureSearchSourceClient, +}: Props) { + const wrappedSearchSourceClient: ISearchStartSearchSource = Object.create(pureSearchSourceClient); + + wrappedSearchSourceClient.createEmpty = () => { + const pureSearchSource = pureSearchSourceClient.createEmpty(); + + return wrapSearchSource({ + abortController, + pureSearchSource, + }); + }; + + wrappedSearchSourceClient.create = async (fields?: SerializedSearchSourceFields) => { + const pureSearchSource = await pureSearchSourceClient.create(fields); + + return wrapSearchSource({ + abortController, + pureSearchSource, + }); + }; + + return wrappedSearchSourceClient; +} + +function wrapSearchSource({ + pureSearchSource, + ...wrapParams +}: WrapParams): T { + const wrappedSearchSource = Object.create(pureSearchSource); + + wrappedSearchSource.createChild = wrapCreateChild({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.createCopy = wrapCreateCopy({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.create = wrapCreate({ ...wrapParams, pureSearchSource }); + wrappedSearchSource.fetch$ = wrapFetch$({ ...wrapParams, pureSearchSource }); + + return wrappedSearchSource; +} + +function wrapCreate({ pureSearchSource, ...wrapParams }: WrapParams) { + return function () { + const pureCreatedSearchSource = pureSearchSource.create(); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureCreatedSearchSource, + }); + }; +} + +function wrapCreateChild({ pureSearchSource, ...wrapParams }: WrapParams) { + return function (options?: {}) { + const pureSearchSourceChild = pureSearchSource.createChild(options); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureSearchSourceChild, + }); + }; +} + +function wrapCreateCopy({ pureSearchSource, ...wrapParams }: WrapParams) { + return function () { + const pureSearchSourceChild = pureSearchSource.createCopy(); + + return wrapSearchSource({ + ...wrapParams, + pureSearchSource: pureSearchSourceChild, + }) as SearchSource; + }; +} + +function wrapFetch$({ abortController, pureSearchSource }: WrapParams) { + return (options?: ISearchOptions) => { + const searchOptions = options ?? {}; + return pureSearchSource + .fetch$({ + ...searchOptions, + abortSignal: abortController.signal, + }) + .pipe( + catchError((error) => { + if (abortController.signal.aborted) { + return throwError( + () => new Error('Search has been aborted due to cancelled execution') + ); + } + return throwError(() => error); + }) + ); + }; +} diff --git a/x-pack/plugins/stack_alerts/server/alert_types/es_query/executor.ts b/x-pack/plugins/stack_alerts/server/alert_types/es_query/executor.ts index 4f203b064592d..44708a1df90fd 100644 --- a/x-pack/plugins/stack_alerts/server/alert_types/es_query/executor.ts +++ b/x-pack/plugins/stack_alerts/server/alert_types/es_query/executor.ts @@ -51,10 +51,7 @@ export async function executor( alertId, params as OnlySearchSourceAlertParams, latestTimestamp, - { - searchSourceClient, - logger, - } + { searchSourceClient, logger } ); // apply the alert condition diff --git a/x-pack/plugins/stack_alerts/server/alert_types/es_query/lib/fetch_search_source_query.ts b/x-pack/plugins/stack_alerts/server/alert_types/es_query/lib/fetch_search_source_query.ts index cff24f8975f0f..66e5ae8023a47 100644 --- a/x-pack/plugins/stack_alerts/server/alert_types/es_query/lib/fetch_search_source_query.ts +++ b/x-pack/plugins/stack_alerts/server/alert_types/es_query/lib/fetch_search_source_query.ts @@ -20,12 +20,12 @@ export async function fetchSearchSourceQuery( latestTimestamp: string | undefined, services: { logger: Logger; - searchSourceClient: Promise; + searchSourceClient: ISearchStartSearchSource; } ) { const { logger, searchSourceClient } = services; - const client = await searchSourceClient; - const initialSearchSource = await client.create(params.searchConfiguration); + + const initialSearchSource = await searchSourceClient.create(params.searchConfiguration); const { searchSource, dateStart, dateEnd } = updateSearchSource( initialSearchSource,