Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Monitoring] Bulk Uploader uses new ES client #94908

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 24 additions & 29 deletions x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@
* 2.0.
*/

import { Observable, Subscription } from 'rxjs';
import type { Observable, Subscription } from 'rxjs';
import { take } from 'rxjs/operators';
import moment from 'moment';
import {
ElasticsearchServiceSetup,
ILegacyCustomClusterClient,
import type {
ElasticsearchClient,
Logger,
OpsMetrics,
ServiceStatus,
ServiceStatusLevel,
ServiceStatusLevels,
} from '../../../../../src/core/server';
} from 'src/core/server';
import { ServiceStatusLevels } from '../../../../../src/core/server';
import { KIBANA_STATS_TYPE_MONITORING, KIBANA_SETTINGS_TYPE } from '../../common/constants';

import { sendBulkPayload, monitoringBulk } from './lib';
import { sendBulkPayload } from './lib';
import { getKibanaSettings } from './collectors';
import { MonitoringConfig } from '../config';
import type { MonitoringConfig } from '../config';
import type { IBulkUploader } from '../types';

export interface BulkUploaderOptions {
log: Logger;
config: MonitoringConfig;
interval: number;
elasticsearch: ElasticsearchServiceSetup;
statusGetter$: Observable<ServiceStatus>;
opsMetrics$: Observable<OpsMetrics>;
kibanaStats: KibanaStats;
Expand Down Expand Up @@ -61,11 +60,11 @@ export interface KibanaStats {
* @param {Object} server HapiJS server instance
* @param {Object} xpackInfo server.plugins.xpack_main.info object
*/
export class BulkUploader {
export class BulkUploader implements IBulkUploader {
private readonly _log: Logger;
private readonly _cluster: ILegacyCustomClusterClient;
private readonly kibanaStats: KibanaStats;
private readonly kibanaStatusGetter$: Subscription;
private readonly kibanaStatusGetter$: Observable<ServiceStatus>;
private kibanaStatusSubscription?: Subscription;
private readonly opsMetrics$: Observable<OpsMetrics>;
private kibanaStatus: ServiceStatusLevel | null;
private _timer: NodeJS.Timer | null;
Expand All @@ -75,7 +74,6 @@ export class BulkUploader {
log,
config,
interval,
elasticsearch,
statusGetter$,
opsMetrics$,
kibanaStats,
Expand All @@ -91,34 +89,32 @@ export class BulkUploader {
this._interval = interval;
this._log = log;

this._cluster = elasticsearch.legacy.createClient('admin', {
plugins: [monitoringBulk],
});

this.kibanaStats = kibanaStats;

this.kibanaStatus = null;
this.kibanaStatusGetter$ = statusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});
this.kibanaStatusGetter$ = statusGetter$;
}

/*
* Start the interval timer
* @param {usageCollection} usageCollection object to use for initial the fetch/upload and fetch/uploading on interval
* @return undefined
*/
public start() {
public start(esClient: ElasticsearchClient) {
this._log.info('Starting monitoring stats collection');

this.kibanaStatusSubscription = this.kibanaStatusGetter$.subscribe((nextStatus) => {
this.kibanaStatus = nextStatus.level;
});

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(); // initial fetch
this._fetchAndUpload(esClient); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload();
this._fetchAndUpload(esClient);
}, this._interval);
}

Expand All @@ -131,8 +127,7 @@ export class BulkUploader {
if (this._timer) clearInterval(this._timer);
this._timer = null;

this.kibanaStatusGetter$.unsubscribe();
this._cluster.close();
this.kibanaStatusSubscription?.unsubscribe();

const prefix = logPrefix ? logPrefix + ':' : '';
this._log.info(prefix + 'Monitoring stats collection is stopped');
Expand Down Expand Up @@ -168,7 +163,7 @@ export class BulkUploader {
};
}

private async _fetchAndUpload() {
private async _fetchAndUpload(esClient: ElasticsearchClient) {
const data = await Promise.all([
{ type: KIBANA_STATS_TYPE_MONITORING, result: await this.getOpsMetrics() },
{ type: KIBANA_SETTINGS_TYPE, result: await getKibanaSettings(this._log, this.config) },
Expand All @@ -178,7 +173,7 @@ export class BulkUploader {
if (payload && payload.length > 0) {
try {
this._log.debug(`Uploading bulk stats payload to the local cluster`);
await this._onPayload(payload);
await this._onPayload(esClient, payload);
this._log.debug(`Uploaded bulk stats payload to the local cluster`);
} catch (err) {
this._log.warn(err.stack);
Expand All @@ -189,8 +184,8 @@ export class BulkUploader {
}
}

private async _onPayload(payload: object[]) {
return await sendBulkPayload(this._cluster, this._interval, payload);
private async _onPayload(esClient: ElasticsearchClient, payload: object[]) {
return await sendBulkPayload(esClient, this._interval, payload);
}

private getConvertedKibanaStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@
* 2.0.
*/

import { ILegacyClusterClient } from 'src/core/server';
import type { ElasticsearchClient } from 'src/core/server';
import { MONITORING_SYSTEM_API_VERSION, KIBANA_SYSTEM_ID } from '../../../common/constants';

/*
* Send the Kibana usage data to the ES Monitoring Bulk endpoint
*/
export async function sendBulkPayload(
cluster: ILegacyClusterClient,
esClient: ElasticsearchClient,
interval: number,
payload: object[]
) {
return cluster.callAsInternalUser('monitoring.bulk', {
const { body } = await esClient.monitoring.bulk({
system_id: KIBANA_SYSTEM_ID,
system_api_version: MONITORING_SYSTEM_API_VERSION,
interval: interval + 'ms',
body: payload,
});
return body;
}
5 changes: 2 additions & 3 deletions x-pack/plugins/monitoring/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ jest.mock('./config', () => ({
describe('Monitoring plugin', () => {
const coreSetup = coreMock.createSetup();
coreSetup.http.getServerInfo.mockReturnValue({ port: 5601 } as any);
coreSetup.status.overall$.subscribe = jest.fn();

const setupPlugins = {
usageCollection: {
Expand Down Expand Up @@ -60,13 +59,13 @@ describe('Monitoring plugin', () => {

afterEach(() => {
(setupPlugins.alerting.registerType as jest.Mock).mockReset();
(coreSetup.status.overall$.subscribe as jest.Mock).mockReset();
});

it('always create the bulk uploader', async () => {
const plugin = new MonitoringPlugin(initializerContext as any);
await plugin.setup(coreSetup, setupPlugins as any);
expect(coreSetup.status.overall$.subscribe).toHaveBeenCalled();
// eslint-disable-next-line dot-notation
expect(plugin['bulkUploader']).not.toBeUndefined();
});

it('should register all alerts', async () => {
Expand Down
3 changes: 1 addition & 2 deletions x-pack/plugins/monitoring/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ export class MonitoringPlugin
// Always create the bulk uploader
const kibanaMonitoringLog = this.getLogger(KIBANA_MONITORING_LOGGING_TAG);
const bulkUploader = (this.bulkUploader = initBulkUploader({
elasticsearch: core.elasticsearch,
config,
log: kibanaMonitoringLog,
opsMetrics$: core.metrics.getOpsMetrics$(),
Expand Down Expand Up @@ -210,7 +209,7 @@ export class MonitoringPlugin
const monitoringBulkEnabled =
mainMonitoring && mainMonitoring.isAvailable && mainMonitoring.isEnabled;
if (monitoringBulkEnabled) {
this.bulkUploader?.start();
this.bulkUploader?.start(core.elasticsearch.client.asInternalUser);
} else {
this.bulkUploader?.handleNotEnabled();
}
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/monitoring/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
Logger,
ILegacyCustomClusterClient,
RequestHandlerContext,
ElasticsearchClient,
} from 'kibana/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { LicenseFeature, ILicense } from '../../licensing/server';
Expand Down Expand Up @@ -92,7 +93,7 @@ export interface LegacyShimDependencies {
export interface IBulkUploader {
getKibanaStats: () => any;
stop: () => void;
start: () => void;
start: (esClient: ElasticsearchClient) => void;
handleNotEnabled: () => void;
}

Expand Down