From f8a221392bac6180a03b369d351d3cc4bdd04b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Ferna=CC=81ndez=20Haro?= Date: Tue, 17 Nov 2020 15:20:13 +0100 Subject: [PATCH] [Monitoring] Stop collecting Kibana Usage in bulkUploader --- .../server/kibana_monitoring/bulk_uploader.js | 275 ------------------ .../server/kibana_monitoring/bulk_uploader.ts | 274 +++++++++++++++++ .../collectors/get_settings_collector.ts | 59 ++-- .../kibana_monitoring/collectors/index.ts | 2 +- .../kibana_monitoring/{index.js => index.ts} | 0 .../kibana_monitoring/{init.js => init.ts} | 6 +- .../lib/{index.js => index.ts} | 1 + ...d_bulk_payload.js => send_bulk_payload.ts} | 7 +- .../plugins/monitoring/server/plugin.test.ts | 31 +- x-pack/plugins/monitoring/server/plugin.ts | 11 +- x-pack/plugins/monitoring/server/types.ts | 1 + 11 files changed, 331 insertions(+), 336 deletions(-) delete mode 100644 x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js create mode 100644 x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts rename x-pack/plugins/monitoring/server/kibana_monitoring/{index.js => index.ts} (100%) rename x-pack/plugins/monitoring/server/kibana_monitoring/{init.js => init.ts} (76%) rename x-pack/plugins/monitoring/server/kibana_monitoring/lib/{index.js => index.ts} (96%) rename x-pack/plugins/monitoring/server/kibana_monitoring/lib/{send_bulk_payload.js => send_bulk_payload.ts} (78%) diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js deleted file mode 100644 index 5d8af8d71b7fc..0000000000000 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { defaultsDeep, uniq, compact } from 'lodash'; -import { ServiceStatusLevels } from '../../../../../src/core/server'; -import { - TELEMETRY_COLLECTION_INTERVAL, - KIBANA_STATS_TYPE_MONITORING, -} from '../../common/constants'; - -import { sendBulkPayload, monitoringBulk } from './lib'; - -/* - * Handles internal Kibana stats collection and uploading data to Monitoring - * bulk endpoint. - * - * NOTE: internal collection will be removed in 7.0 - * - * Depends on - * - 'monitoring.kibana.collection.enabled' config - * - monitoring enabled in ES (checked against xpack_main.info license info change) - * The dependencies are handled upstream - * - Ops Events - essentially Kibana's /api/status - * - Usage Stats - essentially Kibana's /api/stats - * - Kibana Settings - select uiSettings - * @param {Object} server HapiJS server instance - * @param {Object} xpackInfo server.plugins.xpack_main.info object - */ -export class BulkUploader { - constructor({ log, interval, elasticsearch, statusGetter$, kibanaStats }) { - if (typeof interval !== 'number') { - throw new Error('interval number of milliseconds is required'); - } - - this._timer = null; - // Hold sending and fetching usage until monitoring.bulk is successful. This means that we - // send usage data on the second tick. But would save a lot of bandwidth fetching usage on - // every tick when ES is failing or monitoring is disabled. - this._holdSendingUsage = false; - this._interval = interval; - this._lastFetchUsageTime = null; - // Limit sending and fetching usage to once per day once usage is successfully stored - // into the monitoring indices. - this._usageInterval = TELEMETRY_COLLECTION_INTERVAL; - this._log = log; - - this._cluster = elasticsearch.legacy.createClient('admin', { - plugins: [monitoringBulk], - }); - - this.kibanaStats = kibanaStats; - - this.kibanaStatus = null; - this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => { - this.kibanaStatus = nextStatus.level; - }); - } - - filterCollectorSet(usageCollection) { - const successfulUploadInLastDay = - this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now(); - - return usageCollection.getFilteredCollectorSet((c) => { - // this is internal bulk upload, so filter out API-only collectors - if (c.ignoreForInternalUploader) { - return false; - } - // Only collect usage data at the same interval as telemetry would (default to once a day) - if (usageCollection.isUsageCollector(c)) { - if (this._holdSendingUsage) { - return false; - } - if (successfulUploadInLastDay) { - return false; - } - } - - return true; - }); - } - - /* - * Start the interval timer - * @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval - * @return undefined - */ - start(usageCollection) { - this._log.info('Starting monitoring stats collection'); - - if (this._timer) { - clearInterval(this._timer); - } else { - this._fetchAndUpload(this.filterCollectorSet(usageCollection)); // initial fetch - } - - this._timer = setInterval(() => { - this._fetchAndUpload(this.filterCollectorSet(usageCollection)); - }, this._interval); - } - - /* - * start() and stop() are lifecycle event handlers for - * xpackMainPlugin license changes - * @param {String} logPrefix help give context to the reason for stopping - */ - stop(logPrefix) { - clearInterval(this._timer); - this._timer = null; - - const prefix = logPrefix ? logPrefix + ':' : ''; - this._log.info(prefix + 'Monitoring stats collection is stopped'); - } - - handleNotEnabled() { - this.stop('Monitoring status upload endpoint is not enabled in Elasticsearch'); - } - handleConnectionLost() { - this.stop('Connection issue detected'); - } - - /* - * @param {usageCollection} usageCollection - * @return {Promise} - resolves to undefined - */ - async _fetchAndUpload(usageCollection) { - const collectorsReady = await usageCollection.areAllCollectorsReady(); - const hasUsageCollectors = usageCollection.some(usageCollection.isUsageCollector); - if (!collectorsReady) { - this._log.debug('Skipping bulk uploading because not all collectors are ready'); - if (hasUsageCollectors) { - this._lastFetchUsageTime = null; - this._log.debug('Resetting lastFetchWithUsage because not all collectors are ready'); - } - return; - } - - const data = await usageCollection.bulkFetch(this._cluster.callAsInternalUser); - const payload = this.toBulkUploadFormat(compact(data), usageCollection); - if (payload && payload.length > 0) { - try { - this._log.debug(`Uploading bulk stats payload to the local cluster`); - const result = await this._onPayload(payload); - const sendSuccessful = !result.ignored && !result.errors; - if (!sendSuccessful && hasUsageCollectors) { - this._lastFetchUsageTime = null; - this._holdSendingUsage = true; - this._log.debug( - 'Resetting lastFetchWithUsage because uploading to the cluster was not successful.' - ); - } - - if (sendSuccessful) { - this._holdSendingUsage = false; - if (hasUsageCollectors) { - this._lastFetchUsageTime = Date.now(); - } - } - this._log.debug(`Uploaded bulk stats payload to the local cluster`); - } catch (err) { - this._log.warn(err.stack); - this._log.warn(`Unable to bulk upload the stats payload to the local cluster`); - } - } else { - this._log.debug(`Skipping bulk uploading of an empty stats payload`); - } - } - - async _onPayload(payload) { - return await sendBulkPayload(this._cluster, this._interval, payload, this._log); - } - - getConvertedKibanaStatuss() { - if (this.kibanaStatus === ServiceStatusLevels.available) { - return 'green'; - } - if (this.kibanaStatus === ServiceStatusLevels.critical) { - return 'red'; - } - if (this.kibanaStatus === ServiceStatusLevels.degraded) { - return 'yellow'; - } - return 'unknown'; - } - - getKibanaStats(type) { - const stats = { - ...this.kibanaStats, - status: this.getConvertedKibanaStatuss(), - }; - - if (type === KIBANA_STATS_TYPE_MONITORING) { - delete stats.port; - delete stats.locale; - } - - return stats; - } - - /* - * Bulk stats are transformed into a bulk upload format - * Non-legacy transformation is done in CollectorSet.toApiStats - * - * Example: - * Before: - * [ - * { - * "type": "kibana_stats", - * "result": { - * "process": { ... }, - * "requests": { ... }, - * ... - * } - * }, - * ] - * - * After: - * [ - * { - * "index": { - * "_type": "kibana_stats" - * } - * }, - * { - * "kibana": { - * "host": "localhost", - * "uuid": "d619c5d1-4315-4f35-b69d-a3ac805489fb", - * "version": "7.0.0-alpha1", - * ... - * }, - * "process": { ... }, - * "requests": { ... }, - * ... - * } - * ] - */ - toBulkUploadFormat(rawData, usageCollection) { - if (rawData.length === 0) { - return []; - } - - // convert the raw data to a nested object by taking each payload through - // its formatter, organizing it per-type - const typesNested = rawData.reduce((accum, { type, result }) => { - const { type: uploadType, payload: uploadData } = usageCollection - .getCollectorByType(type) - .formatForBulkUpload(result); - return defaultsDeep(accum, { [uploadType]: uploadData }); - }, {}); - // convert the nested object into a flat array, with each payload prefixed - // with an 'index' instruction, for bulk upload - const flat = Object.keys(typesNested).reduce((accum, type) => { - return [ - ...accum, - { index: { _type: type } }, - { - kibana: this.getKibanaStats(type), - ...typesNested[type], - }, - ]; - }, []); - - return flat; - } - - static checkPayloadTypesUnique(payload) { - const ids = payload.map((item) => item[0].index._type); - const uniques = uniq(ids); - if (ids.length !== uniques.length) { - throw new Error('Duplicate collector type identifiers found in payload! ' + ids.join(',')); - } - } -} diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts new file mode 100644 index 0000000000000..e17d3e58e859c --- /dev/null +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts @@ -0,0 +1,274 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { Observable, Subscription } from 'rxjs'; +import { take } from 'rxjs/operators'; +import moment from 'moment'; +import { + ElasticsearchServiceSetup, + ILegacyCustomClusterClient, + Logger, + OpsMetrics, + ServiceStatus, + ServiceStatusLevel, + ServiceStatusLevels, +} from '../../../../../src/core/server'; +import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants'; + +import { sendBulkPayload, monitoringBulk } from './lib'; +import { getKibanaSettings } from './collectors'; +import { MonitoringConfig } from '../config'; + +export interface BulkUploaderOptions { + log: Logger; + config: MonitoringConfig; + interval: number; + elasticsearch: ElasticsearchServiceSetup; + statusGetter$: Observable; + opsMetrics$: Observable; + kibanaStats: KibanaStats; +} + +export interface KibanaStats { + uuid: string; + name: string; + index: string; + host: string; + locale: string; + port: string; + transport_address: string; + version: string; + snapshot: boolean; +} + +/* + * Handles internal Kibana stats collection and uploading data to Monitoring + * bulk endpoint. + * + * NOTE: internal collection will be removed in 7.0 + * + * Depends on + * - 'monitoring.kibana.collection.enabled' config + * - monitoring enabled in ES (checked against xpack_main.info license info change) + * The dependencies are handled upstream + * - Ops Events - essentially Kibana's /api/status + * - Usage Stats - essentially Kibana's /api/stats + * - Kibana Settings - select uiSettings + * @param {Object} server HapiJS server instance + * @param {Object} xpackInfo server.plugins.xpack_main.info object + */ +export class BulkUploader { + private readonly _log: Logger; + private readonly _cluster: ILegacyCustomClusterClient; + private readonly kibanaStats: KibanaStats; + private readonly kibanaStatusGetter$: Subscription; + private readonly opsMetrics$: Observable; + private kibanaStatus: ServiceStatusLevel | null; + private _timer: NodeJS.Timer | null; + private readonly _interval: number; + private readonly config: MonitoringConfig; + constructor({ + log, + config, + interval, + elasticsearch, + statusGetter$, + opsMetrics$, + kibanaStats, + }: BulkUploaderOptions) { + if (typeof interval !== 'number') { + throw new Error('interval number of milliseconds is required'); + } + + this.opsMetrics$ = opsMetrics$; + this.config = config; + + this._timer = null; + this._interval = interval; + this._log = log; + + this._cluster = elasticsearch.legacy.createClient('admin', { + plugins: [monitoringBulk], + }); + + this.kibanaStats = kibanaStats; + + this.kibanaStatus = null; + this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => { + this.kibanaStatus = nextStatus.level; + }); + } + + /* + * Start the interval timer + * @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval + * @return undefined + */ + public start() { + this._log.info('Starting monitoring stats collection'); + + if (this._timer) { + clearInterval(this._timer); + } else { + this._fetchAndUpload(); // initial fetch + } + + this._timer = setInterval(() => { + this._fetchAndUpload(); + }, this._interval); + } + + /* + * start() and stop() are lifecycle event handlers for + * xpackMainPlugin license changes + * @param {String} logPrefix help give context to the reason for stopping + */ + public stop(logPrefix?: string) { + if (this._timer) clearInterval(this._timer); + this._timer = null; + + this.kibanaStatusGetter$.unsubscribe(); + this._cluster.close(); + + const prefix = logPrefix ? logPrefix + ':' : ''; + this._log.info(prefix + 'Monitoring stats collection is stopped'); + } + + public handleNotEnabled() { + this.stop('Monitoring status upload endpoint is not enabled in Elasticsearch'); + } + public handleConnectionLost() { + this.stop('Connection issue detected'); + } + + /** + * Retrieves the OpsMetrics in the same format as the `kibana_stats` collector + * @private + */ + private async getOpsMetrics() { + const { + process: { pid, ...process }, + collected_at: collectedAt, + requests: { statusCodes, ...requests }, + ...lastMetrics + } = await this.opsMetrics$.pipe(take(1)).toPromise(); + return { + ...lastMetrics, + process, + requests, + response_times: { + average: lastMetrics.response_times.avg_in_millis, + max: lastMetrics.response_times.max_in_millis, + }, + timestamp: moment.utc(collectedAt).toISOString(), + }; + } + + private async _fetchAndUpload() { + const data = await Promise.all([ + { type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() }, + { type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) }, + ]); + + const payload = this.toBulkUploadFormat(data); + if (payload && payload.length > 0) { + try { + this._log.debug(`Uploading bulk stats payload to the local cluster`); + await this._onPayload(payload); + this._log.debug(`Uploaded bulk stats payload to the local cluster`); + } catch (err) { + this._log.warn(err.stack); + this._log.warn(`Unable to bulk upload the stats payload to the local cluster`); + } + } else { + this._log.debug(`Skipping bulk uploading of an empty stats payload`); + } + } + + private async _onPayload(payload: object[]) { + return await sendBulkPayload(this._cluster, this._interval, payload); + } + + private getConvertedKibanaStatus() { + if (this.kibanaStatus === ServiceStatusLevels.available) { + return 'green'; + } + if (this.kibanaStatus === ServiceStatusLevels.critical) { + return 'red'; + } + if (this.kibanaStatus === ServiceStatusLevels.degraded) { + return 'yellow'; + } + return 'unknown'; + } + + public getKibanaStats(type?: string) { + const stats = { + ...this.kibanaStats, + status: this.getConvertedKibanaStatus(), + }; + + if (type === KIBANA_STATS_TYPE_MONITORING) { + // Do not report the keys `port` and `locale` + const { port, locale, ...rest } = stats; + return rest; + } + + return stats; + } + + /* + * Bulk stats are transformed into a bulk upload format + * Non-legacy transformation is done in CollectorSet.toApiStats + * + * Example: + * Before: + * [ + * { + * "type": "kibana_stats", + * "result": { + * "process": { ... }, + * "requests": { ... }, + * ... + * } + * }, + * ] + * + * After: + * [ + * { + * "index": { + * "_type": "kibana_stats" + * } + * }, + * { + * "kibana": { + * "host": "localhost", + * "uuid": "d619c5d1-4315-4f35-b69d-a3ac805489fb", + * "version": "7.0.0-alpha1", + * ... + * }, + * "process": { ... }, + * "requests": { ... }, + * ... + * } + * ] + */ + private toBulkUploadFormat(rawData: Array<{ type: string; result: any }>) { + // convert the raw data into a flat array, with each payload prefixed + // with an 'index' instruction, for bulk upload + return rawData.reduce((accum, { type, result }) => { + return [ + ...accum, + { index: { _type: type } }, + { + kibana: this.getKibanaStats(type), + ...result, + }, + ]; + }, [] as object[]); + } +} diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.ts b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.ts index 2b81f1078ad0a..858c50790fc2e 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.ts +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ +import { Logger } from 'src/core/server'; import { Collector, UsageCollectionSetup } from 'src/plugins/usage_collection/server'; import { KIBANA_SETTINGS_TYPE } from '../../../common/constants'; @@ -51,6 +52,37 @@ export interface KibanaSettingsCollectorExtraOptions { export type KibanaSettingsCollector = Collector & KibanaSettingsCollectorExtraOptions; +export function getEmailValueStructure(email: string | null) { + return { + xpack: { + default_admin_email: email, + }, + }; +} + +export async function getKibanaSettings(logger: Logger, config: MonitoringConfig) { + let kibanaSettingsData; + const defaultAdminEmail = await checkForEmailValue(config); + + // skip everything if defaultAdminEmail === undefined + if (defaultAdminEmail || (defaultAdminEmail === null && shouldUseNull)) { + kibanaSettingsData = getEmailValueStructure(defaultAdminEmail); + logger.debug( + `[${defaultAdminEmail}] default admin email setting found, sending [${KIBANA_SETTINGS_TYPE}] monitoring document.` + ); + } else { + logger.debug( + `not sending [${KIBANA_SETTINGS_TYPE}] monitoring document because [${defaultAdminEmail}] is null or invalid.` + ); + } + + // remember the current email so that we can mark it as successful if the bulk does not error out + shouldUseNull = !!defaultAdminEmail; + + // returns undefined if there was no result + return kibanaSettingsData; +} + export function getSettingsCollector( usageCollection: UsageCollectionSetup, config: MonitoringConfig @@ -69,33 +101,10 @@ export function getSettingsCollector( }, }, async fetch() { - let kibanaSettingsData; - const defaultAdminEmail = await checkForEmailValue(config); - - // skip everything if defaultAdminEmail === undefined - if (defaultAdminEmail || (defaultAdminEmail === null && shouldUseNull)) { - kibanaSettingsData = this.getEmailValueStructure(defaultAdminEmail); - this.log.debug( - `[${defaultAdminEmail}] default admin email setting found, sending [${KIBANA_SETTINGS_TYPE}] monitoring document.` - ); - } else { - this.log.debug( - `not sending [${KIBANA_SETTINGS_TYPE}] monitoring document because [${defaultAdminEmail}] is null or invalid.` - ); - } - - // remember the current email so that we can mark it as successful if the bulk does not error out - shouldUseNull = !!defaultAdminEmail; - - // returns undefined if there was no result - return kibanaSettingsData; + return getKibanaSettings(this.log, config); }, getEmailValueStructure(email: string | null) { - return { - xpack: { - default_admin_email: email, - }, - }; + return getEmailValueStructure(email); }, }); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/index.ts b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/index.ts index 25e243656898c..5fb1583a5c0db 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/index.ts +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/index.ts @@ -10,7 +10,7 @@ import { getSettingsCollector } from './get_settings_collector'; import { getMonitoringUsageCollector } from './get_usage_collector'; import { MonitoringConfig } from '../../config'; -export { KibanaSettingsCollector } from './get_settings_collector'; +export { KibanaSettingsCollector, getKibanaSettings } from './get_settings_collector'; export function registerCollectors( usageCollection: UsageCollectionSetup, diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/index.js b/x-pack/plugins/monitoring/server/kibana_monitoring/index.ts similarity index 100% rename from x-pack/plugins/monitoring/server/kibana_monitoring/index.js rename to x-pack/plugins/monitoring/server/kibana_monitoring/index.ts diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/init.js b/x-pack/plugins/monitoring/server/kibana_monitoring/init.ts similarity index 76% rename from x-pack/plugins/monitoring/server/kibana_monitoring/init.js rename to x-pack/plugins/monitoring/server/kibana_monitoring/init.ts index 79aafb8f361f3..c8c5fabb65db0 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/init.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/init.ts @@ -4,7 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ -import { BulkUploader } from './bulk_uploader'; +import { BulkUploader, BulkUploaderOptions } from './bulk_uploader'; + +export type InitBulkUploaderOptions = Omit; /** * Initialize different types of Kibana Monitoring @@ -15,7 +17,7 @@ import { BulkUploader } from './bulk_uploader'; * @param {Object} kbnServer manager of Kibana services - see `src/legacy/server/kbn_server` in Kibana core * @param {Object} server HapiJS server instance */ -export function initBulkUploader({ config, ...params }) { +export function initBulkUploader({ config, ...params }: InitBulkUploaderOptions) { const interval = config.kibana.collection.interval; return new BulkUploader({ interval, diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.js b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.ts similarity index 96% rename from x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.js rename to x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.ts index c5fdd29d4306d..a6c5583329861 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/index.ts @@ -5,4 +5,5 @@ */ export { sendBulkPayload } from './send_bulk_payload'; +// @ts-ignore export { monitoringBulk } from './monitoring_bulk'; diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts similarity index 78% rename from x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js rename to x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts index 66799e4aa651a..78d689fe9f182 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts @@ -3,12 +3,17 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import { ILegacyClusterClient } from 'src/core/server'; import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants'; /* * Send the Kibana usage data to the ES Monitoring Bulk endpoint */ -export async function sendBulkPayload(cluster, interval, payload) { +export async function sendBulkPayload( + cluster: ILegacyClusterClient, + interval: number, + payload: object[] +) { return cluster.callAsInternalUser('monitoring.bulk', { system_id: KIBANA_SYSTEM_ID, system_api_version: MONITORING_SYSTEM_API_VERSION, diff --git a/x-pack/plugins/monitoring/server/plugin.test.ts b/x-pack/plugins/monitoring/server/plugin.test.ts index 3fc494d6c3706..b376fc2eec60b 100644 --- a/x-pack/plugins/monitoring/server/plugin.test.ts +++ b/x-pack/plugins/monitoring/server/plugin.test.ts @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import { coreMock } from 'src/core/server/mocks'; import { Plugin } from './plugin'; import { combineLatest } from 'rxjs'; import { AlertsFactory } from './alerts'; @@ -53,31 +54,9 @@ describe('Monitoring plugin', () => { }, }; - const coreSetup = { - http: { - createRouter: jest.fn(), - getServerInfo: jest.fn().mockImplementation(() => ({ - port: 5601, - })), - basePath: { - serverBasePath: '', - }, - }, - elasticsearch: { - legacy: { - client: {}, - createClient: jest.fn(), - }, - }, - status: { - overall$: { - subscribe: jest.fn(), - }, - }, - savedObjects: { - registerType: jest.fn(), - }, - }; + const coreSetup = coreMock.createSetup(); + coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any); + coreSetup.status.overall$.subscribe = jest.fn(); const setupPlugins = { usageCollection: { @@ -124,7 +103,7 @@ describe('Monitoring plugin', () => { it('always create the bulk uploader', async () => { const plugin = new Plugin(initializerContext as any); - await plugin.setup(coreSetup as any, setupPlugins as any); + await plugin.setup(coreSetup, setupPlugins as any); expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled(); }); diff --git a/x-pack/plugins/monitoring/server/plugin.ts b/x-pack/plugins/monitoring/server/plugin.ts index 41b501d88af99..38495ca63deae 100644 --- a/x-pack/plugins/monitoring/server/plugin.ts +++ b/x-pack/plugins/monitoring/server/plugin.ts @@ -32,11 +32,8 @@ import { SAVED_OBJECT_TELEMETRY, } from '../common/constants'; import { MonitoringConfig, createConfig, configSchema } from './config'; -// @ts-ignore import { requireUIRoutes } from './routes'; -// @ts-ignore import { initBulkUploader } from './kibana_monitoring'; -// @ts-ignore import { initInfraSource } from './lib/logs/init_infra_source'; import { mbSafeQuery } from './lib/mb_safe_query'; import { instantiateClient } from './es_client/instantiate_client'; @@ -75,7 +72,7 @@ export class Plugin { private licenseService = {} as MonitoringLicenseService; private monitoringCore = {} as MonitoringCore; private legacyShimDependencies = {} as LegacyShimDependencies; - private bulkUploader: IBulkUploader = {} as IBulkUploader; + private bulkUploader: IBulkUploader | undefined; private telemetryElasticsearchClient: IClusterClient | undefined; private telemetrySavedObjectsService: SavedObjectsServiceStart | undefined; @@ -182,6 +179,7 @@ export class Plugin { elasticsearch: core.elasticsearch, config, log: kibanaMonitoringLog, + opsMetrics$: core.metrics.getOpsMetrics$(), statusGetter$: core.status.overall$, kibanaStats: { uuid: this.initializerContext.env.instanceUuid, @@ -208,7 +206,7 @@ export class Plugin { const monitoringBulkEnabled = mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled; if (monitoringBulkEnabled) { - bulkUploader.start(plugins.usageCollection); + bulkUploader.start(); } else { bulkUploader.handleNotEnabled(); } @@ -249,7 +247,7 @@ export class Plugin { return { // OSS stats api needs to call this in order to centralize how // we fetch kibana specific stats - getKibanaStats: () => this.bulkUploader.getKibanaStats(), + getKibanaStats: () => bulkUploader.getKibanaStats(), }; } @@ -271,6 +269,7 @@ export class Plugin { if (this.licenseService) { this.licenseService.stop(); } + this.bulkUploader?.stop(); } registerPluginInUI(plugins: PluginsSetup) { diff --git a/x-pack/plugins/monitoring/server/types.ts b/x-pack/plugins/monitoring/server/types.ts index 543a12fb41356..88f70de6d5a73 100644 --- a/x-pack/plugins/monitoring/server/types.ts +++ b/x-pack/plugins/monitoring/server/types.ts @@ -74,6 +74,7 @@ export interface LegacyShimDependencies { export interface IBulkUploader { getKibanaStats: () => any; + stop: () => void; } export interface LegacyRequest {