Skip to content

Commit

Permalink
[Monitoring] Bulk Uploader uses new ES client (elastic#94908)
Browse files Browse the repository at this point in the history
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
# Conflicts:
#	x-pack/plugins/monitoring/server/plugin.test.ts
#	x-pack/plugins/monitoring/server/plugin.ts
#	x-pack/plugins/monitoring/server/types.ts
  • Loading branch information
afharo committed Mar 23, 2021
1 parent 940754c commit 748636b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 66 deletions.
53 changes: 24 additions & 29 deletions x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@
* 2.0.
*/

import { Observable, Subscription } from 'rxjs';
import type { Observable, Subscription } from 'rxjs';
import { take } from 'rxjs/operators';
import moment from 'moment';
import {
ElasticsearchServiceSetup,
ILegacyCustomClusterClient,
import type {
ElasticsearchClient,
Logger,
OpsMetrics,
ServiceStatus,
ServiceStatusLevel,
ServiceStatusLevels,
} from '../../../../../src/core/server';
} from 'src/core/server';
import { ServiceStatusLevels } from '../../../../../src/core/server';
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants';

import { sendBulkPayload, monitoringBulk } from './lib';
import { sendBulkPayload } from './lib';
import { getKibanaSettings } from './collectors';
import { MonitoringConfig } from '../config';
import type { MonitoringConfig } from '../config';
import type { IBulkUploader } from '../types';

export interface BulkUploaderOptions {
log: Logger;
config: MonitoringConfig;
interval: number;
elasticsearch: ElasticsearchServiceSetup;
statusGetter$: Observable<ServiceStatus>;
opsMetrics$: Observable<OpsMetrics>;
kibanaStats: KibanaStats;
Expand Down Expand Up @@ -61,11 +60,11 @@ export interface KibanaStats {
* @param {Object} server HapiJS server instance
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
export class BulkUploader implements IBulkUploader {
private readonly _log: Logger;
private readonly _cluster: ILegacyCustomClusterClient;
private readonly kibanaStats: KibanaStats;
private readonly kibanaStatusGetter$: Subscription;
private readonly kibanaStatusGetter$: Observable<ServiceStatus>;
private kibanaStatusSubscription?: Subscription;
private readonly opsMetrics$: Observable<OpsMetrics>;
private kibanaStatus: ServiceStatusLevel | null;
private _timer: NodeJS.Timer | null;
Expand All @@ -75,7 +74,6 @@ export class BulkUploader {
log,
config,
interval,
elasticsearch,
statusGetter$,
opsMetrics$,
kibanaStats,
Expand All @@ -91,34 +89,32 @@ export class BulkUploader {
this._interval = interval;
this._log = log;

this._cluster = elasticsearch.legacy.createClient('admin', {
plugins: [monitoringBulk],
});

this.kibanaStats = kibanaStats;

this.kibanaStatus = null;
this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
this.kibanaStatusGetter$ = statusGetter$;
}

/*
* Start the interval timer
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
* @return undefined
*/
public start() {
public start(esClient: ElasticsearchClient) {
this._log.info('Starting monitoring stats collection');

this.kibanaStatusSubscription = this.kibanaStatusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(); // initial fetch
this._fetchAndUpload(esClient); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload();
this._fetchAndUpload(esClient);
}, this._interval);
}

Expand All @@ -131,8 +127,7 @@ export class BulkUploader {
if (this._timer) clearInterval(this._timer);
this._timer = null;

this.kibanaStatusGetter$.unsubscribe();
this._cluster.close();
this.kibanaStatusSubscription?.unsubscribe();

const prefix = logPrefix ? logPrefix + ':' : '';
this._log.info(prefix + 'Monitoring stats collection is stopped');
Expand Down Expand Up @@ -168,7 +163,7 @@ export class BulkUploader {
};
}

private async _fetchAndUpload() {
private async _fetchAndUpload(esClient: ElasticsearchClient) {
const data = await Promise.all([
{ type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() },
{ type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) },
Expand All @@ -178,7 +173,7 @@ export class BulkUploader {
if (payload && payload.length > 0) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
await this._onPayload(payload);
await this._onPayload(esClient, payload);
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {
this._log.warn(err.stack);
Expand All @@ -189,8 +184,8 @@ export class BulkUploader {
}
}

private async _onPayload(payload: object[]) {
return await sendBulkPayload(this._cluster, this._interval, payload);
private async _onPayload(esClient: ElasticsearchClient, payload: object[]) {
return await sendBulkPayload(esClient, this._interval, payload);
}

private getConvertedKibanaStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@
* 2.0.
*/

import { ILegacyClusterClient } from 'src/core/server';
import type { ElasticsearchClient } from 'src/core/server';
import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';

/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export async function sendBulkPayload(
cluster: ILegacyClusterClient,
esClient: ElasticsearchClient,
interval: number,
payload: object[]
) {
return cluster.callAsInternalUser('monitoring.bulk', {
const { body } = await esClient.monitoring.bulk({
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
interval: interval + 'ms',
body: payload,
});
return body;
}
5 changes: 2 additions & 3 deletions x-pack/plugins/monitoring/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ jest.mock('./config', () => ({
describe('Monitoring plugin', () => {
const coreSetup = coreMock.createSetup();
coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any);
coreSetup.status.overall$.subscribe = jest.fn();

const setupPlugins = {
usageCollection: {
Expand Down Expand Up @@ -60,13 +59,13 @@ describe('Monitoring plugin', () => {

afterEach(() => {
(setupPlugins.alerts.registerType as jest.Mock).mockReset();
(coreSetup.status.overall$.subscribe as jest.Mock).mockReset();
});

it('always create the bulk uploader', async () => {
const plugin = new MonitoringPlugin(initializerContext as any);
await plugin.setup(coreSetup, setupPlugins as any);
expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled();
// eslint-disable-next-line dot-notation
expect(plugin['bulkUploader']).not.toBeUndefined();
});

it('should register all alerts', async () => {
Expand Down
62 changes: 32 additions & 30 deletions x-pack/plugins/monitoring/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ export class MonitoringPlugin
// Always create the bulk uploader
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const bulkUploader = (this.bulkUploader = initBulkUploader({
elasticsearch: core.elasticsearch,
config,
log: kibanaMonitoringLog,
opsMetrics$: core.metrics.getOpsMetrics$(),
Expand All @@ -169,34 +168,6 @@ export class MonitoringPlugin
},
}));

// If collection is enabled, start it
const kibanaCollectionEnabled = config.kibana.collection.enabled;
if (kibanaCollectionEnabled) {
// Do not use `this.licenseService` as that looks at the monitoring cluster
// whereas we want to check the production cluster here
if (plugins.licensing) {
plugins.licensing.license$.subscribe((license: any) => {
// use updated xpack license info to start/stop bulk upload
const mainMonitoring = license.getFeature('monitoring');
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
bulkUploader.start();
} else {
bulkUploader.handleNotEnabled();
}
});
} else {
kibanaMonitoringLog.warn(
'Internal collection for Kibana monitoring is disabled due to missing license information.'
);
}
} else {
kibanaMonitoringLog.info(
'Internal collection for Kibana monitoring is disabled per configuration.'
);
}

// If the UI is enabled, then we want to register it so it shows up
// and start any other UI-related setup tasks
if (config.ui.enabled) {
Expand Down Expand Up @@ -228,7 +199,38 @@ export class MonitoringPlugin
};
}

start() {}
start(core: CoreStart, { licensing }: PluginsStart) {
const config = createConfig(this.initializerContext.config.get<TypeOf<typeof configSchema>>());

// If collection is enabled, start it
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const kibanaCollectionEnabled = config.kibana.collection.enabled;
if (kibanaCollectionEnabled) {
// Do not use `this.licenseService` as that looks at the monitoring cluster
// whereas we want to check the production cluster here
if (this.bulkUploader && licensing) {
licensing.license$.subscribe((license: any) => {
// use updated xpack license info to start/stop bulk upload
const mainMonitoring = license.getFeature('monitoring');
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
this.bulkUploader?.start(core.elasticsearch.client.asInternalUser);
} else {
this.bulkUploader?.handleNotEnabled();
}
});
} else {
kibanaMonitoringLog.warn(
'Internal collection for Kibana monitoring is disabled due to missing license information.'
);
}
} else {
kibanaMonitoringLog.info(
'Internal collection for Kibana monitoring is disabled per configuration.'
);
}
}

stop() {
if (this.cluster) {
Expand Down
6 changes: 5 additions & 1 deletion x-pack/plugins/monitoring/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
Logger,
ILegacyCustomClusterClient,
RequestHandlerContext,
ElasticsearchClient,
} from 'kibana/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { LicenseFeature, ILicense } from '../../licensing/server';
Expand All @@ -25,7 +26,7 @@ import {
PluginSetupContract as AlertingPluginSetupContract,
} from '../../alerts/server';
import { InfraPluginSetup } from '../../infra/server';
import { LicensingPluginSetup } from '../../licensing/server';
import { LicensingPluginSetup, LicensingPluginStart } from '../../licensing/server';
import { PluginSetupContract as FeaturesPluginSetupContract } from '../../features/server';
import { EncryptedSavedObjectsPluginSetup } from '../../encrypted_saved_objects/server';
import { CloudSetup } from '../../cloud/server';
Expand Down Expand Up @@ -62,6 +63,7 @@ export interface RequestHandlerContextMonitoringPlugin extends RequestHandlerCon
export interface PluginsStart {
alerts: AlertingPluginStartContract;
actions: ActionsPluginsStartContact;
licensing: LicensingPluginStart;
}

export interface MonitoringCoreConfig {
Expand Down Expand Up @@ -92,6 +94,8 @@ export interface LegacyShimDependencies {
export interface IBulkUploader {
getKibanaStats: () => any;
stop: () => void;
start: (esClient: ElasticsearchClient) => void;
handleNotEnabled: () => void;
}

export interface MonitoringPluginSetup {
Expand Down

0 comments on commit 748636b

Please sign in to comment.