Skip to content

Commit

Permalink
[APM] Upgrade ES client (#86594)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
dgieselaar and kibanamachine authored Jan 27, 2021
1 parent c8afae8 commit 9e68975
Show file tree
Hide file tree
Showing 25 changed files with 396 additions and 210 deletions.
32 changes: 18 additions & 14 deletions x-pack/plugins/apm/public/hooks/use_fetcher.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ export interface FetcherResult<Data> {
error?: IHttpFetchError;
}

function getDetailsFromErrorResponse(error: IHttpFetchError) {
const message = error.body?.message ?? error.response?.statusText;
return (
<>
{message} ({error.response?.status})
<h5>
{i18n.translate('xpack.apm.fetcher.error.url', {
defaultMessage: `URL`,
})}
</h5>
{error.response?.url}
</>
);
}

// fetcher functions can return undefined OR a promise. Previously we had a more simple type
// but it led to issues when using object destructuring with default values
type InferResponseType<TReturn> = Exclude<TReturn, undefined> extends Promise<
Expand Down Expand Up @@ -82,25 +97,14 @@ export function useFetcher<TReturn>(

if (!didCancel) {
const errorDetails =
'response' in err ? (
<>
{err.response?.statusText} ({err.response?.status})
<h5>
{i18n.translate('xpack.apm.fetcher.error.url', {
defaultMessage: `URL`,
})}
</h5>
{err.response?.url}
</>
) : (
err.message
);
'response' in err ? getDetailsFromErrorResponse(err) : err.message;

if (showToastOnError) {
notifications.toasts.addWarning({
notifications.toasts.addDanger({
title: i18n.translate('xpack.apm.fetcher.error.title', {
defaultMessage: `Error while fetching resource`,
}),

text: toMountPoint(
<div>
<h5>
Expand Down
12 changes: 7 additions & 5 deletions x-pack/plugins/apm/scripts/upload-telemetry-data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { argv } from 'yargs';
import { Logger } from 'kibana/server';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { CollectTelemetryParams } from '../../server/lib/apm_telemetry/collect_data_telemetry';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { unwrapEsResponse } from '../../../observability/server/utils/unwrap_es_response';
import { downloadTelemetryTemplate } from '../shared/download-telemetry-template';
import { mergeApmTelemetryMapping } from '../../common/apm_telemetry';
import { generateSampleDocuments } from './generate-sample-documents';
Expand Down Expand Up @@ -80,18 +82,18 @@ async function uploadData() {
apmAgentConfigurationIndex: '.apm-agent-configuration',
},
search: (body) => {
return client.search(body as any).then((res) => res.body as any);
return unwrapEsResponse(client.search<any>(body));
},
indicesStats: (body) => {
return client.indices.stats(body as any).then((res) => res.body);
return unwrapEsResponse(client.indices.stats<any>(body));
},
transportRequest: ((params) => {
return client.transport
.request({
return unwrapEsResponse(
client.transport.request({
method: params.method,
path: params.path,
})
.then((res) => res.body);
);
}) as CollectTelemetryParams['transportRequest'],
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { merge } from 'lodash';
import { Logger, LegacyCallAPIOptions } from 'kibana/server';
import { IndicesStatsParams, Client } from 'elasticsearch';
import { Logger } from 'kibana/server';
import { RequestParams } from '@elastic/elasticsearch';
import {
ESSearchRequest,
ESSearchResponse,
Expand All @@ -20,9 +20,17 @@ type TelemetryTaskExecutor = (params: {
params: TSearchRequest
): Promise<ESSearchResponse<unknown, TSearchRequest>>;
indicesStats(
params: IndicesStatsParams,
options?: LegacyCallAPIOptions
): ReturnType<Client['indices']['stats']>;
params: RequestParams.IndicesStats
// promise returned by client has an abort property
// so we cannot use its ReturnType
): Promise<{
_all?: {
total?: { store?: { size_in_bytes?: number }; docs?: { count?: number } };
};
_shards?: {
total?: number;
};
}>;
transportRequest: (params: {
path: string;
method: 'get';
Expand Down
22 changes: 9 additions & 13 deletions x-pack/plugins/apm/server/lib/apm_telemetry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
Logger,
SavedObjectsErrorHelpers,
} from '../../../../../../src/core/server';
import { unwrapEsResponse } from '../../../../observability/server';
import { APMConfig } from '../..';
import {
TaskManagerSetupContract,
Expand Down Expand Up @@ -65,27 +66,22 @@ export async function createApmTelemetry({
const collectAndStore = async () => {
const config = await config$.pipe(take(1)).toPromise();
const [{ elasticsearch }] = await core.getStartServices();
const esClient = elasticsearch.legacy.client;
const esClient = elasticsearch.client;

const indices = await getApmIndices({
config,
savedObjectsClient,
});

const search = esClient.callAsInternalUser.bind(
esClient,
'search'
) as CollectTelemetryParams['search'];
const search: CollectTelemetryParams['search'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.search<any>(params));

const indicesStats = esClient.callAsInternalUser.bind(
esClient,
'indices.stats'
) as CollectTelemetryParams['indicesStats'];
const indicesStats: CollectTelemetryParams['indicesStats'] = (params) =>
unwrapEsResponse(esClient.asInternalUser.indices.stats(params));

const transportRequest = esClient.callAsInternalUser.bind(
esClient,
'transport.request'
) as CollectTelemetryParams['transportRequest'];
const transportRequest: CollectTelemetryParams['transportRequest'] = (
params
) => unwrapEsResponse(esClient.asInternalUser.transport.request(params));

const dataTelemetry = await collectDataTelemetry({
search,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,31 @@
/* eslint-disable no-console */

import chalk from 'chalk';
import {
LegacyAPICaller,
KibanaRequest,
} from '../../../../../../../src/core/server';
import { KibanaRequest } from '../../../../../../../src/core/server';

function formatObj(obj: Record<string, any>) {
return JSON.stringify(obj, null, 2);
}

export async function callClientWithDebug({
apiCaller,
operationName,
params,
export async function callAsyncWithDebug<T>({
cb,
getDebugMessage,
debug,
request,
}: {
apiCaller: LegacyAPICaller;
operationName: string;
params: Record<string, any>;
cb: () => Promise<T>;
getDebugMessage: () => { body: string; title: string };
debug: boolean;
request: KibanaRequest;
}) {
if (!debug) {
return cb();
}

const startTime = process.hrtime();

let res: any;
let esError = null;
try {
res = await apiCaller(operationName, params);
res = await cb();
} catch (e) {
// catch error and throw after outputting debug info
esError = e;
Expand All @@ -44,23 +41,14 @@ export async function callClientWithDebug({
const highlightColor = esError ? 'bgRed' : 'inverse';
const diff = process.hrtime(startTime);
const duration = `${Math.round(diff[0] * 1000 + diff[1] / 1e6)}ms`;
const routeInfo = `${request.route.method.toUpperCase()} ${
request.route.path
}`;

const { title, body } = getDebugMessage();

console.log(
chalk.bold[highlightColor](`=== Debug: ${routeInfo} (${duration}) ===`)
chalk.bold[highlightColor](`=== Debug: ${title} (${duration}) ===`)
);

if (operationName === 'search') {
console.log(`GET ${params.index}/_${operationName}`);
console.log(formatObj(params.body));
} else {
console.log(chalk.bold('ES operation:'), operationName);

console.log(chalk.bold('ES query:'));
console.log(formatObj(params));
}
console.log(body);
console.log(`\n`);
}

Expand All @@ -70,3 +58,19 @@ export async function callClientWithDebug({

return res;
}

export const getDebugBody = (
params: Record<string, any>,
operationName: string
) => {
if (operationName === 'search') {
return `GET ${params.index}/_search\n${formatObj(params.body)}`;
}

return `${chalk.bold('ES operation:')} ${operationName}\n${chalk.bold(
'ES query:'
)}\n${formatObj(params)}`;
};

export const getDebugTitle = (request: KibanaRequest) =>
`${request.route.method.toUpperCase()} ${request.route.path}`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import { KibanaRequest } from 'src/core/server';

export function cancelEsRequestOnAbort<T extends TransportRequestPromise<any>>(
promise: T,
request: KibanaRequest
) {
const subscription = request.events.aborted$.subscribe(() => {
promise.abort();
});

// using .catch() here means unsubscribe will be called
// after it has thrown an error, so we use .then(onSuccess, onFailure)
// syntax
promise.then(
() => subscription.unsubscribe(),
() => subscription.unsubscribe()
);

return promise;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { contextServiceMock } from 'src/core/server/mocks';
import { createHttpServer } from 'src/core/server/test_utils';
import supertest from 'supertest';
import { createApmEventClient } from '.';

describe('createApmEventClient', () => {
let server: ReturnType<typeof createHttpServer>;

beforeEach(() => {
server = createHttpServer();
});

afterEach(async () => {
await server.stop();
});
it('cancels a search when a request is aborted', async () => {
const { server: innerServer, createRouter } = await server.setup({
context: contextServiceMock.createSetupContract(),
});
const router = createRouter('/');

const abort = jest.fn();
router.get(
{ path: '/', validate: false },
async (context, request, res) => {
const eventClient = createApmEventClient({
esClient: {
search: () => {
return Object.assign(
new Promise((resolve) => setTimeout(resolve, 3000)),
{ abort }
);
},
} as any,
debug: false,
request,
indices: {} as any,
options: {
includeFrozen: false,
},
});

await eventClient.search({
apm: {
events: [],
},
});

return res.ok({ body: 'ok' });
}
);

await server.start();

const incomingRequest = supertest(innerServer.listener)
.get('/')
// end required to send request
.end();

await new Promise((resolve) => {
setTimeout(() => {
incomingRequest.abort();
setTimeout(() => {
resolve(undefined);
}, 0);
}, 50);
});

expect(abort).toHaveBeenCalled();
});
});
Loading

0 comments on commit 9e68975

Please sign in to comment.