diff --git a/src/core/server/metrics/collectors/mocks.ts b/src/core/server/metrics/collectors/mocks.ts new file mode 100644 index 0000000000000..d1eb15637779a --- /dev/null +++ b/src/core/server/metrics/collectors/mocks.ts @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { MetricsCollector } from './types'; + +const createMock = () => { + const mocked: jest.Mocked> = { + collect: jest.fn(), + reset: jest.fn(), + }; + + mocked.collect.mockResolvedValue({}); + + return mocked; +}; + +export const collectorMock = { + create: createMock, +}; diff --git a/src/core/server/metrics/collectors/os.ts b/src/core/server/metrics/collectors/os.ts index d3d9bb0be86fa..59bef9d8ddd2b 100644 --- a/src/core/server/metrics/collectors/os.ts +++ b/src/core/server/metrics/collectors/os.ts @@ -57,4 +57,6 @@ export class OsMetricsCollector implements MetricsCollector { return metrics; } + + public reset() {} } diff --git a/src/core/server/metrics/collectors/process.ts b/src/core/server/metrics/collectors/process.ts index aa68abaf74e41..a3b59a7cc8b7c 100644 --- a/src/core/server/metrics/collectors/process.ts +++ b/src/core/server/metrics/collectors/process.ts @@ -40,6 +40,8 @@ export class ProcessMetricsCollector implements MetricsCollector => { diff --git a/src/core/server/metrics/collectors/server.ts b/src/core/server/metrics/collectors/server.ts index e46ac2f653df6..84204d0466ff3 100644 --- a/src/core/server/metrics/collectors/server.ts +++ b/src/core/server/metrics/collectors/server.ts @@ -26,12 +26,12 @@ interface ServerResponseTime { } export class ServerMetricsCollector implements MetricsCollector { - private readonly requests: OpsServerMetrics['requests'] = { + private requests: OpsServerMetrics['requests'] = { disconnects: 0, total: 0, statusCodes: {}, }; - private readonly responseTimes: ServerResponseTime = { + private responseTimes: ServerResponseTime = { count: 0, total: 0, max: 0, @@ -77,4 +77,17 @@ export class ServerMetricsCollector implements MetricsCollector { + /** collect the data currently gathered by the collector */ collect(): Promise; + /** reset the internal state of the collector */ + reset(): void; } /** diff --git a/src/core/server/metrics/integration_tests/server_collector.test.ts b/src/core/server/metrics/integration_tests/server_collector.test.ts index 6baf95894b9b4..dd5c256cf1600 100644 --- a/src/core/server/metrics/integration_tests/server_collector.test.ts +++ b/src/core/server/metrics/integration_tests/server_collector.test.ts @@ -200,4 +200,80 @@ describe('ServerMetricsCollector', () => { metrics = await collector.collect(); expect(metrics.concurrent_connections).toEqual(0); }); + + describe('#reset', () => { + it('reset the requests state', async () => { + router.get({ path: '/', validate: false }, async (ctx, req, res) => { + return res.ok({ body: '' }); + }); + await server.start(); + + await sendGet('/'); + await sendGet('/'); + await sendGet('/not-found'); + + let metrics = await collector.collect(); + + expect(metrics.requests).toEqual({ + total: 3, + disconnects: 0, + statusCodes: { + '200': 2, + '404': 1, + }, + }); + + collector.reset(); + metrics = await collector.collect(); + + expect(metrics.requests).toEqual({ + total: 0, + disconnects: 0, + statusCodes: {}, + }); + + await sendGet('/'); + await sendGet('/not-found'); + + metrics = await collector.collect(); + + expect(metrics.requests).toEqual({ + total: 2, + disconnects: 0, + statusCodes: { + '200': 1, + '404': 1, + }, + }); + }); + + it('resets the response times', async () => { + router.get({ path: '/no-delay', validate: false }, async (ctx, req, res) => { + return res.ok({ body: '' }); + }); + router.get({ path: '/500-ms', validate: false }, async (ctx, req, res) => { + await delay(500); + return res.ok({ body: '' }); + }); + + await server.start(); + + await Promise.all([sendGet('/no-delay'), sendGet('/500-ms')]); + let metrics = await collector.collect(); + + expect(metrics.response_times.avg_in_millis).toBeGreaterThanOrEqual(250); + expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(500); + + collector.reset(); + metrics = await collector.collect(); + expect(metrics.response_times.avg_in_millis).toBe(0); + expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(0); + + await Promise.all([sendGet('/500-ms'), sendGet('/500-ms')]); + metrics = await collector.collect(); + + expect(metrics.response_times.avg_in_millis).toBeGreaterThanOrEqual(500); + expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(500); + }); + }); }); diff --git a/src/core/server/metrics/metrics_service.test.mocks.ts b/src/core/server/metrics/metrics_service.test.mocks.ts index 8e91775283042..fe46e5693bf45 100644 --- a/src/core/server/metrics/metrics_service.test.mocks.ts +++ b/src/core/server/metrics/metrics_service.test.mocks.ts @@ -17,9 +17,10 @@ * under the License. */ -export const mockOpsCollector = { - collect: jest.fn(), -}; +import { collectorMock } from './collectors/mocks'; + +export const mockOpsCollector = collectorMock.create(); + jest.doMock('./ops_metrics_collector', () => ({ OpsMetricsCollector: jest.fn().mockImplementation(() => mockOpsCollector), })); diff --git a/src/core/server/metrics/metrics_service.test.ts b/src/core/server/metrics/metrics_service.test.ts index 10d6761adbe7d..f6334cc5d3c0f 100644 --- a/src/core/server/metrics/metrics_service.test.ts +++ b/src/core/server/metrics/metrics_service.test.ts @@ -57,37 +57,50 @@ describe('MetricsService', () => { expect(setInterval).toHaveBeenCalledWith(expect.any(Function), testInterval); }); - it('emits the metrics at start', async () => { + it('collects the metrics at every interval', async () => { mockOpsCollector.collect.mockResolvedValue(dummyMetrics); - const { getOpsMetrics$ } = await metricsService.setup({ - http: httpMock, - }); - + await metricsService.setup({ http: httpMock }); await metricsService.start(); expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1); - expect( - await getOpsMetrics$() - .pipe(take(1)) - .toPromise() - ).toEqual(dummyMetrics); + + jest.advanceTimersByTime(testInterval); + expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2); + + jest.advanceTimersByTime(testInterval); + expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3); }); - it('collects the metrics at every interval', async () => { + it('resets the collector after each collection', async () => { mockOpsCollector.collect.mockResolvedValue(dummyMetrics); - await metricsService.setup({ http: httpMock }); - + const { getOpsMetrics$ } = await metricsService.setup({ http: httpMock }); await metricsService.start(); + // `advanceTimersByTime` only ensure the interval handler is executed + // however the `reset` call is executed after the async call to `collect` + // meaning that we are going to miss the call if we don't wait for the + // actual observable emission that is performed after + const waitForNextEmission = () => + getOpsMetrics$() + .pipe(take(1)) + .toPromise(); + expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1); + expect(mockOpsCollector.reset).toHaveBeenCalledTimes(1); + let nextEmission = waitForNextEmission(); jest.advanceTimersByTime(testInterval); + await nextEmission; expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2); + expect(mockOpsCollector.reset).toHaveBeenCalledTimes(2); + nextEmission = waitForNextEmission(); jest.advanceTimersByTime(testInterval); + await nextEmission; expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3); + expect(mockOpsCollector.reset).toHaveBeenCalledTimes(3); }); it('throws when called before setup', async () => { diff --git a/src/core/server/metrics/metrics_service.ts b/src/core/server/metrics/metrics_service.ts index 1aed89a4aad60..0ea9d00792600 100644 --- a/src/core/server/metrics/metrics_service.ts +++ b/src/core/server/metrics/metrics_service.ts @@ -17,8 +17,8 @@ * under the License. */ -import { ReplaySubject } from 'rxjs'; -import { first, shareReplay } from 'rxjs/operators'; +import { Subject } from 'rxjs'; +import { first } from 'rxjs/operators'; import { CoreService } from '../../types'; import { CoreContext } from '../core_context'; import { Logger } from '../logging'; @@ -37,7 +37,7 @@ export class MetricsService private readonly logger: Logger; private metricsCollector?: OpsMetricsCollector; private collectInterval?: NodeJS.Timeout; - private metrics$ = new ReplaySubject(1); + private metrics$ = new Subject(); constructor(private readonly coreContext: CoreContext) { this.logger = coreContext.logger.get('metrics'); @@ -46,7 +46,7 @@ export class MetricsService public async setup({ http }: MetricsServiceSetupDeps): Promise { this.metricsCollector = new OpsMetricsCollector(http.server); - const metricsObservable = this.metrics$.pipe(shareReplay(1)); + const metricsObservable = this.metrics$.asObservable(); return { getOpsMetrics$: () => metricsObservable, @@ -74,6 +74,7 @@ export class MetricsService private async refreshMetrics() { this.logger.debug('Refreshing metrics'); const metrics = await this.metricsCollector!.collect(); + this.metricsCollector!.reset(); this.metrics$.next(metrics); } diff --git a/src/core/server/metrics/ops_metrics_collector.test.mocks.ts b/src/core/server/metrics/ops_metrics_collector.test.mocks.ts index 8265796d57970..cf51f8a753729 100644 --- a/src/core/server/metrics/ops_metrics_collector.test.mocks.ts +++ b/src/core/server/metrics/ops_metrics_collector.test.mocks.ts @@ -17,23 +17,19 @@ * under the License. */ -export const mockOsCollector = { - collect: jest.fn(), -}; +import { collectorMock } from './collectors/mocks'; + +export const mockOsCollector = collectorMock.create(); jest.doMock('./collectors/os', () => ({ OsMetricsCollector: jest.fn().mockImplementation(() => mockOsCollector), })); -export const mockProcessCollector = { - collect: jest.fn(), -}; +export const mockProcessCollector = collectorMock.create(); jest.doMock('./collectors/process', () => ({ ProcessMetricsCollector: jest.fn().mockImplementation(() => mockProcessCollector), })); -export const mockServerCollector = { - collect: jest.fn(), -}; +export const mockServerCollector = collectorMock.create(); jest.doMock('./collectors/server', () => ({ ServerMetricsCollector: jest.fn().mockImplementation(() => mockServerCollector), })); diff --git a/src/core/server/metrics/ops_metrics_collector.test.ts b/src/core/server/metrics/ops_metrics_collector.test.ts index 04302a195fb6c..559588db60a42 100644 --- a/src/core/server/metrics/ops_metrics_collector.test.ts +++ b/src/core/server/metrics/ops_metrics_collector.test.ts @@ -35,25 +35,43 @@ describe('OpsMetricsCollector', () => { mockOsCollector.collect.mockResolvedValue('osMetrics'); }); - it('gathers metrics from the underlying collectors', async () => { - mockOsCollector.collect.mockResolvedValue('osMetrics'); - mockProcessCollector.collect.mockResolvedValue('processMetrics'); - mockServerCollector.collect.mockResolvedValue({ - requests: 'serverRequestsMetrics', - response_times: 'serverTimingMetrics', + describe('#collect', () => { + it('gathers metrics from the underlying collectors', async () => { + mockOsCollector.collect.mockResolvedValue('osMetrics'); + mockProcessCollector.collect.mockResolvedValue('processMetrics'); + mockServerCollector.collect.mockResolvedValue({ + requests: 'serverRequestsMetrics', + response_times: 'serverTimingMetrics', + }); + + const metrics = await collector.collect(); + + expect(mockOsCollector.collect).toHaveBeenCalledTimes(1); + expect(mockProcessCollector.collect).toHaveBeenCalledTimes(1); + expect(mockServerCollector.collect).toHaveBeenCalledTimes(1); + + expect(metrics).toEqual({ + process: 'processMetrics', + os: 'osMetrics', + requests: 'serverRequestsMetrics', + response_times: 'serverTimingMetrics', + }); }); + }); + + describe('#reset', () => { + it('call reset on the underlying collectors', () => { + collector.reset(); - const metrics = await collector.collect(); + expect(mockOsCollector.reset).toHaveBeenCalledTimes(1); + expect(mockProcessCollector.reset).toHaveBeenCalledTimes(1); + expect(mockServerCollector.reset).toHaveBeenCalledTimes(1); - expect(mockOsCollector.collect).toHaveBeenCalledTimes(1); - expect(mockProcessCollector.collect).toHaveBeenCalledTimes(1); - expect(mockServerCollector.collect).toHaveBeenCalledTimes(1); + collector.reset(); - expect(metrics).toEqual({ - process: 'processMetrics', - os: 'osMetrics', - requests: 'serverRequestsMetrics', - response_times: 'serverTimingMetrics', + expect(mockOsCollector.reset).toHaveBeenCalledTimes(2); + expect(mockProcessCollector.reset).toHaveBeenCalledTimes(2); + expect(mockServerCollector.reset).toHaveBeenCalledTimes(2); }); }); }); diff --git a/src/core/server/metrics/ops_metrics_collector.ts b/src/core/server/metrics/ops_metrics_collector.ts index 04344f21f57f7..525515dba1457 100644 --- a/src/core/server/metrics/ops_metrics_collector.ts +++ b/src/core/server/metrics/ops_metrics_collector.ts @@ -49,4 +49,10 @@ export class OpsMetricsCollector implements MetricsCollector { ...server, }; } + + public reset() { + this.processCollector.reset(); + this.osCollector.reset(); + this.serverCollector.reset(); + } }