Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Discover] Cancel long running requests in Discover alert #130077

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5a4335f
[Discover] improve long running requests for search source within ale…
dimaanj Apr 13, 2022
311a66f
Merge branch 'main' of https://github.com/elastic/kibana into improve…
dimaanj Apr 20, 2022
30492e7
[Discover] add tests
dimaanj Apr 21, 2022
4ff15e2
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine Apr 21, 2022
9e717c2
[Discover] fix linting
dimaanj Apr 22, 2022
98f90fd
[Discover] fix unit test
dimaanj Apr 22, 2022
239b90e
[Discover] add getMetrics test
dimaanj Apr 22, 2022
12d6312
[Discover] fix unit test
dimaanj Apr 25, 2022
a3e0ff7
Merge branch 'main' of https://github.com/elastic/kibana into improve…
dimaanj Apr 25, 2022
1fc32ca
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine Apr 25, 2022
7c86fe8
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine Apr 25, 2022
4f1a917
Merge branch 'main' of https://github.com/elastic/kibana into improve…
dimaanj Apr 26, 2022
7d03945
[Discover] merge search clients metrics
dimaanj Apr 26, 2022
fae9f28
Merge branch 'main' of https://github.com/elastic/kibana into improve…
dimaanj Apr 28, 2022
bcb52ae
[Discover] wrap searchSourceClient
dimaanj Apr 30, 2022
157362d
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine Apr 30, 2022
5b03b8a
Merge branch 'main' of https://github.com/elastic/kibana into improve…
dimaanj May 11, 2022
80cf064
[Discover] add unit tests
dimaanj May 14, 2022
63df64c
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine May 14, 2022
7ab49d7
[Discover] replace searchSourceUtils with searchSourceClient in tests
dimaanj May 14, 2022
b5e1cc9
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine May 15, 2022
efb5d59
[Discover] apply suggestions
dimaanj May 17, 2022
59a5ebf
Merge branch 'main' into improve-long-running-requests-for-alert-rule
kibanamachine May 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
AggregationsAggregate,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { IScopedClusterClient, ElasticsearchClient, Logger } from 'src/core/server';
import { ISearchSource } from 'src/plugins/data/common';
import { RuleExecutionMetrics } from '../types';
import { Rule } from '../types';

Expand All @@ -46,7 +47,7 @@ interface LogSearchMetricsOpts {
}
type LogSearchMetricsFn = (metrics: LogSearchMetricsOpts) => void;

export function createWrappedScopedClusterClientFactory(opts: WrapScopedClusterClientFactoryOpts) {
export function createWrappedSearchClientsFactory(opts: WrapScopedClusterClientFactoryOpts) {
let numSearches: number = 0;
let esSearchDurationMs: number = 0;
let totalSearchDurationMs: number = 0;
Expand All @@ -57,10 +58,12 @@ export function createWrappedScopedClusterClientFactory(opts: WrapScopedClusterC
totalSearchDurationMs += metrics.totalSearchDuration;
}

const wrappedClient = wrapScopedClusterClient({ ...opts, logMetricsFn: logMetrics });
const wrappedScopedClusterClient = wrapScopedClusterClient({ ...opts, logMetricsFn: logMetrics });
const wrappedSearchSourceFetch = wrapSearchSourceFetch({ ...opts, logMetricsFn: logMetrics });

return {
client: () => wrappedClient,
scopedClusterClient: () => wrappedScopedClusterClient,
searchSourceFetch: () => wrappedSearchSourceFetch,
getMetrics: (): RuleExecutionMetrics => {
return {
esSearchDurationMs,
Expand All @@ -71,6 +74,27 @@ export function createWrappedScopedClusterClientFactory(opts: WrapScopedClusterC
};
}

export function wrapSearchSourceFetch(opts: WrapScopedClusterClientOpts) {
return async (searchSource: ISearchSource) => {
try {
const start = Date.now();
opts.logger.debug(
`executing query for rule ${opts.rule.alertTypeId}:${opts.rule.id} in space ${opts.rule.spaceId}`
);
const result = await searchSource.fetch({ abortSignal: opts.abortController.signal });

const durationMs = Date.now() - start;
opts.logMetricsFn({ esSearchDuration: result.took ?? 0, totalSearchDuration: durationMs });
return result;
} catch (e) {
if (opts.abortController.signal.aborted) {
throw new Error('Search has been aborted due to cancelled execution');
}
throw e;
dimaanj marked this conversation as resolved.
Show resolved Hide resolved
}
};
}

function wrapScopedClusterClient(opts: WrapScopedClusterClientOpts): IScopedClusterClient {
const { scopedClusterClient, ...rest } = opts;
return {
Expand Down
13 changes: 9 additions & 4 deletions x-pack/plugins/alerting/server/task_runner/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { ConcreteTaskInstance, throwUnrecoverableError } from '../../../task_man
import { createExecutionHandler, ExecutionHandler } from './create_execution_handler';
import { Alert, createAlertFactory } from '../alert';
import {
createWrappedScopedClusterClientFactory,
ElasticsearchError,
ErrorWithReason,
executionStatusFromError,
Expand Down Expand Up @@ -74,6 +73,7 @@ import {
ScheduleActionsForRecoveredAlertsParams,
TrackAlertDurationsParams,
} from './types';
import { createWrappedSearchClientsFactory } from '../lib/wrap_scoped_cluster_client';

const FALLBACK_RETRY_INTERVAL = '5m';
const CONNECTIVITY_RETRY_INTERVAL = '5m';
Expand Down Expand Up @@ -356,7 +356,7 @@ export class TaskRunner<
const ruleLabel = `${this.ruleType.id}:${ruleId}: '${name}'`;

const scopedClusterClient = this.context.elasticsearch.client.asScoped(fakeRequest);
const wrappedScopedClusterClient = createWrappedScopedClusterClientFactory({
const wrappedScopedClusterClient = createWrappedSearchClientsFactory({
scopedClusterClient,
rule: {
name: rule.name,
Expand All @@ -367,6 +367,11 @@ export class TaskRunner<
logger: this.logger,
abortController: this.searchAbortController,
});
const searchSourceClient = await this.context.data.search.searchSource.asScoped(fakeRequest);
const searchSourceUtils = {
searchSourceClient,
wrappedFetch: wrappedScopedClusterClient.searchSourceFetch(),
};

let updatedRuleTypeState: void | Record<string, unknown>;
try {
Expand All @@ -389,9 +394,9 @@ export class TaskRunner<
executionId: this.executionId,
services: {
savedObjectsClient,
searchSourceUtils,
uiSettingsClient: this.context.uiSettings.asScopedToClient(savedObjectsClient),
scopedClusterClient: wrappedScopedClusterClient.client(),
searchSourceClient: this.context.data.search.searchSource.asScoped(fakeRequest),
scopedClusterClient: wrappedScopedClusterClient.scopedClusterClient(),
alertFactory: createAlertFactory<
InstanceState,
InstanceContext,
Expand Down
13 changes: 11 additions & 2 deletions x-pack/plugins/alerting/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import type {
IUiSettingsClient,
} from 'src/core/server';
import type { PublicMethodsOf } from '@kbn/utility-types';
import {
AggregationsAggregate,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { AlertFactoryDoneUtils, PublicAlert } from './alert';
import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry';
import { PluginSetupContract, PluginStartContract } from './plugin';
Expand Down Expand Up @@ -42,7 +46,7 @@ import {
MappedParams,
} from '../common';
import { LicenseType } from '../../licensing/server';
import { ISearchStartSearchSource } from '../../../../src/plugins/data/common';
import { ISearchSource, ISearchStartSearchSource } from '../../../../src/plugins/data/common';
import { RuleTypeConfig } from './config';
export type WithoutQueryAndParams<T> = Pick<T, Exclude<keyof T, 'query' | 'params'>>;
export type SpaceIdToNamespaceFunction = (spaceId?: string) => string | undefined;
Expand Down Expand Up @@ -74,7 +78,12 @@ export interface RuleExecutorServices<
InstanceContext extends AlertInstanceContext = AlertInstanceContext,
ActionGroupIds extends string = never
> {
searchSourceClient: Promise<ISearchStartSearchSource>;
searchSourceUtils: {
searchSourceClient: ISearchStartSearchSource;
wrappedFetch: (
searchSource: ISearchSource
) => Promise<SearchResponse<unknown, Record<string, AggregationsAggregate>>>;
};
savedObjectsClient: SavedObjectsClientContract;
uiSettingsClient: IUiSettingsClient;
scopedClusterClient: IScopedClusterClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export async function executor(
) {
const esQueryAlert = isEsQueryAlert(options);
const { alertId, name, services, params, state } = options;
const { alertFactory, scopedClusterClient, searchSourceClient } = services;
const { alertFactory, scopedClusterClient, searchSourceUtils } = services;
const currentTimestamp = new Date().toISOString();
const publicBaseUrl = core.http.basePath.publicBaseUrl ?? '';

Expand All @@ -51,10 +51,7 @@ export async function executor(
alertId,
params as OnlySearchSourceAlertParams,
latestTimestamp,
{
searchSourceClient,
logger,
}
{ searchSourceUtils, logger }
);

// apply the alert condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
*/
import { buildRangeFilter, Filter } from '@kbn/es-query';
import { Logger } from 'kibana/server';
import {
AggregationsAggregate,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { OnlySearchSourceAlertParams } from '../types';
import {
getTime,
Expand All @@ -14,18 +18,26 @@ import {
SortDirection,
} from '../../../../../../../src/plugins/data/common';

interface SearchSourceUtils {
searchSourceClient: ISearchStartSearchSource;
wrappedFetch: (
searchSource: ISearchSource
) => Promise<SearchResponse<unknown, Record<string, AggregationsAggregate>>>;
}

export async function fetchSearchSourceQuery(
alertId: string,
params: OnlySearchSourceAlertParams,
latestTimestamp: string | undefined,
services: {
logger: Logger;
searchSourceClient: Promise<ISearchStartSearchSource>;
searchSourceUtils: SearchSourceUtils;
}
) {
const { logger, searchSourceClient } = services;
const client = await searchSourceClient;
const initialSearchSource = await client.create(params.searchConfiguration);
const { logger, searchSourceUtils } = services;
const { searchSourceClient, wrappedFetch } = searchSourceUtils;

const initialSearchSource = await searchSourceClient.create(params.searchConfiguration);

const { searchSource, dateStart, dateEnd } = updateSearchSource(
initialSearchSource,
Expand All @@ -39,7 +51,7 @@ export async function fetchSearchSourceQuery(
)}`
);

const searchResult = await searchSource.fetch();
const searchResult = await wrappedFetch(searchSource);

return {
numMatches: Number(searchResult.hits.total),
Expand Down