-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Reporting] ReportingStore module #69426
Changes from all commits
fae5713
b9b14e5
82a8525
6cc3f38
6b3b93a
ef79c04
29bf6a7
6a6b5cf
3a7082e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,17 +8,16 @@ import { ReportingCore } from '../core'; | |
import { JobSource, TaskRunResult } from '../types'; | ||
import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed | ||
import { createWorkerFactory } from './create_worker'; | ||
import { Job } from './enqueue_job'; | ||
// @ts-ignore | ||
import { Esqueue } from './esqueue'; | ||
import { LevelLogger } from './level_logger'; | ||
import { ReportingStore } from './store'; | ||
|
||
interface ESQueueWorker { | ||
on: (event: string, handler: any) => void; | ||
} | ||
|
||
export interface ESQueueInstance { | ||
addJob: (type: string, payload: unknown, options: object) => Job; | ||
registerWorker: <JobParamsType>( | ||
pluginId: string, | ||
workerFn: GenericWorkerFn<JobParamsType>, | ||
|
@@ -37,26 +36,25 @@ type GenericWorkerFn<JobParamsType> = ( | |
...workerRestArgs: any[] | ||
) => void | Promise<TaskRunResult>; | ||
|
||
export async function createQueueFactory<JobParamsType, JobPayloadType>( | ||
export async function createQueueFactory( | ||
reporting: ReportingCore, | ||
store: ReportingStore, | ||
logger: LevelLogger | ||
): Promise<ESQueueInstance> { | ||
const config = reporting.getConfig(); | ||
const queueIndexInterval = config.get('queue', 'indexInterval'); | ||
|
||
// esqueue-related | ||
const queueTimeout = config.get('queue', 'timeout'); | ||
const queueIndex = config.get('index'); | ||
const isPollingEnabled = config.get('queue', 'pollEnabled'); | ||
|
||
const elasticsearch = await reporting.getElasticsearchService(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not seeing if this changed from sync to async here. I'm assuming so... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method was never async, so this is a little cleanup. It's something that doesn't get warned about. |
||
const elasticsearch = reporting.getElasticsearchService(); | ||
const queueOptions = { | ||
interval: queueIndexInterval, | ||
timeout: queueTimeout, | ||
dateSeparator: '.', | ||
client: elasticsearch.legacy.client, | ||
logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']), | ||
}; | ||
|
||
const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions); | ||
const queue: ESQueueInstance = new Esqueue(store, queueOptions); | ||
|
||
if (isPollingEnabled) { | ||
// create workers to poll the index for idle jobs waiting to be claimed and executed | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,39 +4,24 @@ | |
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import { EventEmitter } from 'events'; | ||
import { KibanaRequest, RequestHandlerContext } from 'src/core/server'; | ||
import { AuthenticatedUser } from '../../../security/server'; | ||
import { ESQueueCreateJobFn } from '../../server/types'; | ||
import { ReportingCore } from '../core'; | ||
// @ts-ignore | ||
import { events as esqueueEvents } from './esqueue'; | ||
import { LevelLogger } from './level_logger'; | ||
import { LevelLogger } from './'; | ||
import { ReportingStore, Report } from './store'; | ||
|
||
interface ConfirmedJob { | ||
id: string; | ||
index: string; | ||
_seq_no: number; | ||
_primary_term: number; | ||
} | ||
|
||
export type Job = EventEmitter & { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This type is no longer needed: replaced with |
||
id: string; | ||
toJSON: () => { | ||
id: string; | ||
}; | ||
}; | ||
|
||
export type EnqueueJobFn = <JobParamsType>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handling |
||
export type EnqueueJobFn = ( | ||
exportTypeId: string, | ||
jobParams: JobParamsType, | ||
jobParams: unknown, | ||
user: AuthenticatedUser | null, | ||
context: RequestHandlerContext, | ||
request: KibanaRequest | ||
) => Promise<Job>; | ||
) => Promise<Report>; | ||
|
||
export function enqueueJobFactory( | ||
reporting: ReportingCore, | ||
store: ReportingStore, | ||
parentLogger: LevelLogger | ||
): EnqueueJobFn { | ||
const config = reporting.getConfig(); | ||
|
@@ -45,16 +30,16 @@ export function enqueueJobFactory( | |
const maxAttempts = config.get('capture', 'maxAttempts'); | ||
const logger = parentLogger.clone(['queue-job']); | ||
|
||
return async function enqueueJob<JobParamsType>( | ||
return async function enqueueJob( | ||
exportTypeId: string, | ||
jobParams: JobParamsType, | ||
jobParams: unknown, | ||
user: AuthenticatedUser | null, | ||
context: RequestHandlerContext, | ||
request: KibanaRequest | ||
): Promise<Job> { | ||
type ScheduleTaskFnType = ESQueueCreateJobFn<JobParamsType>; | ||
) { | ||
type ScheduleTaskFnType = ESQueueCreateJobFn<unknown>; | ||
|
||
const username = user ? user.username : false; | ||
const esqueue = await reporting.getEsqueue(); | ||
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId); | ||
|
||
if (exportType == null) { | ||
|
@@ -71,16 +56,6 @@ export function enqueueJobFactory( | |
max_attempts: maxAttempts, | ||
}; | ||
|
||
return new Promise((resolve, reject) => { | ||
const job = esqueue.addJob(exportType.jobType, payload, options); | ||
|
||
job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => { | ||
if (createdJob.id === job.id) { | ||
logger.info(`Successfully queued job: ${createdJob.id}`); | ||
resolve(job); | ||
} | ||
}); | ||
job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject); | ||
}); | ||
return await store.addReport(exportType.jobType, payload, options); | ||
}; | ||
} |
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These generics were not used, so I removed them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pray I don't alter it further