Skip to content

Commit

Permalink
[Synthetics] Enhance telemetry for synthetics monitors (elastic#132150)…
Browse files Browse the repository at this point in the history
… (elastic#132275)

(cherry picked from commit 6edf3dc)
  • Loading branch information
shahzad31 authored May 16, 2022
1 parent 6fdb90c commit ae54add
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 33 deletions.
30 changes: 27 additions & 3 deletions x-pack/plugins/uptime/server/lib/telemetry/sender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,31 @@ import { loggingSystemMock } from 'src/core/server/mocks';
import { MONITOR_UPDATE_CHANNEL } from './constants';

import { TelemetryEventsSender } from './sender';
import { LicenseGetResponse } from '@elastic/elasticsearch/lib/api/types';

jest.mock('axios', () => {
return {
post: jest.fn(),
};
});

const licenseMock: LicenseGetResponse = {
license: {
status: 'active',
uid: '1d34eb9f-e66f-47d1-8d24-cd60d187587a',
type: 'trial',
issue_date: '2022-05-05T14:25:00.732Z',
issue_date_in_millis: 165176070074432,
expiry_date: '2022-06-04T14:25:00.732Z',
expiry_date_in_millis: 165435270073332,
max_nodes: 1000,
max_resource_units: null,
issued_to: '2c515bd215ce444441f83ffd36a9d3d2546',
issuer: 'elasticsearch',
start_date_in_millis: -1,
},
};

describe('TelemetryEventsSender', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let sender: TelemetryEventsSender;
Expand All @@ -42,6 +60,10 @@ describe('TelemetryEventsSender', () => {
beforeEach(() => {
logger = loggingSystemMock.createLogger();
sender = new TelemetryEventsSender(logger);
sender['fetchLicenseInfo'] = jest.fn(async () => {
return licenseMock as LicenseGetResponse;
});

sender['fetchClusterInfo'] = jest.fn(async () => {
return {
cluster_uuid: '1',
Expand Down Expand Up @@ -79,7 +101,6 @@ describe('TelemetryEventsSender', () => {

expect(sender['sendEvents']).toHaveBeenCalledWith(
`https://telemetry-staging.elastic.co/v3-dev/send/${MONITOR_UPDATE_CHANNEL}`,
{ cluster_name: 'name', cluster_uuid: '1', version: { number: '8.0.0' } },
expect.anything()
);
});
Expand Down Expand Up @@ -134,14 +155,17 @@ describe('TelemetryEventsSender', () => {
'X-Elastic-Stack-Version': '8.0.0',
},
};
const event1 = { 'event.kind': '1', ...licenseMock };
const event2 = { 'event.kind': '2', ...licenseMock };
const event3 = { 'event.kind': '3', ...licenseMock };
expect(axios.post).toHaveBeenCalledWith(
'https://telemetry.elastic.co/v3/send/my-channel',
'{"event.kind":"1"}\n{"event.kind":"2"}\n',
`${JSON.stringify(event1)}\n${JSON.stringify(event2)}\n`,
headers
);
expect(axios.post).toHaveBeenCalledWith(
'https://telemetry.elastic.co/v3/send/my-channel2',
'{"event.kind":"3"}\n',
`${JSON.stringify(event3)}\n`,
headers
);
});
Expand Down
58 changes: 28 additions & 30 deletions x-pack/plugins/uptime/server/lib/telemetry/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { cloneDeep } from 'lodash';

import axios from 'axios';

import type { InfoResponse } from '@elastic/elasticsearch/lib/api/types';
import type { InfoResponse, LicenseGetResponse } from '@elastic/elasticsearch/lib/api/types';

import { TelemetryQueue } from './queue';

Expand All @@ -35,6 +35,7 @@ export class TelemetryEventsSender {
private isOptedIn?: boolean = true; // Assume true until the first check
private esClient?: ElasticsearchClient;
private clusterInfo?: InfoResponse;
private licenseInfo?: LicenseGetResponse;

constructor(logger: Logger) {
this.logger = logger;
Expand All @@ -48,6 +49,7 @@ export class TelemetryEventsSender {
this.telemetryStart = telemetryStart;
this.esClient = core?.elasticsearch.client.asInternalUser;
this.clusterInfo = await this.fetchClusterInfo();
this.licenseInfo = await this.fetchLicenseInfo();

this.logger.debug(`Starting local task`);
setTimeout(() => {
Expand Down Expand Up @@ -95,48 +97,44 @@ export class TelemetryEventsSender {
}

for (const channel of Object.keys(this.queuesPerChannel)) {
await this.sendEvents(
await this.fetchTelemetryUrl(channel),
this.clusterInfo,
this.queuesPerChannel[channel]
);
await this.sendEvents(await this.fetchTelemetryUrl(channel), this.queuesPerChannel[channel]);
}

this.isSending = false;
}

private async fetchClusterInfo(): Promise<InfoResponse> {
if (this.esClient === undefined || this.esClient === null) {
throw Error('elasticsearch client is unavailable: cannot retrieve cluster infomation');
throw Error('elasticsearch client is unavailable: cannot retrieve cluster information');
}

return await this.esClient.info();
}

public async sendEvents(
telemetryUrl: string,
clusterInfo: InfoResponse | undefined,
queue: TelemetryQueue<any>
) {
const events = queue.getEvents();
private async fetchLicenseInfo() {
if (this.esClient === undefined || this.esClient === null) {
throw Error('elasticsearch client is unavailable: cannot retrieve license information');
}

return await this.esClient.license.get();
}

public async sendEvents(telemetryUrl: string, queue: TelemetryQueue<any>) {
let events = queue.getEvents();
if (events.length === 0) {
return;
}

events = events.map((event) => ({ ...event, license: this.licenseInfo?.license }));

try {
this.logger.debug(`Telemetry URL: ${telemetryUrl}`);

queue.clearEvents();

this.logger.debug(JSON.stringify(events));

await this.send(
events,
telemetryUrl,
clusterInfo?.cluster_uuid,
clusterInfo?.version?.number,
clusterInfo?.cluster_name
);
await this.send(events, telemetryUrl);
} catch (err) {
this.logger.debug(`Error sending telemetry events data: ${err}`);
queue.clearEvents();
Expand All @@ -159,13 +157,13 @@ export class TelemetryEventsSender {
return telemetryUrl.toString();
}

private async send(
events: unknown[],
telemetryUrl: string,
clusterUuid: string | undefined,
clusterVersionNumber: string | undefined,
clusterName: string | undefined
) {
private async send(events: unknown[], telemetryUrl: string) {
const {
cluster_name: clusterName,
cluster_uuid: clusterUuid,
version: clusterVersion,
} = this.clusterInfo ?? {};

// using ndjson so that each line will be wrapped in json envelope on server side
// see https://github.com/elastic/infra/blob/master/docs/telemetry/telemetry-next-dataflow.md#json-envelope
const ndjson = this.transformDataToNdjson(events);
Expand All @@ -174,9 +172,9 @@ export class TelemetryEventsSender {
const resp = await axios.post(telemetryUrl, ndjson, {
headers: {
'Content-Type': 'application/x-ndjson',
'X-Elastic-Cluster-ID': clusterUuid,
'X-Elastic-Cluster-Name': clusterName,
'X-Elastic-Stack-Version': clusterVersionNumber ? clusterVersionNumber : '8.2.0',
...(clusterUuid ? { 'X-Elastic-Cluster-ID': clusterUuid } : undefined),
...(clusterName ? { 'X-Elastic-Cluster-Name': clusterName } : undefined),
'X-Elastic-Stack-Version': clusterVersion?.number ? clusterVersion.number : '8.2.0',
},
});
this.logger.debug(`Events sent!. Response: ${resp.status} ${JSON.stringify(resp.data)}`);
Expand Down

0 comments on commit ae54add

Please sign in to comment.