diff --git a/src/Job.ts b/src/Job.ts index 207715c..9c7aa77 100644 --- a/src/Job.ts +++ b/src/Job.ts @@ -1,10 +1,10 @@ import * as date from 'date.js'; import * as debug from 'debug'; -import { JobPriority, parsePriority } from './utils/priority'; import type { Agenda } from './index'; -import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt'; -import { IJobParameters } from './types/JobParameters'; +import type { IJobParameters } from './types/JobParameters'; import type { DefinitionProcessor } from './types/JobDefinition'; +import { JobPriority, parsePriority } from './utils/priority'; +import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt'; const log = debug('agenda:job'); @@ -56,7 +56,7 @@ export class Job { }; } - toJson(): Partial { + toJson(): IJobParameters { const attrs = this.attrs || {}; const result = {}; @@ -74,8 +74,7 @@ export class Job { } }); - // console.log('toJson', this.attrs, result); - return result; + return result as IJobParameters; } repeatEvery( diff --git a/src/JobDbRepository.ts b/src/JobDbRepository.ts index aabbc9b..2f3caff 100644 --- a/src/JobDbRepository.ts +++ b/src/JobDbRepository.ts @@ -7,13 +7,14 @@ import { MongoClientOptions, UpdateQuery, ObjectId, - SortOptionObject + SortOptionObject, + FindOneAndUpdateOption } from 'mongodb'; import type { Job } from './Job'; -import { hasMongoProtocol } from './utils/hasMongoProtocol'; import type { Agenda } from './index'; -import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; -import { IJobParameters } from './types/JobParameters'; +import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; +import type { IJobParameters } from './types/JobParameters'; +import { hasMongoProtocol } from './utils/hasMongoProtocol'; const log = debug('agenda:db'); @@ -81,7 +82,7 @@ export class JobDbRepository { async lockJob(job: Job): Promise { // Query to run against collection to see if we need to lock it - const criteria = { + const criteria: FilterQuery & { lockedAt?: Date | null }> = { _id: job.attrs._id, name: job.attrs.name, lockedAt: null, @@ -90,11 +91,16 @@ export class JobDbRepository { }; // Update / options for the MongoDB query - const update = { $set: { lockedAt: new Date() } }; - const options = { returnOriginal: false }; + const update: UpdateQuery = { $set: { lockedAt: new Date() } }; + const options: FindOneAndUpdateOption = { returnOriginal: false }; // Lock the job in MongoDB! - const resp = await this.collection.findOneAndUpdate(criteria, update, options); + const resp = await this.collection.findOneAndUpdate( + criteria as FilterQuery, + update, + options + ); + return resp?.value; } @@ -104,11 +110,13 @@ export class JobDbRepository { lockDeadline: Date, now: Date = new Date() ): Promise { - // /** - // * Query used to find job to run - // * @type {{$and: [*]}} - // */ - const JOB_PROCESS_WHERE_QUERY = { + /** + * Query used to find job to run + * @type {{$and: [*]}} + */ + const JOB_PROCESS_WHERE_QUERY: FilterQuery< + Omit & { lockedAt?: Date | null } + > = { $and: [ { name: jobName, @@ -132,13 +140,16 @@ export class JobDbRepository { * Query used to set a job as locked * @type {{$set: {lockedAt: Date}}} */ - const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } }; + const JOB_PROCESS_SET_QUERY: UpdateQuery = { $set: { lockedAt: now } }; /** * Query used to affect what gets returned * @type {{returnOriginal: boolean, sort: object}} */ - const JOB_RETURN_QUERY = { returnOriginal: false, sort: this.connectOptions.sort }; + const JOB_RETURN_QUERY: FindOneAndUpdateOption = { + returnOriginal: false, + sort: this.connectOptions.sort + }; // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed const result = await this.collection.findOneAndUpdate( @@ -208,7 +219,7 @@ export class JobDbRepository { private processDbResult( job: Job, - res: IJobParameters + res?: IJobParameters ): Job { log( 'processDbResult() called with success, checking whether to process job immediately or not' @@ -243,7 +254,6 @@ export class JobDbRepository { // Grab information needed to save job but that we don't want to persist in MongoDB const id = job.attrs._id; - // const { unique, uniqueOpts } = job.attrs; // Store job as JSON and remove props we don't want to store from object // _id, unique, uniqueOpts @@ -284,7 +294,7 @@ export class JobDbRepository { if (props.nextRunAt && props.nextRunAt <= now) { log('job has a scheduled nextRunAt time, protecting that field from upsert'); protect.nextRunAt = props.nextRunAt; - delete props.nextRunAt; + delete (props as Partial).nextRunAt; } // If we have things to protect, set them in MongoDB using $setOnInsert @@ -309,7 +319,7 @@ export class JobDbRepository { update, { upsert: true, - returnOriginal: false // same as new: true -> returns the final document + returnOriginal: false } ); log( @@ -330,8 +340,6 @@ export class JobDbRepository { update = { $setOnInsert: props }; } - // console.log('update', query, update, uniqueOpts); - // Use the 'unique' query object to find an existing job or create a new one log('calling findOneAndUpdate() with unique object as query: \n%O', query); const result = await this.collection.findOneAndUpdate(query, update, { diff --git a/src/JobProcessingQueue.ts b/src/JobProcessingQueue.ts index bc94f39..ae9f6e9 100644 --- a/src/JobProcessingQueue.ts +++ b/src/JobProcessingQueue.ts @@ -1,6 +1,6 @@ // eslint-disable-next-line prettier/prettier import type { Job } from './Job'; -import { IJobParameters } from './types/JobParameters'; +import type { IJobParameters } from './types/JobParameters'; import type { Agenda } from './index'; /** * @class diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index 599d45e..90195e6 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -1,10 +1,10 @@ import * as debug from 'debug'; -import { Job } from './Job'; -import { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus'; -import { IJobDefinition } from './types/JobDefinition'; -import { JobProcessingQueue } from './JobProcessingQueue'; +import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus'; +import type { IJobDefinition } from './types/JobDefinition'; import type { Agenda } from './index'; import type { IJobParameters } from './types/JobParameters'; +import { Job } from './Job'; +import { JobProcessingQueue } from './JobProcessingQueue'; const log = debug('agenda:jobProcessor'); diff --git a/src/index.ts b/src/index.ts index f0de99f..cc94f68 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,18 @@ import { EventEmitter } from 'events'; import * as debug from 'debug'; -import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb'; -import { Job } from './Job'; -import { JobProcessor } from './JobProcessor'; +import type { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb'; import type { IJobDefinition } from './types/JobDefinition'; -import { IAgendaConfig } from './types/AgendaConfig'; +import type { IAgendaConfig } from './types/AgendaConfig'; +import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; +import type { IAgendaStatus } from './types/AgendaStatus'; +import type { IJobParameters } from './types/JobParameters'; +import { Job } from './Job'; import { JobDbRepository } from './JobDbRepository'; -import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; -import { filterUndefined } from './utils/filterUndefined'; import { JobPriority, parsePriority } from './utils/priority'; -import { IAgendaStatus } from './types/AgendaStatus'; -import { IJobParameters } from './types/JobParameters'; +import { JobProcessor } from './JobProcessor'; import { calculateProcessEvery } from './utils/processEvery'; +import { filterUndefined } from './utils/filterUndefined'; const log = debug('agenda'); diff --git a/src/types/AgendaStatus.ts b/src/types/AgendaStatus.ts index 0116302..ebddb05 100644 --- a/src/types/AgendaStatus.ts +++ b/src/types/AgendaStatus.ts @@ -1,4 +1,4 @@ -import { Job } from '../Job'; +import type { Job } from '../Job'; export interface IAgendaJobStatus { [name: string]: { running: number; locked: number }; diff --git a/src/types/DbOptions.ts b/src/types/DbOptions.ts index c3cefe3..3030978 100644 --- a/src/types/DbOptions.ts +++ b/src/types/DbOptions.ts @@ -1,5 +1,5 @@ -import { Db, MongoClientOptions, SortOptionObject } from 'mongodb'; -import { IJobParameters } from './JobParameters'; +import type { Db, MongoClientOptions, SortOptionObject } from 'mongodb'; +import type { IJobParameters } from './JobParameters'; export interface IDatabaseOptions { db: { diff --git a/src/types/JobDefinition.ts b/src/types/JobDefinition.ts index 4c5d61b..bea5ffb 100644 --- a/src/types/JobDefinition.ts +++ b/src/types/JobDefinition.ts @@ -1,4 +1,4 @@ -import { Job } from '../Job'; +import type { Job } from '../Job'; export interface IJobDefinition { /** max number of locked jobs of this kind */ @@ -7,13 +7,10 @@ export interface IJobDefinition { lockLifetime: number; /** Higher priority jobs will run first. */ priority?: number; - /** how many jobs of this kind can run in parallel/simultanously */ + /** how many jobs of this kind can run in parallel/simultanously per Agenda instance */ concurrency?: number; - // running: number; - // locked: number; - - fn: DefinitionProcessor void)>; + fn: DefinitionProcessor void)>; } export type DefinitionProcessor = ( diff --git a/src/types/JobParameters.ts b/src/types/JobParameters.ts index 9fa8e17..170cda5 100644 --- a/src/types/JobParameters.ts +++ b/src/types/JobParameters.ts @@ -8,11 +8,10 @@ export interface IJobParameters { priority: number; nextRunAt: Date | null; /** - * // once: the job is just queued in the database --> this does not really exists, it's just fallback * normal: job is queued and will be processed (regular case when the user adds a new job) * single: job with this name is only queued once, if there is an exisitn gentry in the database, the job is just updated, but not newly inserted (this is used for .every()) */ - type: /* 'once' | */ 'normal' | 'single'; + type: 'normal' | 'single'; lockedAt?: Date; lastFinishedAt?: Date; diff --git a/src/utils/priority.ts b/src/utils/priority.ts index 61fac28..eff4d08 100644 --- a/src/utils/priority.ts +++ b/src/utils/priority.ts @@ -1,11 +1,3 @@ -/** - * Internal method to turn priority into a number - * @name Job#priority - * @function - * @param {String|Number} priority string to parse into number - * @returns {Number} priority that was parsed - */ - export type JobPriority = number | keyof typeof priorityMap; const priorityMap = { @@ -16,6 +8,13 @@ const priorityMap = { highest: 20 }; +/** + * Internal method to turn priority into a number + * @name Job#priority + * @function + * @param {String|Number} priority string to parse into number + * @returns {Number} priority that was parsed + */ export function parsePriority(priority?: JobPriority): number { if (typeof priority === 'number') { return priority; diff --git a/test/job.test.ts b/test/job.test.ts index 4f90cc7..cdb16dc 100644 --- a/test/job.test.ts +++ b/test/job.test.ts @@ -1150,7 +1150,7 @@ describe('Job', () => { ]); expect(results).to.eql([10, 0, -10]); } catch (err) { - console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3)); + // console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3)); } }); diff --git a/test/retry.test.ts b/test/retry.test.ts index 372963f..682f269 100644 --- a/test/retry.test.ts +++ b/test/retry.test.ts @@ -16,7 +16,7 @@ const clearJobs = async (): Promise => { }; const jobType = 'do work'; -const jobProcessor = () => {}; +const jobProcessor = () => { }; describe('Retry', () => { beforeEach(async () => { @@ -79,5 +79,5 @@ describe('Retry', () => { await agenda.start(); await successPromise; - }); + }).timeout(100000); });