Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.12] Migrations v2 ignore fleet agent events (#96690) and non-persisted sessions (#96938) #96892

Merged
merged 9 commits into from
Apr 13, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -436,11 +436,34 @@ describe('ElasticIndex', () => {
size: 100,
query: {
bool: {
must_not: {
term: {
type: 'fleet-agent-events',
must_not: [
{
term: {
type: 'fleet-agent-events',
},
},
},
{
term: {
type: 'tsvb-validation-telemetry',
},
},
{
bool: {
must: [
{
match: {
type: 'search-session',
},
},
{
match: {
'search-session.persisted': false,
},
},
],
},
},
],
},
},
},
Expand Down
56 changes: 41 additions & 15 deletions src/core/server/saved_objects/migrations/core/elastic_index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,46 @@ export interface FullIndexInfo {
mappings: IndexMapping;
}

// When migrating from the outdated index we use a read query which excludes
// saved objects which are no longer used. These saved objects will still be
// kept in the outdated index for backup purposes, but won't be availble in
// the upgraded index.
export const excludeUnusedTypesQuery = {
bool: {
must_not: [
// https://github.com/elastic/kibana/issues/91869
{
term: {
type: 'fleet-agent-events',
},
},
// https://github.com/elastic/kibana/issues/95617
{
term: {
type: 'tsvb-validation-telemetry',
},
},
// https://github.com/elastic/kibana/issues/96131
{
bool: {
must: [
{
match: {
type: 'search-session',
},
},
{
match: {
'search-session.persisted': false,
},
},
],
},
},
],
},
};

/**
* A slight enhancement to indices.get, that adds indexName, and validates that the
* index mappings are somewhat what we expect.
Expand Down Expand Up @@ -67,20 +107,6 @@ export function reader(
const scroll = scrollDuration;
let scrollId: string | undefined;

// When migrating from the outdated index we use a read query which excludes
// saved objects which are no longer used. These saved objects will still be
// kept in the outdated index for backup purposes, but won't be availble in
// the upgraded index.
const excludeUnusedTypes = {
bool: {
must_not: {
term: {
type: 'fleet-agent-events', // https://github.com/elastic/kibana/issues/91869
},
},
},
};

const nextBatch = () =>
scrollId !== undefined
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
Expand All @@ -90,7 +116,7 @@ export function reader(
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
body: {
size: batchSize,
query: excludeUnusedTypes,
query: excludeUnusedTypesQuery,
},
index,
scroll,
Expand Down
1 change: 1 addition & 0 deletions src/core/server/saved_objects/migrations/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export { CallCluster } from './call_cluster';
export { LogFn, SavedObjectsMigrationLogger } from './migration_logger';
export { MigrationResult, MigrationStatus } from './migration_coordinator';
export { createMigrationEsClient, MigrationEsClient } from './migration_es_client';
export { excludeUnusedTypesQuery } from './elastic_index';
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ describe('KibanaMigrator', () => {
options.client.tasks.get.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
completed: true,
error: { type: 'elatsicsearch_exception', reason: 'task failed with an error' },
error: { type: 'elasticsearch_exception', reason: 'task failed with an error' },
failures: [],
task: { description: 'task description' },
})
Expand All @@ -331,11 +331,11 @@ describe('KibanaMigrator', () => {
migrator.prepareMigrations();
await expect(migrator.runMigrations()).rejects.toMatchInlineSnapshot(`
[Error: Unable to complete saved object migrations for the [.my-index] index. Error: Reindex failed with the following error:
{"_tag":"Some","value":{"type":"elatsicsearch_exception","reason":"task failed with an error"}}]
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
`);
expect(loggingSystemMock.collect(options.logger).error[0][0]).toMatchInlineSnapshot(`
[Error: Reindex failed with the following error:
{"_tag":"Some","value":{"type":"elatsicsearch_exception","reason":"task failed with an error"}}]
{"_tag":"Some","value":{"type":"elasticsearch_exception","reason":"task failed with an error"}}]
`);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ describe('actions', () => {
'my_source_index',
'my_target_index',
Option.none,
false
false,
Option.none
);
try {
await task();
Expand Down
12 changes: 11 additions & 1 deletion src/core/server/saved_objects/migrationsv2/actions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,12 @@ export const reindex = (
sourceIndex: string,
targetIndex: string,
reindexScript: Option.Option<string>,
requireAlias: boolean
requireAlias: boolean,
/* When reindexing we use a source query to exclude saved objects types which
* are no longer used. These saved objects will still be kept in the outdated
* index for backup purposes, but won't be available in the upgraded index.
*/
unusedTypesQuery: Option.Option<any>
): TaskEither.TaskEither<RetryableEsClientError, ReindexResponse> => () => {
return client
.reindex({
Expand All @@ -451,6 +456,11 @@ export const reindex = (
index: sourceIndex,
// Set reindex batch size
size: BATCH_SIZE,
// Exclude saved object types
query: Option.fold<any, any | undefined>(
() => undefined,
(query) => query
)(unusedTypesQuery),
},
dest: {
index: targetIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ describe('migration actions', () => {
{ _source: { title: 'doc 1' } },
{ _source: { title: 'doc 2' } },
{ _source: { title: 'doc 3' } },
{ _source: { title: 'saved object 4' } },
{ _source: { title: 'saved object 4', type: 'another_unused_type' } },
{ _source: { title: 'f-agent-event 5', type: 'f_agent_event' } },
] as unknown) as SavedObjectsRawDoc[];
await bulkOverwriteTransformedDocuments(client, 'existing_index_with_docs', sourceDocs)();

Expand Down Expand Up @@ -344,7 +345,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -365,6 +367,43 @@ describe('migration actions', () => {
"doc 2",
"doc 3",
"saved object 4",
"f-agent-event 5",
]
`);
});
it('resolves right and excludes all documents not matching the unusedTypesQuery', async () => {
const res = (await reindex(
client,
'existing_index_with_docs',
'reindex_target_excluded_docs',
Option.none,
false,
Option.of({
bool: {
must_not: ['f_agent_event', 'another_unused_type'].map((type) => ({
term: { type },
})),
},
})
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Right",
"right": "reindex_succeeded",
}
`);

const results = ((await searchForOutdatedDocuments(
client,
'reindex_target_excluded_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(results.map((doc) => doc._source.title)).toMatchInlineSnapshot(`
Array [
"doc 1",
"doc 2",
"doc 3",
]
`);
});
Expand All @@ -375,7 +414,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_2',
Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -395,6 +435,7 @@ describe('migration actions', () => {
"doc 2_updated",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
]
`);
});
Expand All @@ -406,7 +447,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_3',
Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
let task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -422,7 +464,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_3',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -444,6 +487,7 @@ describe('migration actions', () => {
"doc 2_updated",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
]
`);
});
Expand All @@ -470,7 +514,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_4',
Option.some(`ctx._source.title = ctx._source.title + '_updated'`),
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -492,6 +537,7 @@ describe('migration actions', () => {
"doc 2",
"doc 3_updated",
"saved object 4_updated",
"f-agent-event 5_updated",
]
`);
});
Expand All @@ -518,7 +564,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_5',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, reindexTaskId, '10s');

Expand Down Expand Up @@ -552,7 +599,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_6',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, reindexTaskId, '10s');

Expand All @@ -572,7 +620,8 @@ describe('migration actions', () => {
'no_such_index',
'reindex_target',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
const task = waitForReindexTask(client, res.right.taskId, '10s');
await expect(task()).resolves.toMatchInlineSnapshot(`
Expand All @@ -592,7 +641,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'existing_index_with_write_block',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;

const task = waitForReindexTask(client, res.right.taskId, '10s');
Expand All @@ -613,7 +663,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'existing_index_with_write_block',
Option.none,
true
true,
Option.none
)()) as Either.Right<ReindexResponse>;

const task = waitForReindexTask(client, res.right.taskId, '10s');
Expand All @@ -634,7 +685,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;

const task = waitForReindexTask(client, res.right.taskId, '0s');
Expand All @@ -660,7 +712,8 @@ describe('migration actions', () => {
'existing_index_with_docs',
'reindex_target_7',
Option.none,
false
false,
Option.none
)()) as Either.Right<ReindexResponse>;
await waitForReindexTask(client, res.right.taskId, '10s')();

Expand Down Expand Up @@ -715,7 +768,7 @@ describe('migration actions', () => {
'existing_index_with_docs',
undefined as any
)()) as Either.Right<SearchResponse>).right.outdatedDocuments;
expect(resultsWithoutQuery.length).toBe(4);
expect(resultsWithoutQuery.length).toBe(5);
});
it('resolves with _id, _source, _seq_no and _primary_term', async () => {
expect.assertions(1);
Expand Down
Loading