From 32ad765aec24656df6696161860b08fca61deb9d Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Sun, 4 Oct 2020 17:44:47 +0300 Subject: [PATCH 1/4] check collectors ready before sending --- src/plugins/telemetry/server/fetcher.test.ts | 37 +++++++++++++++++-- src/plugins/telemetry/server/fetcher.ts | 11 +++++- .../server/plugin.ts | 6 +++ .../server/types.ts | 2 + .../server/collector/collector_set.ts | 30 +++++++++------ 5 files changed, 70 insertions(+), 16 deletions(-) diff --git a/src/plugins/telemetry/server/fetcher.test.ts b/src/plugins/telemetry/server/fetcher.test.ts index 245adf59799cc..71f9b20bf08f5 100644 --- a/src/plugins/telemetry/server/fetcher.test.ts +++ b/src/plugins/telemetry/server/fetcher.test.ts @@ -27,15 +27,46 @@ describe('FetcherTask', () => { const initializerContext = coreMock.createPluginInitializerContext({}); const fetcherTask = new FetcherTask(initializerContext); const mockError = new Error('Some message.'); - fetcherTask['getCurrentConfigs'] = async () => { - throw mockError; - }; + const getCurrentConfigs = jest.fn().mockRejectedValue(mockError); + Object.assign(fetcherTask, { + getCurrentConfigs, + }); const result = await fetcherTask['sendIfDue'](); expect(result).toBe(undefined); + expect(getCurrentConfigs).toBeCalledTimes(1); expect(fetcherTask['logger'].warn).toBeCalledTimes(1); expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( `Error fetching telemetry configs: ${mockError}` ); }); + + it('fails when all collectors are not ready', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const fetcherTask = new FetcherTask(initializerContext); + const getCurrentConfigs = jest.fn().mockResolvedValue({}); + const areAllCollectorsReady = jest.fn().mockResolvedValue(false); + const shouldSendReport = jest.fn().mockReturnValue(true); + const fetchTelemetry = jest.fn(); + const updateReportFailure = jest.fn(); + + Object.assign(fetcherTask, { + getCurrentConfigs, + areAllCollectorsReady, + shouldSendReport, + fetchTelemetry, + updateReportFailure, + }); + + await fetcherTask['sendIfDue'](); + + expect(fetchTelemetry).toBeCalledTimes(0); + + expect(areAllCollectorsReady).toBeCalledTimes(1); + expect(updateReportFailure).toBeCalledTimes(1); + expect(fetcherTask['logger'].warn).toBeCalledTimes(1); + expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( + `Error sending telemetry usage data. (Error: Not all collectors are ready.)` + ); + }); }); }); diff --git a/src/plugins/telemetry/server/fetcher.ts b/src/plugins/telemetry/server/fetcher.ts index 75cfac721bcd3..0cd423c648622 100644 --- a/src/plugins/telemetry/server/fetcher.ts +++ b/src/plugins/telemetry/server/fetcher.ts @@ -94,6 +94,10 @@ export class FetcherTask { } } + private async areAllCollectorsReady() { + return await this.telemetryCollectionManager!.areAllCollectorsReady(); + } + private async sendIfDue() { if (this.isSending) { return; @@ -113,6 +117,11 @@ export class FetcherTask { try { this.isSending = true; + const allCollectorsReady = await this.areAllCollectorsReady(); + + if (!allCollectorsReady) { + throw new Error('Not all collectors are ready.'); + } const clusters = await this.fetchTelemetry(); const { telemetryUrl } = telemetryConfig; for (const cluster of clusters) { @@ -123,7 +132,7 @@ export class FetcherTask { } catch (err) { await this.updateReportFailure(telemetryConfig); - this.logger.warn(`Error sending telemetry usage data: ${err}`); + this.logger.warn(`Error sending telemetry usage data. (${err})`); } this.isSending = false; } diff --git a/src/plugins/telemetry_collection_manager/server/plugin.ts b/src/plugins/telemetry_collection_manager/server/plugin.ts index e54e7451a670a..ff63262004cf5 100644 --- a/src/plugins/telemetry_collection_manager/server/plugin.ts +++ b/src/plugins/telemetry_collection_manager/server/plugin.ts @@ -67,6 +67,7 @@ export class TelemetryCollectionManagerPlugin setCollection: this.setCollection.bind(this), getOptInStats: this.getOptInStats.bind(this), getStats: this.getStats.bind(this), + areAllCollectorsReady: this.areAllCollectorsReady.bind(this), }; } @@ -75,6 +76,7 @@ export class TelemetryCollectionManagerPlugin setCollection: this.setCollection.bind(this), getOptInStats: this.getOptInStats.bind(this), getStats: this.getStats.bind(this), + areAllCollectorsReady: this.areAllCollectorsReady.bind(this), }; } @@ -185,6 +187,10 @@ export class TelemetryCollectionManagerPlugin return []; } + private areAllCollectorsReady = async () => { + return await this.usageCollection?.areAllCollectorsReady(); + }; + private getOptInStatsForCollection = async ( collection: Collection, optInStatus: boolean, diff --git a/src/plugins/telemetry_collection_manager/server/types.ts b/src/plugins/telemetry_collection_manager/server/types.ts index 44970df30fd16..3b0936fb73a60 100644 --- a/src/plugins/telemetry_collection_manager/server/types.ts +++ b/src/plugins/telemetry_collection_manager/server/types.ts @@ -34,6 +34,7 @@ export interface TelemetryCollectionManagerPluginSetup { ) => void; getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats']; getStats: TelemetryCollectionManagerPlugin['getStats']; + areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady']; } export interface TelemetryCollectionManagerPluginStart { @@ -42,6 +43,7 @@ export interface TelemetryCollectionManagerPluginStart { ) => void; getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats']; getStats: TelemetryCollectionManagerPlugin['getStats']; + areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady']; } export interface TelemetryOptInStats { diff --git a/src/plugins/usage_collection/server/collector/collector_set.ts b/src/plugins/usage_collection/server/collector/collector_set.ts index 6861be7f4f76b..3b1390f7b2ae2 100644 --- a/src/plugins/usage_collection/server/collector/collector_set.ts +++ b/src/plugins/usage_collection/server/collector/collector_set.ts @@ -21,6 +21,7 @@ import { snakeCase } from 'lodash'; import { Logger, LegacyAPICaller, ElasticsearchClient } from 'kibana/server'; import { Collector, CollectorOptions } from './collector'; import { UsageCollector } from './usage_collector'; +import { awaitBefore } from './await_before'; interface CollectorSetConfig { logger: Logger; @@ -76,23 +77,27 @@ export class CollectorSet { }; public areAllCollectorsReady = async (collectorSet: CollectorSet = this) => { - // Kept this for runtime validation in JS code. if (!(collectorSet instanceof CollectorSet)) { throw new Error( `areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet ); } - const collectorTypesNotReady = ( - await Promise.all( - [...collectorSet.collectors.values()].map(async (collector) => { - if (!(await collector.isReady())) { - return collector.type; - } - }) - ) - ).filter((collectorType): collectorType is string => !!collectorType); - const allReady = collectorTypesNotReady.length === 0; + const collectors = [...collectorSet.collectors.values()]; + const collectorsWithStatus = await Promise.all( + collectors.map(async (collector) => { + return { + isReady: await collector.isReady(), + collector, + }; + }) + ); + + const collectorsTypesNotReady = collectorsWithStatus + .filter((collectorWithStatus) => collectorWithStatus.isReady === false) + .map((collectorWithStatus) => collectorWithStatus.collector.type); + + const allReady = collectorsTypesNotReady.length === 0; if (!allReady && this.maximumWaitTimeForAllCollectorsInS >= 0) { const nowTimestamp = +new Date(); @@ -102,10 +107,11 @@ export class CollectorSet { const timeLeftInMS = this.maximumWaitTimeForAllCollectorsInS * 1000 - timeWaitedInMS; if (timeLeftInMS <= 0) { this.logger.debug( - `All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) ` + + `All collectors are not ready (waiting for ${collectorsTypesNotReady.join(',')}) ` + `but we have waited the required ` + `${this.maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.` ); + return true; } else { this.logger.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`); From 7033acba208ab175f0c2fba04856efcdc606f350 Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Sun, 4 Oct 2020 18:02:30 +0300 Subject: [PATCH 2/4] remove await_before fn --- src/plugins/usage_collection/server/collector/collector_set.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/plugins/usage_collection/server/collector/collector_set.ts b/src/plugins/usage_collection/server/collector/collector_set.ts index 3b1390f7b2ae2..7bf4e19c72cc0 100644 --- a/src/plugins/usage_collection/server/collector/collector_set.ts +++ b/src/plugins/usage_collection/server/collector/collector_set.ts @@ -21,7 +21,6 @@ import { snakeCase } from 'lodash'; import { Logger, LegacyAPICaller, ElasticsearchClient } from 'kibana/server'; import { Collector, CollectorOptions } from './collector'; import { UsageCollector } from './usage_collector'; -import { awaitBefore } from './await_before'; interface CollectorSetConfig { logger: Logger; From fa67cfdfd664e9831a8bd0038fa3958b6a68df48 Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Mon, 5 Oct 2020 14:21:51 +0300 Subject: [PATCH 3/4] prevent fetch from updating report failure --- src/plugins/telemetry/server/fetcher.test.ts | 53 +++++++++++++++++-- src/plugins/telemetry/server/fetcher.ts | 21 ++++++-- .../server/index.ts | 1 + 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/src/plugins/telemetry/server/fetcher.test.ts b/src/plugins/telemetry/server/fetcher.test.ts index 71f9b20bf08f5..45712df772e1c 100644 --- a/src/plugins/telemetry/server/fetcher.test.ts +++ b/src/plugins/telemetry/server/fetcher.test.ts @@ -23,30 +23,37 @@ import { coreMock } from '../../../core/server/mocks'; describe('FetcherTask', () => { describe('sendIfDue', () => { - it('returns undefined and warns when it fails to get telemetry configs', async () => { + it('stops when it fails to get telemetry configs', async () => { const initializerContext = coreMock.createPluginInitializerContext({}); const fetcherTask = new FetcherTask(initializerContext); const mockError = new Error('Some message.'); const getCurrentConfigs = jest.fn().mockRejectedValue(mockError); + const fetchTelemetry = jest.fn(); + const sendTelemetry = jest.fn(); Object.assign(fetcherTask, { getCurrentConfigs, + fetchTelemetry, + sendTelemetry, }); const result = await fetcherTask['sendIfDue'](); expect(result).toBe(undefined); expect(getCurrentConfigs).toBeCalledTimes(1); + expect(fetchTelemetry).toBeCalledTimes(0); + expect(sendTelemetry).toBeCalledTimes(0); expect(fetcherTask['logger'].warn).toBeCalledTimes(1); expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( - `Error fetching telemetry configs: ${mockError}` + `Error getting telemetry configs. (${mockError})` ); }); - it('fails when all collectors are not ready', async () => { + it('stops when all collectors are not ready', async () => { const initializerContext = coreMock.createPluginInitializerContext({}); const fetcherTask = new FetcherTask(initializerContext); const getCurrentConfigs = jest.fn().mockResolvedValue({}); const areAllCollectorsReady = jest.fn().mockResolvedValue(false); const shouldSendReport = jest.fn().mockReturnValue(true); const fetchTelemetry = jest.fn(); + const sendTelemetry = jest.fn(); const updateReportFailure = jest.fn(); Object.assign(fetcherTask, { @@ -55,18 +62,54 @@ describe('FetcherTask', () => { shouldSendReport, fetchTelemetry, updateReportFailure, + sendTelemetry, }); await fetcherTask['sendIfDue'](); expect(fetchTelemetry).toBeCalledTimes(0); + expect(sendTelemetry).toBeCalledTimes(0); expect(areAllCollectorsReady).toBeCalledTimes(1); - expect(updateReportFailure).toBeCalledTimes(1); + expect(updateReportFailure).toBeCalledTimes(0); expect(fetcherTask['logger'].warn).toBeCalledTimes(1); expect(fetcherTask['logger'].warn).toHaveBeenCalledWith( - `Error sending telemetry usage data. (Error: Not all collectors are ready.)` + `Error fetching usage. (Error: Not all collectors are ready.)` ); }); + + it('fetches usage and send telemetry', async () => { + const initializerContext = coreMock.createPluginInitializerContext({}); + const fetcherTask = new FetcherTask(initializerContext); + const mockTelemetryUrl = 'mock_telemetry_url'; + const mockClusters = ['cluster_1', 'cluster_2']; + const getCurrentConfigs = jest.fn().mockResolvedValue({ + telemetryUrl: mockTelemetryUrl, + }); + const areAllCollectorsReady = jest.fn().mockResolvedValue(true); + const shouldSendReport = jest.fn().mockReturnValue(true); + + const fetchTelemetry = jest.fn().mockResolvedValue(mockClusters); + const sendTelemetry = jest.fn(); + const updateReportFailure = jest.fn(); + + Object.assign(fetcherTask, { + getCurrentConfigs, + areAllCollectorsReady, + shouldSendReport, + fetchTelemetry, + updateReportFailure, + sendTelemetry, + }); + + await fetcherTask['sendIfDue'](); + + expect(areAllCollectorsReady).toBeCalledTimes(1); + expect(fetchTelemetry).toBeCalledTimes(1); + expect(sendTelemetry).toBeCalledTimes(2); + expect(sendTelemetry).toHaveBeenNthCalledWith(1, mockTelemetryUrl, mockClusters[0]); + expect(sendTelemetry).toHaveBeenNthCalledWith(2, mockTelemetryUrl, mockClusters[1]); + expect(updateReportFailure).toBeCalledTimes(0); + }); }); }); diff --git a/src/plugins/telemetry/server/fetcher.ts b/src/plugins/telemetry/server/fetcher.ts index 0cd423c648622..884426afcb1b3 100644 --- a/src/plugins/telemetry/server/fetcher.ts +++ b/src/plugins/telemetry/server/fetcher.ts @@ -22,7 +22,10 @@ import { Observable } from 'rxjs'; import { take } from 'rxjs/operators'; // @ts-ignore import fetch from 'node-fetch'; -import { TelemetryCollectionManagerPluginStart } from 'src/plugins/telemetry_collection_manager/server'; +import { + TelemetryCollectionManagerPluginStart, + UsageStatsPayload, +} from 'src/plugins/telemetry_collection_manager/server'; import { PluginInitializerContext, Logger, @@ -107,7 +110,7 @@ export class FetcherTask { try { telemetryConfig = await this.getCurrentConfigs(); } catch (err) { - this.logger.warn(`Error fetching telemetry configs: ${err}`); + this.logger.warn(`Error getting telemetry configs. (${err})`); return; } @@ -115,14 +118,22 @@ export class FetcherTask { return; } + let clusters: Array = []; + this.isSending = true; + try { - this.isSending = true; const allCollectorsReady = await this.areAllCollectorsReady(); - if (!allCollectorsReady) { throw new Error('Not all collectors are ready.'); } - const clusters = await this.fetchTelemetry(); + clusters = await this.fetchTelemetry(); + } catch (err) { + this.logger.warn(`Error fetching usage. (${err})`); + this.isSending = false; + return; + } + + try { const { telemetryUrl } = telemetryConfig; for (const cluster of clusters) { await this.sendTelemetry(telemetryUrl, cluster); diff --git a/src/plugins/telemetry_collection_manager/server/index.ts b/src/plugins/telemetry_collection_manager/server/index.ts index 8761c28e14095..36ab64731fe58 100644 --- a/src/plugins/telemetry_collection_manager/server/index.ts +++ b/src/plugins/telemetry_collection_manager/server/index.ts @@ -38,4 +38,5 @@ export { ClusterDetails, ClusterDetailsGetter, LicenseGetter, + UsageStatsPayload, } from './types'; From ac6d6a27b0c643bf4ca0f497f090188d2104feb9 Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Mon, 5 Oct 2020 14:42:25 +0300 Subject: [PATCH 4/4] Update src/plugins/telemetry/server/fetcher.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Alejandro Fernández Haro --- src/plugins/telemetry/server/fetcher.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/telemetry/server/fetcher.ts b/src/plugins/telemetry/server/fetcher.ts index 884426afcb1b3..e6d909965f5f7 100644 --- a/src/plugins/telemetry/server/fetcher.ts +++ b/src/plugins/telemetry/server/fetcher.ts @@ -98,7 +98,7 @@ export class FetcherTask { } private async areAllCollectorsReady() { - return await this.telemetryCollectionManager!.areAllCollectorsReady(); + return (await this.telemetryCollectionManager?.areAllCollectorsReady()) ?? false; } private async sendIfDue() {