Rewriting SO id during migration (#97222)
* some typos

* implement an alternative client-side migration algorithm

required to enforce idempotent id generation for SO

* update tests

* lol

* remove unnecessary param from request generic

* remove unused parameter

* optimize search when querying SO for migration

* fix wrong type in fixtures

* try shard_doc asc

* add an integration test

* cleanup

* track_total_hits: false to improve perf

* add happy path test for transformDocs action

* remove unused types

* fix wrong typing

* add cleanup phase

* add an integration test for cleanup phase

* add unit-tests for cleanup function

* address comments

* Fix functional test

* set defaultIndex before each test. otherwise it is deleted in the first test file during cleanup phase

* sourceIndex: Option.some<> for consistency

* Revert "set defaultIndex before each test. otherwise it is deleted in the first test file during cleanup phase"

This reverts commit a128d7b.

* address comments from Pierre

* fix test

* Revert "fix test"

This reverts commit 97315b6.

* revert min convert version back to 8.0

Co-authored-by: Matthias Wilhelm <matthias.wilhelm@elastic.co>
mshustov and kertal authored Apr 26, 2021
1 parent 52a650d commit e6ba8cc
Showing 29 changed files with 1,209 additions and 311 deletions.
@@ -850,7 +850,8 @@ function assertNoDowngrades(
* that we can later regenerate any inbound object references to match.
*
* @note This is only intended to be used when single-namespace object types are converted into multi-namespace object types.
* @internal
*/
function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
export function deterministicallyRegenerateObjectId(namespace: string, type: string, id: string) {
return uuidv5(`${namespace}:${type}:${id}`, uuidv5.DNS); // the uuidv5 namespace constant (uuidv5.DNS) is arbitrary
}
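This exported helper is the heart of the "idempotent id generation" the commit message mentions: UUID v5 is a pure hash of its inputs, so re-running a conversion over the same `namespace:type:id` triple always yields the same id. A minimal sketch of that property, assuming the standard `uuid` package (seed values are illustrative):

```ts
import { v5 as uuidv5 } from 'uuid';

// UUID v5 deterministically hashes the seed string, so an interrupted
// migration can be retried without minting duplicate object ids.
const seed = 'space-a:dashboard:my-dashboard-id'; // `${namespace}:${type}:${id}`
const first = uuidv5(seed, uuidv5.DNS);
const second = uuidv5(seed, uuidv5.DNS);

console.log(first === second); // true
```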
35 changes: 15 additions & 20 deletions src/core/server/saved_objects/migrations/core/elastic_index.ts
@@ -14,7 +14,6 @@
import _ from 'lodash';
import { estypes } from '@elastic/elasticsearch';
import { MigrationEsClient } from './migration_es_client';
import { CountResponse, SearchResponse } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsMigrationVersion } from '../../types';
import { AliasAction, RawDoc } from './call_cluster';
@@ -95,11 +94,11 @@ export async function fetchInfo(client: MigrationEsClient, index: string): Promi
* Creates a reader function that serves up batches of documents from the index. We aren't using
* an async generator, as that feature currently breaks Kibana's tooling.
*
* @param {CallCluster} callCluster - The elastic search connection
* @param {string} - The index to be read from
* @param client - The elastic search connection
* @param index - The index to be read from
* @param {opts}
* @prop {number} batchSize - The number of documents to read at a time
* @prop {string} scrollDuration - The scroll duration used for scrolling through the index
* @prop batchSize - The number of documents to read at a time
* @prop scrollDuration - The scroll duration used for scrolling through the index
*/
export function reader(
client: MigrationEsClient,
@@ -111,11 +110,11 @@

const nextBatch = () =>
scrollId !== undefined
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
? client.scroll<SavedObjectsRawDocSource>({
scroll,
scroll_id: scrollId,
})
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
: client.search<SavedObjectsRawDocSource>({
body: {
size: batchSize,
query: excludeUnusedTypesQuery,
@@ -143,10 +142,6 @@
/**
* Writes the specified documents to the index, throws an exception
* if any of the documents fail to save.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {RawDoc[]} docs
*/
export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) {
const { body } = await client.bulk({
@@ -184,9 +179,9 @@ export async function write(client: MigrationEsClient, index: string, docs: RawD
* it performs the check *each* time it is called, rather than memoizing itself,
* as this is used to determine if migrations are complete.
*
* @param {CallCluster} callCluster
* @param {string} index
* @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations
* @param client - The connection to ElasticSearch
* @param index
* @param migrationVersion - The latest versions of the migrations
*/
export async function migrationsUpToDate(
client: MigrationEsClient,
@@ -207,7 +202,7 @@
return true;
}

const { body } = await client.count<CountResponse>({
const { body } = await client.count({
body: {
query: {
bool: {
@@ -271,9 +266,9 @@ export async function createIndex(
* is a concrete index. This function will reindex `alias` into a new index, delete the `alias`
* index, and then create an alias `alias` that points to the new index.
*
* @param {CallCluster} callCluster - The connection to ElasticSearch
* @param {FullIndexInfo} info - Information about the mappings and name of the new index
* @param {string} alias - The name of the index being converted to an alias
* @param client - The ElasticSearch connection
* @param info - Information about the mappings and name of the new index
* @param alias - The name of the index being converted to an alias
*/
export async function convertToAlias(
client: MigrationEsClient,
@@ -297,7 +292,7 @@
* alias, meaning that it will only point to one index at a time, so we
* remove any other indices from the alias.
*
* @param {CallCluster} callCluster
* @param {CallCluster} client
* @param {string} index
* @param {string} alias
* @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call
@@ -377,7 +372,7 @@ async function reindex(
) {
// We poll instead of having the request wait for completion, as for large indices,
// the request times out on the Elasticsearch side of things. We have a relatively tight
// polling interval, as the request is fairly efficent, and we don't
// polling interval, as the request is fairly efficient, and we don't
// want to block index migrations for too long on this.
const pollInterval = 250;
const { body: reindexBody } = await client.reindex({
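The comment in `reindex` explains why the migration polls instead of holding one long request open. A hedged sketch of that pattern against a bare `@elastic/elasticsearch` 7.x client (function and option shapes are illustrative, not the exact Kibana code):

```ts
import { Client } from '@elastic/elasticsearch';

// Start the reindex as a background task, then poll the Tasks API until it
// completes; a single synchronous request would time out on large indices.
async function reindexWithPolling(client: Client, source: string, dest: string) {
  const pollInterval = 250; // tight interval: each status check is cheap
  const { body } = await client.reindex({
    refresh: true,
    wait_for_completion: false, // return a task id immediately
    body: { source: { index: source }, dest: { index: dest } },
  });
  const taskId = String(body.task);
  for (;;) {
    const { body: task } = await client.tasks.get({ task_id: taskId });
    if (task.completed) return;
    await new Promise((resolve) => setTimeout(resolve, pollInterval));
  }
}
```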
@@ -189,8 +189,7 @@ async function migrateSourceToDest(context: Context) {
serializer,
documentMigrator.migrateAndConvert,
// @ts-expect-error @elastic/elasticsearch `Hit._id` may be a string | number in ES, but we always expect strings in the SO index.
docs,
log
docs
)
);
}
@@ -11,7 +11,6 @@ import _ from 'lodash';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsSerializer } from '../../serialization';
import { migrateRawDocs } from './migrate_raw_docs';
import { createSavedObjectsMigrationLoggerMock } from '../../migrations/mocks';

describe('migrateRawDocs', () => {
test('converts raw docs to saved objects', async () => {
@@ -24,8 +23,7 @@
[
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
createSavedObjectsMigrationLoggerMock()
]
);

expect(result).toEqual([
@@ -59,7 +57,6 @@ });
});

test('throws when encountering a corrupt saved object document', async () => {
const logger = createSavedObjectsMigrationLoggerMock();
const transform = jest.fn<any, any>((doc: any) => [
set(_.cloneDeep(doc), 'attributes.name', 'TADA'),
]);
@@ -69,8 +66,7 @@
[
{ _id: 'foo:b', _source: { type: 'a', a: { name: 'AAA' } } },
{ _id: 'c:d', _source: { type: 'c', c: { name: 'DDD' } } },
],
logger
]
);

expect(result).rejects.toMatchInlineSnapshot(
@@ -88,8 +84,7 @@
const result = await migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }]
);

expect(result).toEqual([
@@ -119,12 +114,9 @@
throw new Error('error during transform');
});
await expect(
migrateRawDocs(
new SavedObjectsSerializer(new SavedObjectTypeRegistry()),
transform,
[{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } }],
createSavedObjectsMigrationLoggerMock()
)
migrateRawDocs(new SavedObjectsSerializer(new SavedObjectTypeRegistry()), transform, [
{ _id: 'a:b', _source: { type: 'a', a: { name: 'AAA' } } },
])
).rejects.toThrowErrorMatchingInlineSnapshot(`"error during transform"`);
});
});
@@ -16,7 +16,6 @@ import {
SavedObjectUnsanitizedDoc,
} from '../../serialization';
import { MigrateAndConvertFn } from './document_migrator';
import { SavedObjectsMigrationLogger } from '.';

/**
* Error thrown when saved object migrations encounter a corrupt saved object.
@@ -46,8 +45,7 @@ export class CorruptSavedObjectError extends Error {
export async function migrateRawDocs(
serializer: SavedObjectsSerializer,
migrateDoc: MigrateAndConvertFn,
rawDocs: SavedObjectsRawDoc[],
log: SavedObjectsMigrationLogger
rawDocs: SavedObjectsRawDoc[]
): Promise<SavedObjectsRawDoc[]> {
const migrateDocWithoutBlocking = transformNonBlocking(migrateDoc);
const processedDocs = [];
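`migrateRawDocs` pushes every document through `transformNonBlocking` before writing. A sketch of that wrapper, assuming it simply defers each transform with `setImmediate` so a long migration queue yields to other work on the Node.js event loop:

```ts
// Wrap a synchronous transform so each call runs on a later event-loop turn;
// errors are surfaced through the returned promise instead of thrown inline.
function transformNonBlocking<T, R>(transform: (doc: T) => R) {
  return (doc: T): Promise<R> =>
    new Promise((resolve, reject) => {
      setImmediate(() => {
        try {
          resolve(transform(doc));
        } catch (e) {
          reject(e);
        }
      });
    });
}
```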
Original file line number Diff line number Diff line change
@@ -229,48 +229,6 @@ describe('KibanaMigrator', () => {
jest.clearAllMocks();
});

it('creates a V2 migrator that initializes a new index and migrates an existing index', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise();
migrator.prepareMigrations();
await migrator.runMigrations();

// Basic assertions that we're creating and reindexing the expected indices
expect(options.client.indices.create).toHaveBeenCalledTimes(3);
expect(options.client.indices.create.mock.calls).toEqual(
expect.arrayContaining([
// LEGACY_CREATE_REINDEX_TARGET
expect.arrayContaining([expect.objectContaining({ index: '.my-index_pre8.2.3_001' })]),
// CREATE_REINDEX_TEMP
expect.arrayContaining([
expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
]),
// CREATE_NEW_TARGET
expect.arrayContaining([expect.objectContaining({ index: 'other-index_8.2.3_001' })]),
])
);
// LEGACY_REINDEX
expect(options.client.reindex.mock.calls[0][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index' }),
dest: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
}),
})
);
// REINDEX_SOURCE_TO_TEMP
expect(options.client.reindex.mock.calls[1][0]).toEqual(
expect.objectContaining({
body: expect.objectContaining({
source: expect.objectContaining({ index: '.my-index_pre8.2.3_001' }),
dest: expect.objectContaining({ index: '.my-index_8.2.3_reindex_temp' }),
}),
})
);
const { status } = await migratorStatus;
return expect(status).toEqual('completed');
});
it('emits results on getMigratorResult$()', async () => {
const options = mockV2MigrationOptions();
const migrator = new KibanaMigrator(options);
@@ -378,6 +336,24 @@ const mockV2MigrationOptions = () => {
} as estypes.GetTaskResponse)
);

options.client.search = jest
.fn()
.mockImplementation(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ hits: { hits: [] } })
);

options.client.openPointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ id: 'pit_id' })
);

options.client.closePointInTime = jest
.fn()
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ succeeded: true })
);

return options;
};

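The new `openPointInTime`/`closePointInTime` mocks reflect the move to point-in-time reads, and the commit messages "try shard_doc asc" and "track_total_hits: false to improve perf" describe the same search shape. A hedged sketch of such a PIT read loop on a 7.x client (names are illustrative; the real `readWithPit` action lives in `migrationsv2/actions`):

```ts
import { Client } from '@elastic/elasticsearch';

// Page through a frozen snapshot of the index: sort on the cheap _shard_doc
// tiebreaker, continue with search_after, and skip total-hit counting.
async function readAllWithPit(client: Client, index: string, batchSize = 1000) {
  const { body: pit } = await client.openPointInTime({ index, keep_alive: '10m' });
  const docs: unknown[] = [];
  let searchAfter: unknown[] | undefined;
  try {
    for (;;) {
      const { body } = await client.search({
        body: {
          pit: { id: pit.id, keep_alive: '10m' },
          size: batchSize,
          sort: [{ _shard_doc: 'asc' }], // cheapest stable order for PIT paging
          search_after: searchAfter,
          track_total_hits: false, // counting totals would only cost time
        },
      });
      const hits = body.hits.hits;
      if (hits.length === 0) return docs;
      docs.push(...hits);
      searchAfter = hits[hits.length - 1].sort;
    }
  } finally {
    await client.closePointInTime({ body: { id: pit.id } });
  }
}
```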
@@ -36,7 +36,6 @@ import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
import { runResilientMigrator } from '../../migrationsv2';
import { migrateRawDocs } from '../core/migrate_raw_docs';
import { MigrationLogger } from '../core/migration_logger';

export interface KibanaMigratorOptions {
client: ElasticsearchClient;
@@ -185,12 +184,7 @@ export class KibanaMigrator {
logger: this.log,
preMigrationScript: indexMap[index].script,
transformRawDocs: (rawDocs: SavedObjectsRawDoc[]) =>
migrateRawDocs(
this.serializer,
this.documentMigrator.migrateAndConvert,
rawDocs,
new MigrationLogger(this.log)
),
migrateRawDocs(this.serializer, this.documentMigrator.migrateAndConvert, rawDocs),
migrationVersionPerType: this.documentMigrator.migrationVersion,
indexPrefix: index,
migrationsConfig: this.soMigrationsConfig,
50 changes: 49 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.test.ts
@@ -78,6 +78,54 @@ describe('actions', () => {
});
});

describe('openPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.openPit(client, 'my_index');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('readWithPit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.readWithPit(client, 'pitId', Option.none, 10_000);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('closePit', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.closePit(client, 'pitId');
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('transformDocs', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.transformDocs(client, () => Promise.resolve([]), [], 'my_index', false);
try {
await task();
} catch (e) {
/** ignore */
}
expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});
});

describe('reindex', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.reindex(
@@ -205,7 +253,7 @@

describe('bulkOverwriteTransformedDocuments', () => {
it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', []);
const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for');
try {
await task();
} catch (e) {
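The updated call passes `'wait_for'` as a new refresh argument to `bulkOverwriteTransformedDocuments`. A sketch of the underlying request shape on a bare 7.x client, assuming the action boils down to a bulk `index` (overwrite) with a configurable refresh; the real action layers retryable-error handling on top:

```ts
import { Client } from '@elastic/elasticsearch';

// Overwrite transformed docs in bulk. refresh: 'wait_for' holds the response
// until the writes are searchable, which later migration steps depend on.
async function bulkOverwrite(
  client: Client,
  index: string,
  docs: Array<{ _id: string; _source: Record<string, unknown> }>,
  refresh: boolean | 'wait_for' = 'wait_for'
) {
  const { body } = await client.bulk({
    refresh,
    // `index` (not `create`) so retried batches overwrite instead of conflicting
    body: docs.flatMap((doc) => [{ index: { _index: index, _id: doc._id } }, doc._source]),
  });
  if (body.errors) {
    throw new Error('failed to write some documents');
  }
}
```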