load.ts
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { resolve, relative } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';

import { ToolingLog, REPO_ROOT } from '@kbn/dev-utils';
import { KbnClient } from '@kbn/test';
import type { KibanaClient } from '@elastic/elasticsearch/api/kibana';
import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils';

import { ES_CLIENT_HEADERS } from '../client_headers';
import {
  isGzip,
  createStats,
  prioritizeMappings,
  readDirectory,
  createParseArchiveStreams,
  createCreateIndexStream,
  createIndexDocRecordsStream,
  migrateSavedObjectIndex,
  Progress,
  createDefaultSpace,
} from '../lib';

// pipe a series of streams into each other so that data and errors
// flow from the first stream to the last. Errors from the last stream
// are not listened for
const pipeline = (...streams: Readable[]) =>
  streams.reduce((source, dest) =>
    source.once('error', (error) => dest.destroy(error)).pipe(dest as any)
  );
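
// For illustration only, a hypothetical sketch that is not part of this
// module: an error emitted by the read stream destroys the downstream
// parser, so a failure anywhere in the chain surfaces on the stream that
// pipeline() returns, where the caller can listen for it:
//
//   const records = pipeline(
//     createReadStream('/path/to/archive.json'),
//     ...createParseArchiveStreams({ gzip: false })
//   );
//   records.on('error', (error) => console.error('archive failed', error));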

export async function loadAction({
  inputDir,
  skipExisting,
  useCreate,
  docsOnly,
  client,
  log,
  kbnClient,
}: {
  inputDir: string;
  skipExisting: boolean;
  useCreate: boolean;
  docsOnly?: boolean;
  client: KibanaClient;
  log: ToolingLog;
  kbnClient: KbnClient;
}) {
  const name = relative(REPO_ROOT, inputDir);
  const stats = createStats(name, log);
  const files = prioritizeMappings(await readDirectory(inputDir));
  const kibanaPluginIds = await kbnClient.plugins.getEnabledIds();

  // a single stream that emits records from all archive files, in
  // order, so that createCreateIndexStream can track the state of indexes
  // across archives and properly skip docs from existing indexes
  const recordStream = concatStreamProviders(
    files.map((filename) => () => {
      log.info('[%s] Loading %j', name, filename);

      return pipeline(
        createReadStream(resolve(inputDir, filename)),
        ...createParseArchiveStreams({ gzip: isGzip(filename) })
      );
    }),
    { objectMode: true }
  );
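
  // Note: the map above yields provider functions rather than live streams;
  // assuming concatStreamProviders creates each stream on demand, every
  // archive file is only opened once the previous one has finished.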

  const progress = new Progress();
  progress.activate(log);

  await createPromiseFromStreams([
    recordStream,
    createCreateIndexStream({ client, stats, skipExisting, docsOnly, log }),
    createIndexDocRecordsStream(client, stats, progress, useCreate),
  ]);

  progress.deactivate();

  const result = stats.toJSON();

  const indicesWithDocs: string[] = [];
  for (const [index, { docs }] of Object.entries(result)) {
    if (docs && docs.indexed > 0) {
      log.info('[%s] Indexed %d docs into %j', name, docs.indexed, index);
      indicesWithDocs.push(index);
    }
  }
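
  // Refreshing makes the docs just indexed visible to searches right away;
  // this is standard Elasticsearch behavior that callers (usually tests)
  // rely on immediately after loading an archive.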
  await client.indices.refresh(
    {
      index: indicesWithDocs.join(','),
      allow_no_indices: true,
    },
    {
      headers: ES_CLIENT_HEADERS,
    }
  );

  // If we affected the Kibana index, we need to ensure it's migrated...
  if (Object.keys(result).some((k) => k.startsWith('.kibana'))) {
    await migrateSavedObjectIndex({ kbnClient });
    log.debug('[%s] Migrated Kibana index after loading Kibana data', name);

    if (kibanaPluginIds.includes('spaces')) {
      await createDefaultSpace({ client, index: '.kibana' });
      log.debug('[%s] Ensured that default space exists in .kibana', name);
    }
  }

  return result;
}
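
// A hypothetical usage sketch, not part of this module: loadAction is
// normally driven by the es_archiver CLI, but called directly it looks
// roughly like the following (the client values here are assumptions):
//
//   const log = new ToolingLog({ level: 'info', writeTo: process.stdout });
//   const result = await loadAction({
//     inputDir: resolve(REPO_ROOT, 'path/to/archive'),
//     skipExisting: false,
//     useCreate: false,
//     client: esClient, // assumed: an @elastic/elasticsearch Kibana client
//     log,
//     kbnClient: new KbnClient({ log, url: 'http://localhost:5601' }),
//   });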