diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts index f9e5cfdebdf26..ea5b36710abe0 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts @@ -5,29 +5,28 @@ * 2.0. */ -import { Observable, Subscription } from 'rxjs'; +import type { Observable, Subscription } from 'rxjs'; import { take } from 'rxjs/operators'; import moment from 'moment'; -import { - ElasticsearchServiceSetup, - ILegacyCustomClusterClient, +import type { + ElasticsearchClient, Logger, OpsMetrics, ServiceStatus, ServiceStatusLevel, - ServiceStatusLevels, -} from '../../../../../src/core/server'; +} from 'src/core/server'; +import { ServiceStatusLevels } from '../../../../../src/core/server'; import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants'; -import { sendBulkPayload, monitoringBulk } from './lib'; +import { sendBulkPayload } from './lib'; import { getKibanaSettings } from './collectors'; -import { MonitoringConfig } from '../config'; +import type { MonitoringConfig } from '../config'; +import type { IBulkUploader } from '../types'; export interface BulkUploaderOptions { log: Logger; config: MonitoringConfig; interval: number; - elasticsearch: ElasticsearchServiceSetup; statusGetter$: Observable; opsMetrics$: Observable; kibanaStats: KibanaStats; @@ -61,11 +60,11 @@ export interface KibanaStats { * @param {Object} server HapiJS server instance * @param {Object} xpackInfo server.plugins.xpack_main.info object */ -export class BulkUploader { +export class BulkUploader implements IBulkUploader { private readonly _log: Logger; - private readonly _cluster: ILegacyCustomClusterClient; private readonly kibanaStats: KibanaStats; - private readonly kibanaStatusGetter$: Subscription; + private readonly kibanaStatusGetter$: Observable; + private kibanaStatusSubscription?: Subscription; private readonly opsMetrics$: Observable; private kibanaStatus: ServiceStatusLevel | null; private _timer: NodeJS.Timer | null; @@ -75,7 +74,6 @@ export class BulkUploader { log, config, interval, - elasticsearch, statusGetter$, opsMetrics$, kibanaStats, @@ -91,16 +89,10 @@ export class BulkUploader { this._interval = interval; this._log = log; - this._cluster = elasticsearch.legacy.createClient('admin', { - plugins: [monitoringBulk], - }); - this.kibanaStats = kibanaStats; this.kibanaStatus = null; - this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => { - this.kibanaStatus = nextStatus.level; - }); + this.kibanaStatusGetter$ = statusGetter$; } /* @@ -108,17 +100,21 @@ export class BulkUploader { * @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval * @return undefined */ - public start() { + public start(esClient: ElasticsearchClient) { this._log.info('Starting monitoring stats collection'); + this.kibanaStatusSubscription = this.kibanaStatusGetter$.subscribe((nextStatus) => { + this.kibanaStatus = nextStatus.level; + }); + if (this._timer) { clearInterval(this._timer); } else { - this._fetchAndUpload(); // initial fetch + this._fetchAndUpload(esClient); // initial fetch } this._timer = setInterval(() => { - this._fetchAndUpload(); + this._fetchAndUpload(esClient); }, this._interval); } @@ -131,8 +127,7 @@ export class BulkUploader { if (this._timer) clearInterval(this._timer); this._timer = null; - this.kibanaStatusGetter$.unsubscribe(); - this._cluster.close(); + this.kibanaStatusSubscription?.unsubscribe(); const prefix = logPrefix ? logPrefix + ':' : ''; this._log.info(prefix + 'Monitoring stats collection is stopped'); @@ -168,7 +163,7 @@ export class BulkUploader { }; } - private async _fetchAndUpload() { + private async _fetchAndUpload(esClient: ElasticsearchClient) { const data = await Promise.all([ { type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() }, { type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) }, @@ -178,7 +173,7 @@ export class BulkUploader { if (payload && payload.length > 0) { try { this._log.debug(`Uploading bulk stats payload to the local cluster`); - await this._onPayload(payload); + await this._onPayload(esClient, payload); this._log.debug(`Uploaded bulk stats payload to the local cluster`); } catch (err) { this._log.warn(err.stack); @@ -189,8 +184,8 @@ export class BulkUploader { } } - private async _onPayload(payload: object[]) { - return await sendBulkPayload(this._cluster, this._interval, payload); + private async _onPayload(esClient: ElasticsearchClient, payload: object[]) { + return await sendBulkPayload(esClient, this._interval, payload); } private getConvertedKibanaStatus() { diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts index 3b26049505c4e..175f9b69ef523 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/lib/send_bulk_payload.ts @@ -5,21 +5,22 @@ * 2.0. */ -import { ILegacyClusterClient } from 'src/core/server'; +import type { ElasticsearchClient } from 'src/core/server'; import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants'; /* * Send the Kibana usage data to the ES Monitoring Bulk endpoint */ export async function sendBulkPayload( - cluster: ILegacyClusterClient, + esClient: ElasticsearchClient, interval: number, payload: object[] ) { - return cluster.callAsInternalUser('monitoring.bulk', { + const { body } = await esClient.monitoring.bulk({ system_id: KIBANA_SYSTEM_ID, system_api_version: MONITORING_SYSTEM_API_VERSION, interval: interval + 'ms', body: payload, }); + return body; } diff --git a/x-pack/plugins/monitoring/server/plugin.test.ts b/x-pack/plugins/monitoring/server/plugin.test.ts index 5ce62e9587e8b..1bdef23dd0c36 100644 --- a/x-pack/plugins/monitoring/server/plugin.test.ts +++ b/x-pack/plugins/monitoring/server/plugin.test.ts @@ -32,7 +32,6 @@ jest.mock('./config', () => ({ describe('Monitoring plugin', () => { const coreSetup = coreMock.createSetup(); coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any); - coreSetup.status.overall$.subscribe = jest.fn(); const setupPlugins = { usageCollection: { @@ -60,13 +59,13 @@ describe('Monitoring plugin', () => { afterEach(() => { (setupPlugins.alerting.registerType as jest.Mock).mockReset(); - (coreSetup.status.overall$.subscribe as jest.Mock).mockReset(); }); it('always create the bulk uploader', async () => { const plugin = new MonitoringPlugin(initializerContext as any); await plugin.setup(coreSetup, setupPlugins as any); - expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled(); + // eslint-disable-next-line dot-notation + expect(plugin['bulkUploader']).not.toBeUndefined(); }); it('should register all alerts', async () => { diff --git a/x-pack/plugins/monitoring/server/plugin.ts b/x-pack/plugins/monitoring/server/plugin.ts index 82e854a607def..b10cb47a07745 100644 --- a/x-pack/plugins/monitoring/server/plugin.ts +++ b/x-pack/plugins/monitoring/server/plugin.ts @@ -142,7 +142,6 @@ export class MonitoringPlugin // Always create the bulk uploader const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG); const bulkUploader = (this.bulkUploader = initBulkUploader({ - elasticsearch: core.elasticsearch, config, log: kibanaMonitoringLog, opsMetrics$: core.metrics.getOpsMetrics$(), @@ -214,7 +213,7 @@ export class MonitoringPlugin const monitoringBulkEnabled = mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled; if (monitoringBulkEnabled) { - this.bulkUploader?.start(); + this.bulkUploader?.start(core.elasticsearch.client.asInternalUser); } else { this.bulkUploader?.handleNotEnabled(); } diff --git a/x-pack/plugins/monitoring/server/types.ts b/x-pack/plugins/monitoring/server/types.ts index c0bfe32079cd2..de3d044ccabcb 100644 --- a/x-pack/plugins/monitoring/server/types.ts +++ b/x-pack/plugins/monitoring/server/types.ts @@ -12,6 +12,7 @@ import type { Logger, ILegacyCustomClusterClient, RequestHandlerContext, + ElasticsearchClient, } from 'kibana/server'; import { UsageCollectionSetup } from 'src/plugins/usage_collection/server'; import { LicenseFeature, ILicense } from '../../licensing/server'; @@ -92,7 +93,7 @@ export interface LegacyShimDependencies { export interface IBulkUploader { getKibanaStats: () => any; stop: () => void; - start: () => void; + start: (esClient: ElasticsearchClient) => void; handleNotEnabled: () => void; }