Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry authentication and other connection failures in Saved Object migrations #51324

Merged
merged 5 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions src/core/server/elasticsearch/retry_call_cluster.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
*/
import * as legacyElasticsearch from 'elasticsearch';

import { retryCallCluster } from './retry_call_cluster';
import { retryCallCluster, migrationsRetryCallCluster } from './retry_call_cluster';
import { loggingServiceMock } from '../logging/logging_service.mock';

describe('retryCallCluster', () => {
it('retries ES API calls that rejects with NoConnection errors', () => {
it('retries ES API calls that rejects with NoConnections', () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
const ErrorConstructor = legacyElasticsearch.errors.NoConnections;
callEsApi.mockImplementation(() => {
return i++ <= 2
? Promise.reject(new legacyElasticsearch.errors.NoConnections())
: Promise.resolve('success');
return i++ <= 2 ? Promise.reject(new ErrorConstructor()) : Promise.resolve('success');
});
const retried = retryCallCluster(callEsApi);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
Expand Down Expand Up @@ -57,3 +57,77 @@ describe('retryCallCluster', () => {
return expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
});
});

describe('migrationsRetryCallCluster', () => {
const errors = [
'NoConnections',
'ConnectionFault',
'ServiceUnavailable',
'RequestTimeout',
'AuthenticationException',
'AuthorizationException',
];

const mockLogger = loggingServiceMock.create();

beforeEach(() => {
loggingServiceMock.clear(mockLogger);
});

errors.forEach(errorName => {
it('retries ES API calls that rejects with ' + errorName, () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
const ErrorConstructor = (legacyElasticsearch.errors as any)[errorName];
callEsApi.mockImplementation(() => {
return i++ <= 2 ? Promise.reject(new ErrorConstructor()) : Promise.resolve('success');
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
});
});

it('rejects when ES API calls reject with other errors', async () => {
expect.assertions(3);
const callEsApi = jest.fn();
let i = 0;
callEsApi.mockImplementation(() => {
i++;

return i === 1
? Promise.reject(new Error('unknown error'))
: i === 2
? Promise.resolve('success')
: i === 3 || i === 4
? Promise.reject(new legacyElasticsearch.errors.NoConnections())
: i === 5
? Promise.reject(new Error('unknown error'))
: null;
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
await expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
await expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
return expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
});

it('logs only once for each unique error message', async () => {
const callEsApi = jest.fn();
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.NoConnections());
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.NoConnections());
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.AuthenticationException());
callEsApi.mockResolvedValueOnce('done');
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
await retried('endpoint');
expect(loggingServiceMock.collect(mockLogger).warn).toMatchInlineSnapshot(`
Array [
Array [
"Unable to connect to Elasticsearch. Error: No Living connections",
],
Array [
"Unable to connect to Elasticsearch. Error: Authentication Exception",
],
]
`);
});
});
56 changes: 56 additions & 0 deletions src/core/server/elasticsearch/retry_call_cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,62 @@ import { defer, throwError, iif, timer } from 'rxjs';
import * as legacyElasticsearch from 'elasticsearch';

import { CallAPIOptions } from '.';
import { Logger } from '../logging';

const esErrors = legacyElasticsearch.errors;

/**
* Retries the provided Elasticsearch API call when an error such as
* `AuthenticationException` `NoConnections`, `ConnectionFault`,
* `ServiceUnavailable` or `RequestTimeout` are encountered. The API call will
* be retried once a second, indefinitely, until a successful response or a
* different error is received.
*
* @param apiCaller
*/

// TODO: Replace with APICaller from './scoped_cluster_client' once #46668 is merged
export function migrationsRetryCallCluster(
apiCaller: (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: use APICaller since it exists now

endpoint: string,
clientParams: Record<string, any>,
options?: CallAPIOptions
) => Promise<any>,
log: Logger,
delay: number = 2500
) {
const previousErrors: string[] = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: use Set instead of array

return (endpoint: string, clientParams: Record<string, any> = {}, options?: CallAPIOptions) => {
return defer(() => apiCaller(endpoint, clientParams, options))
.pipe(
retryWhen(error$ =>
error$.pipe(
concatMap((error, i) => {
if (!previousErrors.includes(error.message)) {
log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`);
previousErrors.push(error.message);
}
return iif(
() => {
return (
error instanceof esErrors.NoConnections ||
error instanceof esErrors.ConnectionFault ||
error instanceof esErrors.ServiceUnavailable ||
error instanceof esErrors.RequestTimeout ||
error instanceof esErrors.AuthenticationException ||
error instanceof esErrors.AuthorizationException
);
},
timer(delay),
throwError(error)
);
})
)
)
)
.toPromise();
};
}

/**
* Retries the provided Elasticsearch API call when a `NoConnections` error is
Expand Down
7 changes: 5 additions & 2 deletions src/core/server/saved_objects/saved_objects_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { CoreContext } from '../core_context';
import { LegacyServiceSetup } from '../legacy/legacy_service';
import { ElasticsearchServiceSetup } from '../elasticsearch';
import { KibanaConfigType } from '../kibana_config';
import { retryCallCluster } from '../elasticsearch/retry_call_cluster';
import { retryCallCluster, migrationsRetryCallCluster } from '../elasticsearch/retry_call_cluster';
import { SavedObjectsConfigType } from './saved_objects_config';
import { KibanaRequest } from '../http';
import { Logger } from '..';
Expand Down Expand Up @@ -105,7 +105,10 @@ export class SavedObjectsService
config: coreSetup.legacy.pluginExtendedConfig,
savedObjectsConfig,
kibanaConfig,
callCluster: retryCallCluster(adminClient.callAsInternalUser),
callCluster: migrationsRetryCallCluster(
adminClient.callAsInternalUser,
this.coreContext.logger.get('migrations')
),
}));

const mappings = this.migrator.getActiveMappings();
Expand Down