Skip to content

Commit

Permalink
[ML] API integration tests for APM latency correlation. (elastic#104644)
Browse files Browse the repository at this point in the history
Adds API integration tests for APM Latency Correlations code.

Writing the tests surfaced some glitches fixed as part of this PR:
- If the applied filters don't return any docs, we won't throw an error anymore. Instead, the async search service finishes early and just returns no results.
- If for whatever reason the async search service throws an error, it will now also set its state to isRunning = false.
- If the client triggers a request with a service ID, we now make sure that the async search service still exists. We throw an error if that service no longer exists. This avoids re-instantiating async search services when they've already finished or failed and, for whatever reason, a client triggers another request with the same ID.
- Refactored requests to reuse APM's own getCorrelationsFilters(). We now require start/end to be set, and they will be converted from ISO (client side) to epoch milliseconds (server side) to be more in line with APM's existing code.
- The async search service now creates a simple internal log. This gets exposed via the API and we assert it using the API tests. In the future, we might also expose it in the UI to allow for better problem investigation for users and support.
  • Loading branch information
walterra authored and kibanamachine committed Jul 13, 2021
1 parent 3bf8ae8 commit 96db2fc
Show file tree
Hide file tree
Showing 21 changed files with 675 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,16 @@ export function MlLatencyCorrelations({ onClose }: Props) {
} = useApmPluginContext();

const { serviceName } = useParams<{ serviceName: string }>();
const { urlParams } = useUrlParams();

const fetchOptions = useMemo(
() => ({
...{
serviceName,
...urlParams,
},
}),
[serviceName, urlParams]
);
const {
urlParams: {
environment,
kuery,
transactionName,
transactionType,
start,
end,
},
} = useUrlParams();

const {
error,
Expand All @@ -85,7 +84,15 @@ export function MlLatencyCorrelations({ onClose }: Props) {
} = useCorrelations({
index: 'apm-*',
...{
...fetchOptions,
...{
environment,
kuery,
serviceName,
transactionName,
transactionType,
start,
end,
},
percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD,
},
});
Expand Down Expand Up @@ -322,8 +329,7 @@ export function MlLatencyCorrelations({ onClose }: Props) {
{
defaultMessage: 'Latency distribution for {name}',
values: {
name:
fetchOptions.transactionName ?? fetchOptions.serviceName,
name: transactionName ?? serviceName,
},
}
)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface RawResponse {
took: number;
values: SearchServiceValue[];
overallHistogram: HistogramItem[];
log: string[];
}

export const useCorrelations = (params: CorrelationsOptions) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { fetchTransactionDurationFieldCandidates } from './query_field_candidate
import { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
import { fetchTransactionDurationPercentiles } from './query_percentiles';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationHistogramRangesteps } from './query_histogram_rangesteps';
import { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
import { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';
import type {
AsyncSearchProviderProgress,
Expand All @@ -24,13 +24,18 @@ import { fetchTransactionDurationFractions } from './query_fractions';
const CORRELATION_THRESHOLD = 0.3;
const KS_TEST_THRESHOLD = 0.1;

/** Returns the current moment formatted via `Date.prototype.toISOString()`. */
const currentTimeAsString = (): string => {
  const now = new Date();
  return now.toISOString();
};

export const asyncSearchServiceProvider = (
esClient: ElasticsearchClient,
params: SearchServiceParams
) => {
let isCancelled = false;
let isRunning = true;
let error: Error;
const log: string[] = [];
const logMessage = (message: string) =>
log.push(`${currentTimeAsString()}: ${message}`);

const progress: AsyncSearchProviderProgress = {
started: Date.now(),
Expand All @@ -53,26 +58,50 @@ export const asyncSearchServiceProvider = (
let percentileThresholdValue: number;

const cancel = () => {
logMessage(`Service cancelled.`);
isCancelled = true;
};

const fetchCorrelations = async () => {
try {
// 95th percentile to be displayed as a marker in the log log chart
const percentileThreshold = await fetchTransactionDurationPercentiles(
const {
totalDocs,
percentiles: percentileThreshold,
} = await fetchTransactionDurationPercentiles(
esClient,
params,
params.percentileThreshold ? [params.percentileThreshold] : undefined
);
percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];

const histogramRangeSteps = await fetchTransactionDurationHistogramRangesteps(
logMessage(
`Fetched ${params.percentileThreshold}th percentile value of ${percentileThresholdValue} based on ${totalDocs} documents.`
);

// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
logMessage(
`Abort service since percentileThresholdValue could not be determined.`
);
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
return;
}

const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
esClient,
params
);
progress.loadedHistogramStepsize = 1;

logMessage(`Loaded histogram range steps.`);

if (isCancelled) {
isRunning = false;
return;
Expand All @@ -86,20 +115,22 @@ export const asyncSearchServiceProvider = (
progress.loadedOverallHistogram = 1;
overallHistogram = overallLogHistogramChartData;

logMessage(`Loaded overall histogram chart data.`);

if (isCancelled) {
isRunning = false;
return;
}

// Create an array of ranges [2, 4, 6, ..., 98]
const percents = Array.from(range(2, 100, 2));
const percentilesRecords = await fetchTransactionDurationPercentiles(
esClient,
params,
percents
);
const {
percentiles: percentilesRecords,
} = await fetchTransactionDurationPercentiles(esClient, params, percents);
const percentiles = Object.values(percentilesRecords);

logMessage(`Loaded percentiles.`);

if (isCancelled) {
isRunning = false;
return;
Expand All @@ -110,6 +141,8 @@ export const asyncSearchServiceProvider = (
params
);

logMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);

progress.loadedFieldCanditates = 1;

const fieldValuePairs = await fetchTransactionDurationFieldValuePairs(
Expand All @@ -119,6 +152,8 @@ export const asyncSearchServiceProvider = (
progress
);

logMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);

if (isCancelled) {
isRunning = false;
return;
Expand All @@ -133,6 +168,8 @@ export const asyncSearchServiceProvider = (
totalDocCount,
} = await fetchTransactionDurationFractions(esClient, params, ranges);

logMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

async function* fetchTransactionDurationHistograms() {
for (const item of shuffle(fieldValuePairs)) {
if (item === undefined || isCancelled) {
Expand Down Expand Up @@ -185,7 +222,11 @@ export const asyncSearchServiceProvider = (
yield undefined;
}
} catch (e) {
error = e;
// don't fail the whole process for individual correlation queries, just add the error to the internal log.
logMessage(
`Failed to fetch correlation/kstest for '${item.field}/${item.value}'`
);
yield undefined;
}
}
}
Expand All @@ -199,10 +240,14 @@ export const asyncSearchServiceProvider = (
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
}

isRunning = false;
logMessage(
`Identified ${values.length} significant correlations out of ${fieldValuePairs.length} field/value pairs.`
);
} catch (e) {
error = e;
}

isRunning = false;
};

fetchCorrelations();
Expand All @@ -212,6 +257,7 @@ export const asyncSearchServiceProvider = (

return {
error,
log,
isRunning,
loaded: Math.round(progress.getOverallProgress() * 100),
overallHistogram,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,23 @@ import { getQueryWithParams } from './get_query_with_params';
describe('correlations', () => {
describe('getQueryWithParams', () => {
it('returns the most basic query filtering on processor.event=transaction', () => {
const query = getQueryWithParams({ params: { index: 'apm-*' } });
const query = getQueryWithParams({
params: { index: 'apm-*', start: '2020', end: '2021' },
});
expect(query).toEqual({
bool: {
filter: [{ term: { 'processor.event': 'transaction' } }],
filter: [
{ term: { 'processor.event': 'transaction' } },
{
range: {
'@timestamp': {
format: 'epoch_millis',
gte: 1577836800000,
lte: 1609459200000,
},
},
},
],
},
});
});
Expand All @@ -24,31 +37,26 @@ describe('correlations', () => {
index: 'apm-*',
serviceName: 'actualServiceName',
transactionName: 'actualTransactionName',
start: '01-01-2021',
end: '31-01-2021',
start: '2020',
end: '2021',
environment: 'dev',
percentileThresholdValue: 75,
},
});
expect(query).toEqual({
bool: {
filter: [
{ term: { 'processor.event': 'transaction' } },
{
term: {
'service.name': 'actualServiceName',
},
},
{
term: {
'transaction.name': 'actualTransactionName',
'processor.event': 'transaction',
},
},
{
range: {
'@timestamp': {
gte: '01-01-2021',
lte: '31-01-2021',
format: 'epoch_millis',
gte: 1577836800000,
lte: 1609459200000,
},
},
},
Expand All @@ -57,6 +65,16 @@ describe('correlations', () => {
'service.environment': 'dev',
},
},
{
term: {
'service.name': 'actualServiceName',
},
},
{
term: {
'transaction.name': 'actualTransactionName',
},
},
{
range: {
'transaction.duration.us': {
Expand All @@ -71,14 +89,23 @@ describe('correlations', () => {

it('returns a query considering a custom field/value pair', () => {
const query = getQueryWithParams({
params: { index: 'apm-*' },
params: { index: 'apm-*', start: '2020', end: '2021' },
fieldName: 'actualFieldName',
fieldValue: 'actualFieldValue',
});
expect(query).toEqual({
bool: {
filter: [
{ term: { 'processor.event': 'transaction' } },
{
range: {
'@timestamp': {
format: 'epoch_millis',
gte: 1577836800000,
lte: 1609459200000,
},
},
},
{
term: {
actualFieldName: 'actualFieldValue',
Expand Down
Loading

0 comments on commit 96db2fc

Please sign in to comment.