From 0f494f420f29e09db0a84f87d3ebacd7cf31e893 Mon Sep 17 00:00:00 2001 From: John Schulz Date: Thu, 20 Aug 2020 11:07:54 -0400 Subject: [PATCH] Retry network errors on registry requests (#74507) --- .../services/epm/registry/requests.test.ts | 108 ++++++++++++++++++ .../server/services/epm/registry/requests.ts | 58 ++++++++-- .../server/services/epm/registry/streams.ts | 3 +- 3 files changed, 159 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugins/ingest_manager/server/services/epm/registry/requests.test.ts diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.test.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.test.ts new file mode 100644 index 0000000000000..f836a133a78a0 --- /dev/null +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.test.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { fetchUrl } from './requests'; +import { RegistryError } from '../../../errors'; +jest.mock('node-fetch'); + +const { Response, FetchError } = jest.requireActual('node-fetch'); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const fetchMock = require('node-fetch') as jest.Mock; + +jest.setTimeout(120 * 1000); +describe('setupIngestManager', () => { + beforeEach(async () => {}); + + afterEach(async () => { + jest.clearAllMocks(); + }); + + describe('fetchUrl / getResponse errors', () => { + it('regular Errors do not retry. Becomes RegistryError', async () => { + fetchMock.mockImplementationOnce(() => { + throw new Error('mocked'); + }); + const promise = fetchUrl(''); + await expect(promise).rejects.toThrow(RegistryError); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('TypeErrors do not retry. Becomes RegistryError', async () => { + fetchMock.mockImplementationOnce(() => { + // @ts-expect-error + null.f(); + }); + const promise = fetchUrl(''); + await expect(promise).rejects.toThrow(RegistryError); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + describe('only system errors retry (like ECONNRESET)', () => { + it('they eventually succeed', async () => { + const successValue = JSON.stringify({ name: 'attempt 4 works', version: '1.2.3' }); + fetchMock + .mockImplementationOnce(() => { + throw new FetchError('message 1', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 2', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 3', 'system', { code: 'ESOMETHING' }); + }) + // this one succeeds + .mockImplementationOnce(() => Promise.resolve(new Response(successValue))) + .mockImplementationOnce(() => { + throw new FetchError('message 5', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 6', 'system', { code: 'ESOMETHING' }); + }); + + const promise = fetchUrl(''); + await expect(promise).resolves.toEqual(successValue); + // doesn't retry after success + expect(fetchMock).toHaveBeenCalledTimes(4); + const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type); + expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'return']); + }); + + it('or error after 1 failure & 5 retries with RegistryError', async () => { + fetchMock + .mockImplementationOnce(() => { + throw new FetchError('message 1', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 2', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 3', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 4', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 5', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 6', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 7', 'system', { code: 'ESOMETHING' }); + }) + .mockImplementationOnce(() => { + throw new FetchError('message 8', 'system', { code: 'ESOMETHING' }); + }); + + const promise = fetchUrl(''); + await expect(promise).rejects.toThrow(RegistryError); + // doesn't retry after 1 failure & 5 failed retries + expect(fetchMock).toHaveBeenCalledTimes(6); + const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type); + expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'throw', 'throw', 'throw']); + }); + }); + }); +}); diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.ts index abf77ddddfd7a..5939dc204aae6 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/requests.ts @@ -4,20 +4,49 @@ * you may not use this file except in compliance with the Elastic License. */ -import fetch, { Response } from 'node-fetch'; +import fetch, { FetchError, Response } from 'node-fetch'; +import pRetry from 'p-retry'; import { streamToString } from './streams'; import { RegistryError } from '../../../errors'; +type FailedAttemptErrors = pRetry.FailedAttemptError | FetchError | Error; + +// not sure what to call this function, but we're not exporting it +async function registryFetch(url: string) { + const response = await fetch(url); + + if (response.ok) { + return response; + } else { + // 4xx & 5xx responses + // exit without retry & throw RegistryError + throw new pRetry.AbortError( + new RegistryError(`Error connecting to package registry at ${url}: ${response.statusText}`) + ); + } +} + export async function getResponse(url: string): Promise { try { - const response = await fetch(url); - if (response.ok) { - return response; - } else { - throw new RegistryError( - `Error connecting to package registry at ${url}: ${response.statusText}` - ); - } + // we only want to retry certain failures like network issues + // the rest should only try the one time then fail as they do now + const response = await pRetry(() => registryFetch(url), { + factor: 2, + retries: 5, + onFailedAttempt: (error) => { + // we only want to retry certain types of errors, like `ECONNREFUSED` and other operational errors + // and let the others through without retrying + // + // throwing in onFailedAttempt will abandon all retries & fail the request + // we only want to retry system errors, so throw a RegistryError for everything else + if (!isSystemError(error)) { + throw new RegistryError( + `Error connecting to package registry at ${url}: ${error.message}` + ); + } + }, + }); + return response; } catch (e) { throw new RegistryError(`Error connecting to package registry at ${url}: ${e.message}`); } @@ -31,3 +60,14 @@ export async function getResponseStream(url: string): Promise { return getResponseStream(url).then(streamToString); } + +// node-fetch throws a FetchError for those types of errors and +// "All errors originating from Node.js core are marked with error.type = 'system'" +// https://github.com/node-fetch/node-fetch/blob/master/docs/ERROR-HANDLING.md#error-handling-with-node-fetch +function isFetchError(error: FailedAttemptErrors): error is FetchError { + return error instanceof FetchError || error.name === 'FetchError'; +} + +function isSystemError(error: FailedAttemptErrors): boolean { + return isFetchError(error) && error.type === 'system'; +} diff --git a/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts b/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts index 97d6f7b40a588..3801303cf726f 100644 --- a/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts +++ b/x-pack/plugins/ingest_manager/server/services/epm/registry/streams.ts @@ -11,7 +11,8 @@ export function bufferToStream(buffer: Buffer): PassThrough { return stream; } -export function streamToString(stream: NodeJS.ReadableStream): Promise { +export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise { + if (stream instanceof Buffer) return Promise.resolve(stream.toString()); return new Promise((resolve, reject) => { const body: string[] = []; stream.on('data', (chunk: string) => body.push(chunk));