Skip to content

Commit

Permalink
Reset the metrics after each emission (#59551)
Browse files Browse the repository at this point in the history
* reset the metrics after each emission

* add test comment
  • Loading branch information
pgayvallet committed Mar 9, 2020
1 parent 84a7642 commit 857820a
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 46 deletions.
35 changes: 35 additions & 0 deletions src/core/server/metrics/collectors/mocks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { MetricsCollector } from './types';

const createMock = () => {
const mocked: jest.Mocked<MetricsCollector<any>> = {
collect: jest.fn(),
reset: jest.fn(),
};

mocked.collect.mockResolvedValue({});

return mocked;
};

export const collectorMock = {
create: createMock,
};
2 changes: 2 additions & 0 deletions src/core/server/metrics/collectors/os.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,6 @@ export class OsMetricsCollector implements MetricsCollector<OpsOsMetrics> {

return metrics;
}

public reset() {}
}
2 changes: 2 additions & 0 deletions src/core/server/metrics/collectors/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export class ProcessMetricsCollector implements MetricsCollector<OpsProcessMetri
uptime_in_millis: process.uptime() * 1000,
};
}

public reset() {}
}

const getEventLoopDelay = (): Promise<number> => {
Expand Down
17 changes: 15 additions & 2 deletions src/core/server/metrics/collectors/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ interface ServerResponseTime {
}

export class ServerMetricsCollector implements MetricsCollector<OpsServerMetrics> {
private readonly requests: OpsServerMetrics['requests'] = {
private requests: OpsServerMetrics['requests'] = {
disconnects: 0,
total: 0,
statusCodes: {},
};
private readonly responseTimes: ServerResponseTime = {
private responseTimes: ServerResponseTime = {
count: 0,
total: 0,
max: 0,
Expand Down Expand Up @@ -77,4 +77,17 @@ export class ServerMetricsCollector implements MetricsCollector<OpsServerMetrics
concurrent_connections: connections,
};
}

public reset() {
this.requests = {
disconnects: 0,
total: 0,
statusCodes: {},
};
this.responseTimes = {
count: 0,
total: 0,
max: 0,
};
}
}
3 changes: 3 additions & 0 deletions src/core/server/metrics/collectors/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

/** Base interface for all metrics gatherers */
export interface MetricsCollector<T> {
/** collect the data currently gathered by the collector */
collect(): Promise<T>;
/** reset the internal state of the collector */
reset(): void;
}

/**
Expand Down
76 changes: 76 additions & 0 deletions src/core/server/metrics/integration_tests/server_collector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,80 @@ describe('ServerMetricsCollector', () => {
metrics = await collector.collect();
expect(metrics.concurrent_connections).toEqual(0);
});

describe('#reset', () => {
it('reset the requests state', async () => {
router.get({ path: '/', validate: false }, async (ctx, req, res) => {
return res.ok({ body: '' });
});
await server.start();

await sendGet('/');
await sendGet('/');
await sendGet('/not-found');

let metrics = await collector.collect();

expect(metrics.requests).toEqual({
total: 3,
disconnects: 0,
statusCodes: {
'200': 2,
'404': 1,
},
});

collector.reset();
metrics = await collector.collect();

expect(metrics.requests).toEqual({
total: 0,
disconnects: 0,
statusCodes: {},
});

await sendGet('/');
await sendGet('/not-found');

metrics = await collector.collect();

expect(metrics.requests).toEqual({
total: 2,
disconnects: 0,
statusCodes: {
'200': 1,
'404': 1,
},
});
});

it('resets the response times', async () => {
router.get({ path: '/no-delay', validate: false }, async (ctx, req, res) => {
return res.ok({ body: '' });
});
router.get({ path: '/500-ms', validate: false }, async (ctx, req, res) => {
await delay(500);
return res.ok({ body: '' });
});

await server.start();

await Promise.all([sendGet('/no-delay'), sendGet('/500-ms')]);
let metrics = await collector.collect();

expect(metrics.response_times.avg_in_millis).toBeGreaterThanOrEqual(250);
expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(500);

collector.reset();
metrics = await collector.collect();
expect(metrics.response_times.avg_in_millis).toBe(0);
expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(0);

await Promise.all([sendGet('/500-ms'), sendGet('/500-ms')]);
metrics = await collector.collect();

expect(metrics.response_times.avg_in_millis).toBeGreaterThanOrEqual(500);
expect(metrics.response_times.max_in_millis).toBeGreaterThanOrEqual(500);
});
});
});
7 changes: 4 additions & 3 deletions src/core/server/metrics/metrics_service.test.mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
* under the License.
*/

export const mockOpsCollector = {
collect: jest.fn(),
};
import { collectorMock } from './collectors/mocks';

export const mockOpsCollector = collectorMock.create();

jest.doMock('./ops_metrics_collector', () => ({
OpsMetricsCollector: jest.fn().mockImplementation(() => mockOpsCollector),
}));
39 changes: 26 additions & 13 deletions src/core/server/metrics/metrics_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,37 +57,50 @@ describe('MetricsService', () => {
expect(setInterval).toHaveBeenCalledWith(expect.any(Function), testInterval);
});

it('emits the metrics at start', async () => {
it('collects the metrics at every interval', async () => {
mockOpsCollector.collect.mockResolvedValue(dummyMetrics);

const { getOpsMetrics$ } = await metricsService.setup({
http: httpMock,
});

await metricsService.setup({ http: httpMock });
await metricsService.start();

expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1);
expect(
await getOpsMetrics$()
.pipe(take(1))
.toPromise()
).toEqual(dummyMetrics);

jest.advanceTimersByTime(testInterval);
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2);

jest.advanceTimersByTime(testInterval);
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3);
});

it('collects the metrics at every interval', async () => {
it('resets the collector after each collection', async () => {
mockOpsCollector.collect.mockResolvedValue(dummyMetrics);

await metricsService.setup({ http: httpMock });

const { getOpsMetrics$ } = await metricsService.setup({ http: httpMock });
await metricsService.start();

// `advanceTimersByTime` only ensure the interval handler is executed
// however the `reset` call is executed after the async call to `collect`
// meaning that we are going to miss the call if we don't wait for the
// actual observable emission that is performed after
const waitForNextEmission = () =>
getOpsMetrics$()
.pipe(take(1))
.toPromise();

expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1);
expect(mockOpsCollector.reset).toHaveBeenCalledTimes(1);

let nextEmission = waitForNextEmission();
jest.advanceTimersByTime(testInterval);
await nextEmission;
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2);
expect(mockOpsCollector.reset).toHaveBeenCalledTimes(2);

nextEmission = waitForNextEmission();
jest.advanceTimersByTime(testInterval);
await nextEmission;
expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3);
expect(mockOpsCollector.reset).toHaveBeenCalledTimes(3);
});

it('throws when called before setup', async () => {
Expand Down
9 changes: 5 additions & 4 deletions src/core/server/metrics/metrics_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* under the License.
*/

import { ReplaySubject } from 'rxjs';
import { first, shareReplay } from 'rxjs/operators';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import { CoreService } from '../../types';
import { CoreContext } from '../core_context';
import { Logger } from '../logging';
Expand All @@ -37,7 +37,7 @@ export class MetricsService
private readonly logger: Logger;
private metricsCollector?: OpsMetricsCollector;
private collectInterval?: NodeJS.Timeout;
private metrics$ = new ReplaySubject<OpsMetrics>(1);
private metrics$ = new Subject<OpsMetrics>();

constructor(private readonly coreContext: CoreContext) {
this.logger = coreContext.logger.get('metrics');
Expand All @@ -46,7 +46,7 @@ export class MetricsService
public async setup({ http }: MetricsServiceSetupDeps): Promise<InternalMetricsServiceSetup> {
this.metricsCollector = new OpsMetricsCollector(http.server);

const metricsObservable = this.metrics$.pipe(shareReplay(1));
const metricsObservable = this.metrics$.asObservable();

return {
getOpsMetrics$: () => metricsObservable,
Expand Down Expand Up @@ -74,6 +74,7 @@ export class MetricsService
private async refreshMetrics() {
this.logger.debug('Refreshing metrics');
const metrics = await this.metricsCollector!.collect();
this.metricsCollector!.reset();
this.metrics$.next(metrics);
}

Expand Down
14 changes: 5 additions & 9 deletions src/core/server/metrics/ops_metrics_collector.test.mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,19 @@
* under the License.
*/

export const mockOsCollector = {
collect: jest.fn(),
};
import { collectorMock } from './collectors/mocks';

export const mockOsCollector = collectorMock.create();
jest.doMock('./collectors/os', () => ({
OsMetricsCollector: jest.fn().mockImplementation(() => mockOsCollector),
}));

export const mockProcessCollector = {
collect: jest.fn(),
};
export const mockProcessCollector = collectorMock.create();
jest.doMock('./collectors/process', () => ({
ProcessMetricsCollector: jest.fn().mockImplementation(() => mockProcessCollector),
}));

export const mockServerCollector = {
collect: jest.fn(),
};
export const mockServerCollector = collectorMock.create();
jest.doMock('./collectors/server', () => ({
ServerMetricsCollector: jest.fn().mockImplementation(() => mockServerCollector),
}));
48 changes: 33 additions & 15 deletions src/core/server/metrics/ops_metrics_collector.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,43 @@ describe('OpsMetricsCollector', () => {
mockOsCollector.collect.mockResolvedValue('osMetrics');
});

it('gathers metrics from the underlying collectors', async () => {
mockOsCollector.collect.mockResolvedValue('osMetrics');
mockProcessCollector.collect.mockResolvedValue('processMetrics');
mockServerCollector.collect.mockResolvedValue({
requests: 'serverRequestsMetrics',
response_times: 'serverTimingMetrics',
describe('#collect', () => {
it('gathers metrics from the underlying collectors', async () => {
mockOsCollector.collect.mockResolvedValue('osMetrics');
mockProcessCollector.collect.mockResolvedValue('processMetrics');
mockServerCollector.collect.mockResolvedValue({
requests: 'serverRequestsMetrics',
response_times: 'serverTimingMetrics',
});

const metrics = await collector.collect();

expect(mockOsCollector.collect).toHaveBeenCalledTimes(1);
expect(mockProcessCollector.collect).toHaveBeenCalledTimes(1);
expect(mockServerCollector.collect).toHaveBeenCalledTimes(1);

expect(metrics).toEqual({
process: 'processMetrics',
os: 'osMetrics',
requests: 'serverRequestsMetrics',
response_times: 'serverTimingMetrics',
});
});
});

describe('#reset', () => {
it('call reset on the underlying collectors', () => {
collector.reset();

const metrics = await collector.collect();
expect(mockOsCollector.reset).toHaveBeenCalledTimes(1);
expect(mockProcessCollector.reset).toHaveBeenCalledTimes(1);
expect(mockServerCollector.reset).toHaveBeenCalledTimes(1);

expect(mockOsCollector.collect).toHaveBeenCalledTimes(1);
expect(mockProcessCollector.collect).toHaveBeenCalledTimes(1);
expect(mockServerCollector.collect).toHaveBeenCalledTimes(1);
collector.reset();

expect(metrics).toEqual({
process: 'processMetrics',
os: 'osMetrics',
requests: 'serverRequestsMetrics',
response_times: 'serverTimingMetrics',
expect(mockOsCollector.reset).toHaveBeenCalledTimes(2);
expect(mockProcessCollector.reset).toHaveBeenCalledTimes(2);
expect(mockServerCollector.reset).toHaveBeenCalledTimes(2);
});
});
});
6 changes: 6 additions & 0 deletions src/core/server/metrics/ops_metrics_collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,10 @@ export class OpsMetricsCollector implements MetricsCollector<OpsMetrics> {
...server,
};
}

public reset() {
this.processCollector.reset();
this.osCollector.reset();
this.serverCollector.reset();
}
}

0 comments on commit 857820a

Please sign in to comment.