Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] API integration tests for APM latency correlation. #104644

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ export function MlLatencyCorrelations({ onClose }: Props) {
} = useApmPluginContext();

const { serviceName } = useParams<{ serviceName: string }>();
const { urlParams } = useUrlParams();

const fetchOptions = useMemo(
() => ({
...{
serviceName,
...urlParams,
},
}),
[serviceName, urlParams]
);
const {
urlParams: {
environment,
kuery,
transactionName,
transactionType,
start,
end,
},
} = useUrlParams();

const {
error,
Expand All @@ -84,7 +83,15 @@ export function MlLatencyCorrelations({ onClose }: Props) {
} = useCorrelations({
index: 'apm-*',
...{
...fetchOptions,
...{
environment,
kuery,
serviceName,
transactionName,
transactionType,
start,
end,
},
percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD,
},
});
Expand Down Expand Up @@ -332,8 +339,7 @@ export function MlLatencyCorrelations({ onClose }: Props) {
{
defaultMessage: 'Latency distribution for {name}',
values: {
name:
fetchOptions.transactionName ?? fetchOptions.serviceName,
name: transactionName ?? serviceName,
},
}
)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ export const asyncSearchServiceProvider = (
percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];

// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
return;
}

const histogramRangeSteps = await fetchTransactionDurationHistogramRangesteps(
esClient,
params
Expand Down Expand Up @@ -198,11 +209,11 @@ export const asyncSearchServiceProvider = (
loadedHistograms++;
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
}

isRunning = false;
} catch (e) {
error = e;
}

isRunning = false;
};

fetchCorrelations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
* 2.0.
*/

import { pipe } from 'fp-ts/lib/pipeable';
import { getOrElse } from 'fp-ts/lib/Either';
import { failure } from 'io-ts/lib/PathReporter';
import * as t from 'io-ts';

import type { estypes } from '@elastic/elasticsearch';
import {
PROCESSOR_EVENT,
SERVICE_NAME,
TRANSACTION_DURATION,
TRANSACTION_NAME,
} from '../../../../common/elasticsearch_fieldnames';
import { TRANSACTION_DURATION } from '../../../../common/elasticsearch_fieldnames';
import type { SearchServiceParams } from '../../../../common/search_strategies/correlations/types';
import { environmentQuery as getEnvironmentQuery } from '../../../utils/queries';
import { ProcessorEvent } from '../../../../common/processor_event';
import { rangeRt } from '../../../routes/default_api_types';

import { Setup, SetupTimeRange } from '../../helpers/setup_request';

import { getCorrelationsFilters } from '../../correlations/get_filters';

const getPercentileThresholdValueQuery = (
percentileThresholdValue: number | undefined
Expand All @@ -39,26 +42,6 @@ export const getTermsQuery = (
return fieldName && fieldValue ? [{ term: { [fieldName]: fieldValue } }] : [];
};

const getRangeQuery = (
start?: string,
end?: string
): estypes.QueryDslQueryContainer[] => {
if (start === undefined && end === undefined) {
return [];
}

return [
{
range: {
'@timestamp': {
...(start !== undefined ? { gte: start } : {}),
...(end !== undefined ? { lte: end } : {}),
},
},
},
];
};

interface QueryParams {
params: SearchServiceParams;
fieldName?: string;
Expand All @@ -71,21 +54,37 @@ export const getQueryWithParams = ({
}: QueryParams) => {
const {
environment,
kuery,
serviceName,
start,
end,
percentileThresholdValue,
transactionType,
transactionName,
} = params;

// converts string based start/end to epochmillis
const setup = pipe(
rangeRt.decode({ start, end }),
getOrElse<t.Errors, { start: number; end: number }>((errors) => {
throw new Error(failure(errors).join('\n'));
})
) as Setup & SetupTimeRange;

const filters = getCorrelationsFilters({
setup,
environment,
kuery,
serviceName,
transactionType,
transactionName,
});

return {
bool: {
filter: [
...getTermsQuery(PROCESSOR_EVENT, ProcessorEvent.transaction),
...getTermsQuery(SERVICE_NAME, serviceName),
...getTermsQuery(TRANSACTION_NAME, transactionName),
...filters,
...getTermsQuery(fieldName, fieldValue),
...getRangeQuery(start, end),
...getEnvironmentQuery(environment),
...getPercentileThresholdValueQuery(percentileThresholdValue),
] as estypes.QueryDslQueryContainer[],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import type { SearchServiceParams } from '../../../../common/search_strategies/c

import { getQueryWithParams } from './get_query_with_params';

const getHistogramRangeSteps = (min: number, max: number, steps: number) => {
// A d3 based scale function as a helper to get equally distributed bins on a log scale.
const logFn = scaleLog().domain([min, max]).range([1, steps]);
return [...Array(steps).keys()]
.map(logFn.invert)
.map((d) => (isNaN(d) ? 0 : d));
};

export const getHistogramIntervalRequest = (
params: SearchServiceParams
): estypes.SearchRequest => ({
Expand All @@ -41,25 +49,26 @@ export const fetchTransactionDurationHistogramRangesteps = async (
esClient: ElasticsearchClient,
params: SearchServiceParams
): Promise<number[]> => {
const steps = 100;

const resp = await esClient.search(getHistogramIntervalRequest(params));

if ((resp.body.hits.total as estypes.SearchTotalHits).value === 0) {
return getHistogramRangeSteps(0, 1, 100);
}

if (resp.body.aggregations === undefined) {
throw new Error(
'fetchTransactionDurationHistogramInterval failed, did not return aggregations.'
'fetchTransactionDurationHistogramRangesteps failed, did not return aggregations.'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it rangeSteps or rangesteps? The file name and function name and error messages don't agree.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I renamed everything to be consistent rangeSteps and range_steps in 509a02d.

);
}

const steps = 100;
const min = (resp.body.aggregations
.transaction_duration_min as estypes.AggregationsValueAggregate).value;
const max =
(resp.body.aggregations
.transaction_duration_max as estypes.AggregationsValueAggregate).value *
2;

// A d3 based scale function as a helper to get equally distributed bins on a log scale.
const logFn = scaleLog().domain([min, max]).range([1, steps]);
return [...Array(steps).keys()]
.map(logFn.invert)
.map((d) => (isNaN(d) ? 0 : d));
return getHistogramRangeSteps(min, max, steps);
};
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const getTransactionDurationPercentilesRequest = (
return {
index: params.index,
body: {
track_total_hits: true,
query,
size: 0,
aggs: {
Expand Down Expand Up @@ -71,11 +72,17 @@ export const fetchTransactionDurationPercentiles = async (
)
);

// return early with no results if the search didn't return any documents
if ((resp.body.hits.total as estypes.SearchTotalHits).value === 0) {
return {};
}

if (resp.body.aggregations === undefined) {
throw new Error(
'fetchTransactionDurationPercentiles failed, did not return aggregations.'
);
}

return (
(resp.body.aggregations
.transaction_duration_percentiles as estypes.AggregationsTDigestPercentilesAggregate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ describe('APM Correlations search strategy', () => {
} as unknown) as SearchStrategyDependencies;
params = {
index: 'apm-*',
start: '2020',
end: '2021',
};
});

Expand Down Expand Up @@ -154,10 +156,22 @@ describe('APM Correlations search strategy', () => {
},
query: {
bool: {
filter: [{ term: { 'processor.event': 'transaction' } }],
filter: [
{ term: { 'processor.event': 'transaction' } },
{
range: {
'@timestamp': {
format: 'epoch_millis',
gte: 1577836800000,
lte: 1609459200000,
},
},
},
],
},
},
size: 0,
track_total_hits: true,
})
);
});
Expand All @@ -167,11 +181,17 @@ describe('APM Correlations search strategy', () => {
it('retrieves the current request', async () => {
const searchStrategy = await apmCorrelationsSearchStrategyProvider();
const response = await searchStrategy
.search({ id: 'my-search-id', params }, {}, mockDeps)
.search({ params }, {}, mockDeps)
.toPromise();

expect(response).toEqual(
expect.objectContaining({ id: 'my-search-id' })
const searchStrategyId = response.id;

const response2 = await searchStrategy
.search({ id: searchStrategyId, params }, {}, mockDeps)
.toPromise();

expect(response2).toEqual(
expect.objectContaining({ id: searchStrategyId })
);
});
});
Expand Down Expand Up @@ -226,7 +246,7 @@ describe('APM Correlations search strategy', () => {

expect(response2.id).toEqual(response1.id);
expect(response2).toEqual(
expect.objectContaining({ loaded: 10, isRunning: false })
expect.objectContaining({ loaded: 100, isRunning: false })
);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,36 @@ export const apmCorrelationsSearchStrategyProvider = (): ISearchStrategy<
throw new Error('Invalid request parameters.');
}

const id = request.id ?? uuid();
// The function to fetch the current state of the async search service.
// This will be either an existing service for a follow up fetch or a new one for new requests.
let getAsyncSearchServiceState: ReturnType<
typeof asyncSearchServiceProvider
>;

// If the request includes an ID, we require that the async search service already exists
// otherwise we throw an error. The client should never poll a service that's been cancelled or finished.
// This also avoids instantiating async search services when the service gets called with random IDs.
if (typeof request.id === 'string') {
const existingGetAsyncSearchServiceState = asyncSearchServiceMap.get(
request.id
);

const getAsyncSearchServiceState =
asyncSearchServiceMap.get(id) ??
asyncSearchServiceProvider(deps.esClient.asCurrentUser, request.params);
if (typeof existingGetAsyncSearchServiceState === 'undefined') {
throw new Error(
`AsyncSearchService with ID '${request.id}' does not exist.`
);
}

getAsyncSearchServiceState = existingGetAsyncSearchServiceState;
} else {
getAsyncSearchServiceState = asyncSearchServiceProvider(
deps.esClient.asCurrentUser,
request.params
);
}

// Reuse the request's id or create a new one.
const id = request.id ?? uuid();

const {
error,
Expand Down
Loading