Skip to content

Commit

Permalink
[Ingest Manager] Add .catch to handle Promise rejection (#80056)
Browse files Browse the repository at this point in the history
## Summary

Prevent an unhandled rejection (which crashes Kibana) when installing a package. Initially found as a part of #79560 (comment) & [this PR](https://github.com/elastic/kibana/pull/79791/files#diff-e2fac97e334155d887ecee2a8434120f9150f65c85725c4a25f261da79233cf0R336-R340) to fix it. 

While the conditions of #79560 are a bit of an edge case, we could encounter this behavior on any expected error in this promise chain.

 * Add .catch to prevent unhandled promise rejection warning/crash.
 * pulled `installPackage` out to a separate file so it's easier to test
 * added a test for this specific error where an error in `installIndexPatterns` or `installKibanaAssets` would ultimately crash Kibana. More detail in the comments https://github.com/elastic/kibana/blob/608fc3ef1b1306fc0dd76ff07b6652c221ed242b/x-pack/plugins/ingest_manager/server/services/epm/packages/_install_package.ts#L81-L95



### Checklist
- [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
  • Loading branch information
John Schulz authored Oct 14, 2020
1 parent f6b01f5 commit 4caa5db
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 157 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SavedObjectsClientContract, LegacyScopedClusterClient } from 'src/core/server';
import { savedObjectsClientMock, elasticsearchServiceMock } from 'src/core/server/mocks';
import { appContextService } from '../../app_context';
import { createAppContextStartContractMock } from '../../../mocks';

jest.mock('../elasticsearch/template/template');
jest.mock('../kibana/assets/install');
jest.mock('../kibana/index_pattern/install');
jest.mock('./install');
jest.mock('./get');

import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
import { installKibanaAssets } from '../kibana/assets/install';
import { installIndexPatterns } from '../kibana/index_pattern/install';
import { _installPackage } from './_install_package';

const mockedUpdateCurrentWriteIndices = updateCurrentWriteIndices as jest.MockedFunction<
typeof updateCurrentWriteIndices
>;
const mockedGetKibanaAssets = installKibanaAssets as jest.MockedFunction<
typeof installKibanaAssets
>;
const mockedInstallIndexPatterns = installIndexPatterns as jest.MockedFunction<
typeof installIndexPatterns
>;

function sleep(millis: number) {
return new Promise((resolve) => setTimeout(resolve, millis));
}

describe('_installPackage', () => {
let soClient: jest.Mocked<SavedObjectsClientContract>;
let callCluster: jest.Mocked<LegacyScopedClusterClient['callAsCurrentUser']>;
beforeEach(async () => {
soClient = savedObjectsClientMock.create();
callCluster = elasticsearchServiceMock.createLegacyScopedClusterClient().callAsCurrentUser;
appContextService.start(createAppContextStartContractMock());
});
afterEach(async () => {
appContextService.stop();
});
it('handles errors from installIndexPatterns or installKibanaAssets', async () => {
// force errors from either/both these functions
mockedGetKibanaAssets.mockImplementation(async () => {
throw new Error('mocked async error A: should be caught');
});
mockedInstallIndexPatterns.mockImplementation(async () => {
throw new Error('mocked async error B: should be caught');
});

// pick any function between when those are called and when await Promise.all is defined later
// and force it to take long enough for the errors to occur
// @ts-expect-error about call signature
mockedUpdateCurrentWriteIndices.mockImplementation(async () => await sleep(1000));

const installationPromise = _installPackage({
savedObjectsClient: soClient,
callCluster,
pkgName: 'abc',
pkgVersion: '1.2.3',
paths: [],
removable: false,
internal: false,
packageInfo: {
name: 'xyz',
version: '4.5.6',
description: 'test',
type: 'x',
categories: ['this', 'that'],
format_version: 'string',
},
installType: 'install',
installSource: 'registry',
});

// if we have a .catch this will fail nicely (test pass)
// otherwise the test will fail with either of the mocked errors
await expect(installationPromise).rejects.toThrow('mocked');
await expect(installationPromise).rejects.toThrow('should be caught');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { SavedObject, SavedObjectsClientContract } from 'src/core/server';
import { InstallablePackage, InstallSource } from '../../../../common';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../constants';
import {
AssetReference,
Installation,
CallESAsCurrentUser,
ElasticsearchAssetType,
InstallType,
} from '../../../types';
import { installIndexPatterns } from '../kibana/index_pattern/install';
import { installTemplates } from '../elasticsearch/template/install';
import { generateESIndexPatterns } from '../elasticsearch/template/template';
import { installPipelines, deletePreviousPipelines } from '../elasticsearch/ingest_pipeline/';
import { installILMPolicy } from '../elasticsearch/ilm/install';
import { installKibanaAssets, getKibanaAssets } from '../kibana/assets/install';
import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
import { deleteKibanaSavedObjectsAssets } from './remove';
import { installTransform } from '../elasticsearch/transform/install';
import { createInstallation, saveKibanaAssetsRefs, updateVersion } from './install';

// this is only exported for testing
// use a leading underscore to indicate it's not the supported path
// only the more explicit `installPackage*` functions should be used

export async function _installPackage({
savedObjectsClient,
callCluster,
pkgName,
pkgVersion,
installedPkg,
paths,
removable,
internal,
packageInfo,
installType,
installSource,
}: {
savedObjectsClient: SavedObjectsClientContract;
callCluster: CallESAsCurrentUser;
pkgName: string;
pkgVersion: string;
installedPkg?: SavedObject<Installation>;
paths: string[];
removable: boolean;
internal: boolean;
packageInfo: InstallablePackage;
installType: InstallType;
installSource: InstallSource;
}): Promise<AssetReference[]> {
const toSaveESIndexPatterns = generateESIndexPatterns(packageInfo.data_streams);
// add the package installation to the saved object.
// if some installation already exists, just update install info
if (!installedPkg) {
await createInstallation({
savedObjectsClient,
pkgName,
pkgVersion,
internal,
removable,
installed_kibana: [],
installed_es: [],
toSaveESIndexPatterns,
installSource,
});
} else {
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installing',
install_started_at: new Date().toISOString(),
install_source: installSource,
});
}

// kick off `installIndexPatterns` & `installKibanaAssets` as early as possible because they're the longest running operations
// we don't `await` here because we don't want to delay starting the many other `install*` functions
// however, without an `await` or a `.catch` we haven't defined how to handle a promise rejection
// we define it many lines and potentially seconds of wall clock time later in
// `await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);`
// if we encounter an error before we there, we'll have an "unhandled rejection" which causes its own problems
// the program will log something like this _and exit/crash_
// Unhandled Promise rejection detected:
// RegistryResponseError or some other error
// Terminating process...
// server crashed with status code 1
//
// add a `.catch` to prevent the "unhandled rejection" case
// in that `.catch`, set something that indicates a failure
// check for that failure later and act accordingly (throw, ignore, return)
let installIndexPatternError;
const installIndexPatternPromise = installIndexPatterns(
savedObjectsClient,
pkgName,
pkgVersion
).catch((reason) => (installIndexPatternError = reason));
const kibanaAssets = await getKibanaAssets(paths);
if (installedPkg)
await deleteKibanaSavedObjectsAssets(
savedObjectsClient,
installedPkg.attributes.installed_kibana
);
// save new kibana refs before installing the assets
const installedKibanaAssetsRefs = await saveKibanaAssetsRefs(
savedObjectsClient,
pkgName,
kibanaAssets
);
let installKibanaAssetsError;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
kibanaAssets,
}).catch((reason) => (installKibanaAssetsError = reason));

// the rest of the installation must happen in sequential order
// currently only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per data stream and we should then save them
await installILMPolicy(paths, callCluster);

// installs versionized pipelines without removing currently installed ones
const installedPipelines = await installPipelines(
packageInfo,
paths,
callCluster,
savedObjectsClient
);
// install or update the templates referencing the newly installed pipelines
const installedTemplates = await installTemplates(
packageInfo,
callCluster,
paths,
savedObjectsClient
);

// update current backing indices of each data stream
await updateCurrentWriteIndices(callCluster, installedTemplates);

const installedTransforms = await installTransform(
packageInfo,
paths,
callCluster,
savedObjectsClient
);

// if this is an update or retrying an update, delete the previous version's pipelines
if ((installType === 'update' || installType === 'reupdate') && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.version
);
}
// pipelines from a different version may have installed during a failed update
if (installType === 'rollback' && installedPkg) {
await deletePreviousPipelines(
callCluster,
savedObjectsClient,
pkgName,
installedPkg.attributes.install_version
);
}
const installedTemplateRefs = installedTemplates.map((template) => ({
id: template.templateName,
type: ElasticsearchAssetType.indexTemplate,
}));

// make sure the assets are installed (or didn't error)
if (installIndexPatternError) throw installIndexPatternError;
if (installKibanaAssetsError) throw installKibanaAssetsError;
await Promise.all([installKibanaAssetsPromise, installIndexPatternPromise]);

// update to newly installed version when all assets are successfully installed
if (installedPkg) await updateVersion(savedObjectsClient, pkgName, pkgVersion);
await savedObjectsClient.update(PACKAGES_SAVED_OBJECT_TYPE, pkgName, {
install_version: pkgVersion,
install_status: 'installed',
});
return [
...installedKibanaAssetsRefs,
...installedPipelines,
...installedTemplateRefs,
...installedTransforms,
];
}
Loading

0 comments on commit 4caa5db

Please sign in to comment.