[esArchiver] Automatically cleanup SO indices when SO documents are found in data.json (elastic#159582)

The ultimate goal of this PR is to lay the groundwork for removing the
"dynamic" `mappings.json`, which probably should never have existed.

With this PR, detecting SO documents in the `data.json` automatically
triggers a cleanup of the SO indices.
This, in turn, removes the need to define "dynamic" saved object
indices (i.e. those with the `$KIBANA_PACKAGE_VERSION` variable in the
`mappings.json`).

IIUC the idea behind the dynamic indices was to have SO indices that are
aligned with the current stack version, avoiding the extra overhead of
having to migrate the inserted documents, and reducing overall test
times.
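
For reference, a "dynamic" index definition in an esArchiver `mappings.json` looks roughly like the record sketched below (written here as a TypeScript literal; apart from the `$KIBANA_PACKAGE_VERSION` placeholder, the field values are illustrative assumptions rather than an excerpt from an actual archive):

```ts
// Abbreviated, illustrative sketch of a "dynamic" index record as it would
// appear in an esArchiver mappings.json. The $KIBANA_PACKAGE_VERSION
// placeholder is resolved to the current stack version when the archive is
// loaded; mappings and settings are elided.
const dynamicSavedObjectIndexRecord = {
  type: 'index',
  value: {
    index: '.kibana_$KIBANA_PACKAGE_VERSION_001',
    aliases: {
      '.kibana': {},
      '.kibana_$KIBANA_PACKAGE_VERSION': {},
    },
    mappings: { dynamic: 'strict', properties: {} },
  },
};
```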

Nonetheless, what is happening today is:
1. FTR starts ES and Kibana.
2. Kibana creates current version SO indices at startup (empty ones).
3. `esArchiver.load()` processes the `mappings.json`.
3.1. It detects that we are defining SO indices and **deletes** existing
saved object indices.
3.2. It then re-creates these indices according to the definitions in
`mappings.json`.
4. `esArchiver.load()` processes the `data.json`. Specifically, it
inserts SO documents present in `data.json`.
5. `esArchiver.load()` calls the _KibanaMigrator_ to make sure that the
inserted documents are up-to-date, hoping they are already aligned with
the current stack version (which is not always the case, not even with
"dynamic" mappings).

Two interesting things to note:
- Steps 3 to 5 happen whilst Kibana is already started and running. If
Kibana queries SO indices during `esArchiver.load()`, and a request to
ES is made **right after** 3.2, the result might be
elastic#158918.
- Having dynamic SO index definitions, deleting the "official"
indices created by Kibana (3.1), and recreating them hoping to be
aligned with the current stack version (3.2) makes no sense. We could use
the existing SO indices instead, and simply clean them up whenever we are
about to insert SO documents.

Performing that cleanup is precisely the goal of this PR.
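
Conceptually, that cleanup amounts to a delete-by-query against the existing SO indices rather than deleting and re-creating them. The sketch below illustrates the idea under stated assumptions; it is not the actual implementation (the real logic lives in `cleanSavedObjectIndices()` in `kibana_index.ts`, and its bool query is elided here):

```ts
import type { Client } from '@elastic/elasticsearch';

// Minimal sketch of the cleanup idea: remove SO documents from the existing
// saved object indices instead of deleting and re-creating the indices.
async function cleanSavedObjectIndicesSketch(
  client: Client,
  index: string | string[] = '.kibana*' // assumption: defaults to all SO indices
): Promise<void> {
  while (true) {
    const resp = await client.deleteByQuery({
      index,
      body: {
        query: { match_all: {} }, // placeholder; the real bool query is elided
      },
      conflicts: 'proceed', // Kibana may be writing to these indices concurrently
      refresh: true,
    });
    // Assumption about the loop's intent: retry until a pass no longer
    // reports version conflicts.
    if (!resp.version_conflicts) {
      break;
    }
  }
}
```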
Then, in subsequent PRs like
https://github.com/elastic/kibana/pull/159397/files, which tackle the
flaky tests, we'll be able to simply remove the "dynamic" `mappings.json`
definitions, causing `esArchiver` to rely on the SO indices created by
Kibana.

Thanks to this PR, FTR tests won't need to explicitly clean up saved
object indices in their `before` hooks.
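
For illustration, a hypothetical FTR suite relying on this behaviour could look like the sketch below (the archive path, suite name, and `FtrProviderContext` import path are made up for the example):

```ts
// Hypothetical FTR test file; only esArchiver.load()/unload() are taken
// from the real FTR esArchiver service.
import type { FtrProviderContext } from '../ftr_provider_context';

export default function ({ getService }: FtrProviderContext) {
  const esArchiver = getService('esArchiver');

  describe('some saved objects suite', () => {
    before(async () => {
      // No explicit cleanup of the .kibana* indices is needed here anymore:
      // esArchiver.load() now cleans the SO indices itself when it finds
      // SO documents in the archive's data.json.
      await esArchiver.load('x-pack/test/functional/es_archives/some_archive');
    });

    after(async () => {
      await esArchiver.unload('x-pack/test/functional/es_archives/some_archive');
    });

    it('works with the archived saved objects', async () => {
      // assertions against the loaded data would go here
    });
  });
}
```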
gsoldevila authored Jun 19, 2023
1 parent 69f35b3 commit bbb5fc4
Showing 6 changed files with 133 additions and 27 deletions.
@@ -6,12 +6,17 @@
* Side Public License, v 1.
*/

import type { deleteSavedObjectIndices } from './kibana_index';
import type { cleanSavedObjectIndices, deleteSavedObjectIndices } from './kibana_index';

export const mockdeleteSavedObjectIndices = jest.fn() as jest.MockedFunction<
export const mockCleanSavedObjectIndices = jest.fn() as jest.MockedFunction<
typeof cleanSavedObjectIndices
>;

export const mockDeleteSavedObjectIndices = jest.fn() as jest.MockedFunction<
typeof deleteSavedObjectIndices
>;

jest.mock('./kibana_index', () => ({
deleteSavedObjectIndices: mockdeleteSavedObjectIndices,
cleanSavedObjectIndices: mockCleanSavedObjectIndices,
deleteSavedObjectIndices: mockDeleteSavedObjectIndices,
}));
@@ -6,7 +6,10 @@
* Side Public License, v 1.
*/

import { mockdeleteSavedObjectIndices } from './create_index_stream.test.mock';
import {
mockCleanSavedObjectIndices,
mockDeleteSavedObjectIndices,
} from './create_index_stream.test.mock';

import sinon from 'sinon';
import Chance from 'chance';
@@ -28,7 +31,8 @@ const chance = new Chance();
const log = createStubLogger();

beforeEach(() => {
mockdeleteSavedObjectIndices.mockClear();
mockCleanSavedObjectIndices.mockClear();
mockDeleteSavedObjectIndices.mockClear();
});

describe('esArchiver: createCreateIndexStream()', () => {
@@ -199,25 +203,25 @@ describe('esArchiver: createCreateIndexStream()', () => {
it('does not delete Kibana indices for indexes that do not start with .kibana', async () => {
await doTest('.foo');

expect(mockdeleteSavedObjectIndices).not.toHaveBeenCalled();
expect(mockDeleteSavedObjectIndices).not.toHaveBeenCalled();
});

it('deletes Kibana indices at most once for indices that start with .kibana', async () => {
// If we are loading the main Kibana index, we should delete all Kibana indices for backwards compatibility reasons.
await doTest('.kibana_7.16.0_001', '.kibana_task_manager_7.16.0_001');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockdeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ onlyTaskManager: true })
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ index: '.kibana_task_manager_7.16.0_001' })
);
});

it('deletes Kibana task manager index at most once, using onlyTaskManager: true', async () => {
it('deletes Kibana task manager index at most once', async () => {
// If we are loading the Kibana task manager index, we should only delete that index, not any other Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_task_manager_7.16.0_002');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockdeleteSavedObjectIndices).toHaveBeenCalledWith(
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledWith(
expect.objectContaining({ onlyTaskManager: true })
);
});
@@ -227,18 +231,63 @@ describe('esArchiver: createCreateIndexStream()', () => {
// So, we first delete only the Kibana task manager indices, then we wind up deleting all Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_7.16.0_001');

expect(mockdeleteSavedObjectIndices).toHaveBeenCalledTimes(2);
expect(mockdeleteSavedObjectIndices).toHaveBeenNthCalledWith(
expect(mockDeleteSavedObjectIndices).toHaveBeenCalledTimes(2);
expect(mockDeleteSavedObjectIndices).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ onlyTaskManager: true })
);
expect(mockdeleteSavedObjectIndices).toHaveBeenNthCalledWith(
expect(mockDeleteSavedObjectIndices).toHaveBeenNthCalledWith(
2,
expect.not.objectContaining({ onlyTaskManager: true })
expect.not.objectContaining({ index: expect.any(String) })
);
});
});

describe('saved object cleanup', () => {
describe('when saved object documents are found', () => {
it('cleans the corresponding saved object indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.kibana_task_manager', 1),
createStubDocRecord('.kibana_alerting_cases', 2),
createStubDocRecord('.kibana', 3),
]),
createCreateIndexStream({ client, stats, log }),
]);

expect(mockCleanSavedObjectIndices).toHaveBeenCalledTimes(2);

expect(mockCleanSavedObjectIndices).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ index: '.kibana_task_manager' })
);
expect(mockCleanSavedObjectIndices).toHaveBeenNthCalledWith(
2,
expect.not.objectContaining({ index: expect.any(String) })
);
});
});

describe('when saved object documents are not found', () => {
it('does not clean any indices', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubDocRecord('.foo', 1),
createStubDocRecord('.bar', 2),
createStubDocRecord('.baz', 3),
]),
createCreateIndexStream({ client, stats, log }),
]);

expect(mockCleanSavedObjectIndices).not.toHaveBeenCalled();
});
});
});

describe('docsOnly = true', () => {
it('passes through "hit" records without attempting to create indices', async () => {
const client = createStubClient();
40 changes: 34 additions & 6 deletions packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts
@@ -19,7 +19,7 @@ import {
TASK_MANAGER_SAVED_OBJECT_INDEX,
} from '@kbn/core-saved-objects-server';
import { Stats } from '../stats';
import { deleteSavedObjectIndices } from './kibana_index';
import { cleanSavedObjectIndices, deleteSavedObjectIndices } from './kibana_index';
import { deleteIndex } from './delete_index';
import { deleteDataStream } from './delete_data_stream';
import { ES_CLIENT_HEADERS } from '../../client_headers';
@@ -50,14 +50,36 @@ export function createCreateIndexStream({
// If we're trying to import Kibana index docs, we need to ensure that
// previous indices are removed so we're starting w/ a clean slate for
// migrations. This only needs to be done once per archive load operation.
let kibanaIndexAlreadyDeleted = false;
let kibanaIndicesAlreadyDeleted = false;
let kibanaTaskManagerIndexAlreadyDeleted = false;

// if we detect saved object documents defined in the data.json, we will cleanup their indices
let kibanaIndicesAlreadyCleaned = false;
let kibanaTaskManagerIndexAlreadyCleaned = false;

async function handleDoc(stream: Readable, record: DocRecord) {
if (skipDocsFromIndices.has(record.value.index)) {
const index = record.value.index;

if (skipDocsFromIndices.has(index)) {
return;
}

if (!skipExisting) {
if (index?.startsWith(TASK_MANAGER_SAVED_OBJECT_INDEX)) {
if (!kibanaTaskManagerIndexAlreadyDeleted && !kibanaTaskManagerIndexAlreadyCleaned) {
await cleanSavedObjectIndices({ client, stats, log, index });
kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned saved object index [${index}]`);
}
} else if (index?.startsWith(MAIN_SAVED_OBJECT_INDEX)) {
if (!kibanaIndicesAlreadyDeleted && !kibanaIndicesAlreadyCleaned) {
await cleanSavedObjectIndices({ client, stats, log });
kibanaIndicesAlreadyCleaned = kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned all saved object indices`);
}
}
}

stream.push(record);
}

@@ -109,12 +131,14 @@ export function createCreateIndexStream({

async function attemptToCreate(attemptNumber = 1) {
try {
if (isKibana && !kibanaIndexAlreadyDeleted) {
if (isKibana && !kibanaIndicesAlreadyDeleted) {
await deleteSavedObjectIndices({ client, stats, log }); // delete all .kibana* indices
kibanaIndexAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
kibanaIndicesAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
log.debug(`Deleted all saved object indices`);
} else if (isKibanaTaskManager && !kibanaTaskManagerIndexAlreadyDeleted) {
await deleteSavedObjectIndices({ client, stats, onlyTaskManager: true, log }); // delete only .kibana_task_manager* indices
kibanaTaskManagerIndexAlreadyDeleted = true;
log.debug(`Deleted saved object index [${index}]`);
}

await client.indices.create(
@@ -137,7 +161,11 @@
err?.body?.error?.reason?.includes('index exists with the same name as the alias') &&
attemptNumber < 3
) {
kibanaIndexAlreadyDeleted = false;
kibanaTaskManagerIndexAlreadyDeleted = false;
if (isKibana) {
kibanaIndicesAlreadyDeleted = false;
}

const aliasStr = inspect(aliases);
log.info(
`failed to create aliases [${aliasStr}] because ES indicated an index/alias already exists, trying again`
@@ -6,6 +6,8 @@
* Side Public License, v 1.
*/

import { mockCleanSavedObjectIndices } from './create_index_stream.test.mock';

import sinon from 'sinon';

import { createListStream, createPromiseFromStreams } from '@kbn/utils';
@@ -22,6 +24,10 @@ import {

const log = createStubLogger();

beforeEach(() => {
mockCleanSavedObjectIndices.mockClear();
});

describe('esArchiver: createDeleteIndexStream()', () => {
it('deletes the index without checking if it exists', async () => {
const stats = createStubStats();
23 changes: 20 additions & 3 deletions packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts
@@ -10,13 +10,20 @@ import { Transform } from 'stream';
import type { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/tooling-log';

import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import {
MAIN_SAVED_OBJECT_INDEX,
TASK_MANAGER_SAVED_OBJECT_INDEX,
} from '@kbn/core-saved-objects-server';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
import { cleanSavedObjectIndices } from './kibana_index';
import { deleteDataStream } from './delete_data_stream';

export function createDeleteIndexStream(client: Client, stats: Stats, log: ToolingLog) {
// if we detect saved object documents defined in the data.json, we will cleanup their indices
let kibanaIndicesAlreadyCleaned = false;
let kibanaTaskManagerIndexAlreadyCleaned = false;

return new Transform({
readableObjectMode: true,
writableObjectMode: true,
@@ -29,8 +36,18 @@
if (record.type === 'index') {
const { index } = record.value;

if (index.startsWith(MAIN_SAVED_OBJECT_INDEX)) {
await cleanSavedObjectIndices({ client, stats, log });
if (index.startsWith(TASK_MANAGER_SAVED_OBJECT_INDEX)) {
if (!kibanaTaskManagerIndexAlreadyCleaned) {
await cleanSavedObjectIndices({ client, stats, index, log });
kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned saved object index [${index}]`);
}
} else if (index.startsWith(MAIN_SAVED_OBJECT_INDEX)) {
if (!kibanaIndicesAlreadyCleaned) {
await cleanSavedObjectIndices({ client, stats, log });
kibanaIndicesAlreadyCleaned = kibanaTaskManagerIndexAlreadyCleaned = true;
log.debug(`Cleaned all saved object indices`);
}
} else {
await deleteIndex({ client, stats, log, index });
}
5 changes: 3 additions & 2 deletions packages/kbn-es-archiver/src/lib/indices/kibana_index.ts
@@ -50,7 +50,6 @@ export async function deleteSavedObjectIndices({
headers: ES_CLIENT_HEADERS,
}
);

await deleteIndex({
client,
stats,
@@ -111,15 +110,17 @@ export async function cleanSavedObjectIndices({
client,
stats,
log,
index = ALL_SAVED_OBJECT_INDICES,
}: {
client: Client;
stats: Stats;
log: ToolingLog;
index?: string | string[];
}) {
while (true) {
const resp = await client.deleteByQuery(
{
index: ALL_SAVED_OBJECT_INDICES,
index,
body: {
query: {
bool: {
