From b13d054889638e218a2706f05512340e764c395b Mon Sep 17 00:00:00 2001 From: Uzlopak Date: Tue, 20 Oct 2020 19:29:16 +0200 Subject: [PATCH] fix: more typings, minor functionality changes (#2) * add more typings and streamline functions, remoev unnused file * use IJobParameters instead of Job, fix unit test Co-authored-by: Aras Abbasi --- src/Job.ts | 39 +++++++---------- src/JobDbRepository.ts | 37 ++++++++++------ src/JobProcessingQueue.ts | 10 ++--- src/JobProcessor.ts | 26 +++++------ src/index.ts | 51 ++++++++++++---------- src/types/AgendaStatus.ts | 18 ++++++++ src/types/DbOptions.ts | 5 ++- src/types/JobParameters.ts | 4 +- src/utils/date.ts | 4 +- src/utils/filterUndefined.ts | 3 ++ src/utils/hasMongoProtocol.ts | 3 ++ src/utils/helpers.ts | 3 -- src/utils/mongodb.ts | 3 -- src/utils/nextRunAt.ts | 4 +- src/utils/priority.ts | 30 ++++++++----- test/agenda.test.ts | 21 +++++---- test/fixtures/add-tests.ts | 2 +- test/jobprocessor.test.ts | 18 ++++---- test/retry.test.ts | 5 ++- test/retry.ts | 82 ----------------------------------- 20 files changed, 158 insertions(+), 210 deletions(-) create mode 100644 src/types/AgendaStatus.ts create mode 100644 src/utils/filterUndefined.ts create mode 100644 src/utils/hasMongoProtocol.ts delete mode 100644 src/utils/helpers.ts delete mode 100644 src/utils/mongodb.ts delete mode 100644 test/retry.ts diff --git a/src/Job.ts b/src/Job.ts index 826046c..d5f99fb 100644 --- a/src/Job.ts +++ b/src/Job.ts @@ -1,6 +1,6 @@ import * as date from 'date.js'; import * as debug from 'debug'; -import { parsePriority } from './utils/priority'; +import { JobPriority, parsePriority } from './utils/priority'; import type { Agenda } from './index'; import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt'; import { IJobParameters } from './types/JobParameters'; @@ -46,18 +46,13 @@ export class Job { data: any; } ) { - // Remove special args - - // Process args - args.priority = parsePriority(args.priority) || 0; - // Set attrs to args this.attrs = { ...args, // Set defaults if undefined - priority: args.priority || 0, + priority: parsePriority(args.priority), nextRunAt: args.nextRunAt || new Date(), - type: args.type // || 'once' + type: args.type }; } @@ -86,7 +81,7 @@ export class Job { repeatEvery( interval: string | number, options: { timezone?: string; skipImmediate?: boolean } = {} - ) { + ): this { this.attrs.repeatInterval = interval; this.attrs.repeatTimezone = options.timezone; if (options.skipImmediate) { @@ -101,28 +96,28 @@ export class Job { return this; } - repeatAt(time) { + repeatAt(time: string): this { this.attrs.repeatAt = time; return this; } - disable() { + disable(): this { this.attrs.disabled = true; return this; } - enable() { + enable(): this { this.attrs.disabled = false; return this; } - unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']) { + unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']): this { this.attrs.unique = unique; this.attrs.uniqueOpts = opts; return this; } - schedule(time) { + schedule(time: string | Date): this { const d = new Date(time); this.attrs.nextRunAt = Number.isNaN(d.getTime()) ? date(time) : d; @@ -135,17 +130,13 @@ export class Job { * @param {String} priority priority of when job should be queued * @returns {exports} instance of Job */ - priority(priority: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number) { + priority(priority: JobPriority): this { this.attrs.priority = parsePriority(priority); return this; } - fail(reason: Error | string) { - if (reason instanceof Error) { - reason = reason.message; - } - - this.attrs.failReason = reason; + fail(reason: Error | string): this { + this.attrs.failReason = reason instanceof Error ? reason.message : reason; this.attrs.failCount = (this.attrs.failCount || 0) + 1; const now = new Date(); this.attrs.failedAt = now; @@ -159,7 +150,7 @@ export class Job { return this; } - isRunning() { + isRunning(): boolean { if (!this.attrs.lastRunAt) { return false; } @@ -178,7 +169,7 @@ export class Job { return false; } - save() { + async save(): Promise { return this.agenda.db.saveJob(this); } @@ -237,7 +228,7 @@ export class Job { return this; } - async run() { + async run(): Promise { const definition = this.agenda.definitions[this.attrs.name]; this.attrs.lastRunAt = new Date(); diff --git a/src/JobDbRepository.ts b/src/JobDbRepository.ts index 3aff5b7..86c701f 100644 --- a/src/JobDbRepository.ts +++ b/src/JobDbRepository.ts @@ -6,10 +6,11 @@ import { MongoClient, MongoClientOptions, UpdateQuery, - ObjectId + ObjectId, + SortOptionObject } from 'mongodb'; import type { Job } from './Job'; -import { hasMongoProtocol } from './utils/mongodb'; +import { hasMongoProtocol } from './utils/hasMongoProtocol'; import type { Agenda } from './index'; import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; import { IJobParameters } from './types/JobParameters'; @@ -49,26 +50,32 @@ export class JobDbRepository { return !!connectOptions.db?.address; } - async getJobs(query: any, sort: any = {}, limit = 0, skip = 0) { + async getJobs( + query: FilterQuery, + sort: SortOptionObject = {}, + limit = 0, + skip = 0 + ): Promise { return this.collection.find(query).sort(sort).limit(limit).skip(skip).toArray(); } - async removeJobs(query: any) { - return this.collection.deleteMany(query); + async removeJobs(query: FilterQuery): Promise { + const result = await this.collection.deleteMany(query); + return result.result.n || 0; } async getQueueSize(): Promise { return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } }); } - async unlockJob(job) { - await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } }); + async unlockJob(job: Job): Promise { + await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } }); } /** * Internal method to unlock jobs so that they can be re-run */ - async unlockJobs(jobIds: ObjectId[]) { + async unlockJobs(jobIds: ObjectId[]): Promise { await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } }); } @@ -143,7 +150,7 @@ export class JobDbRepository { return result.value; } - async connect() { + async connect(): Promise { const db = await this.createConnection(); log('successful connection to MongoDB', db.options); @@ -184,8 +191,10 @@ export class JobDbRepository { } private async database(url: string, options?: MongoClientOptions) { - if (!hasMongoProtocol(url)) { - url = `mongodb://${url}`; + let connectionString = url; + + if (!hasMongoProtocol(connectionString)) { + connectionString = `mongodb://${connectionString}`; } const client = await MongoClient.connect(url, { @@ -197,7 +206,7 @@ export class JobDbRepository { return client.db(); } - private processDbResult(job: Job, res: IJobParameters) { + private processDbResult(job: Job, res: IJobParameters): Job { log( 'processDbResult() called with success, checking whether to process job immediately or not' ); @@ -245,7 +254,7 @@ export class JobDbRepository { // Grab current time and set default query options for MongoDB const now = new Date(); const protect: Partial = {}; - let update: UpdateQuery = { $set: props }; + let update: UpdateQuery = { $set: props }; log('current time stored as %s', now.toISOString()); // If the job already had an ID, then update the properties of the job @@ -310,7 +319,7 @@ export class JobDbRepository { if (job.attrs.unique) { // If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in - const query: FilterQuery = job.attrs.unique; + const query = job.attrs.unique; query.name = props.name; if (job.attrs.uniqueOpts?.insertOnly) { update = { $setOnInsert: props }; diff --git a/src/JobProcessingQueue.ts b/src/JobProcessingQueue.ts index 54ba076..46b0cd0 100644 --- a/src/JobProcessingQueue.ts +++ b/src/JobProcessingQueue.ts @@ -15,7 +15,7 @@ export class JobProcessingQueue { this._queue = []; } - get length() { + get length(): number { return this._queue.length; } @@ -23,7 +23,7 @@ export class JobProcessingQueue { * Pops and returns last queue element (next job to be processed) without checking concurrency. * @returns {Job} Next Job to be processed */ - pop() { + pop(): Job | undefined { return this._queue.pop(); } @@ -32,11 +32,11 @@ export class JobProcessingQueue { * @param {Job} job job to add to queue * @returns {undefined} */ - push(job: Job) { + push(job: Job): void { this._queue.push(job); } - remove(job: Job) { + remove(job: Job): void { let removeJobIndex = this._queue.indexOf(job); if (removeJobIndex === -1) { // lookup by id @@ -58,7 +58,7 @@ export class JobProcessingQueue { * @param {Job} job job to add to queue * @returns {undefined} */ - insert(job: Job) { + insert(job: Job): void { const matchIndex = this._queue.findIndex(element => { if ( element.attrs.nextRunAt && diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index 320ca0c..4ef49a1 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -1,5 +1,6 @@ import * as debug from 'debug'; import { Job } from './Job'; +import { IAgendaStatus } from './types/AgendaStatus'; import { IJobDefinition } from './types/JobDefinition'; import { JobProcessingQueue } from './JobProcessingQueue'; import type { Agenda } from './index'; @@ -15,23 +16,18 @@ const log = debug('agenda:jobProcessor'); */ export class JobProcessor { private jobStatus: { - [name: string]: - | { - running: number; - locked: number; - } - | undefined; + [name: string]: { running: number; locked: number } | undefined; } = {}; private localQueueProcessing = 0; - async getStatus(fullDetails = false) { + async getStatus(fullDetails = false): Promise { // eslint-disable-next-line @typescript-eslint/no-var-requires,global-require const { version } = require('../package.json'); return { version, - queueName: this.agenda.name, + queueName: this.agenda.attrs.name, totalQueueSizeDB: await this.agenda.db.getQueueSize(), config: { totalLockLimit: this.totalLockLimit, @@ -42,7 +38,7 @@ export class JobProcessor { Object.keys(this.jobStatus).map(job => [ job, { - ...this.jobStatus[job], + ...this.jobStatus[job]!, config: this.agenda.definitions[job] } ]) @@ -95,7 +91,7 @@ export class JobProcessor { } // processJobs - async process(extraJob?: Job) { + async process(extraJob?: Job): Promise { // Make sure an interval has actually been set // Prevents race condition with 'Agenda.stop' and already scheduled run if (!this.isRunning) { @@ -138,7 +134,7 @@ export class JobProcessor { * @param {String} name name of job to check if we should lock or not * @returns {boolean} whether or not you should lock job */ - shouldLock(name) { + shouldLock(name: string): boolean { const jobDefinition = this.agenda.definitions[name]; let shouldLock = true; // global lock limit @@ -168,7 +164,7 @@ export class JobProcessor { * @param {boolean} inFront puts the job in front of queue if true * @returns {undefined} */ - private enqueueJob(job: Job) { + private enqueueJob(job: Job): void { this.jobQueue.insert(job); } @@ -178,7 +174,7 @@ export class JobProcessor { * We do this because sometimes jobs are scheduled but will be run before the next process time * @returns {undefined} */ - async lockOnTheFly() { + async lockOnTheFly(): Promise { // Already running this? Return if (this.isLockingOnTheFly) { log.extend('lockOnTheFly')('already running, returning'); @@ -347,7 +343,7 @@ export class JobProcessor { return; } - this.localQueueProcessing++; + this.localQueueProcessing += 1; let jobEnqueued = false; try { @@ -400,7 +396,7 @@ export class JobProcessor { ); } */ } finally { - this.localQueueProcessing--; + this.localQueueProcessing -= 1; } } diff --git a/src/index.ts b/src/index.ts index 05641d3..74b0d9c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,15 +2,17 @@ import { EventEmitter } from 'events'; import * as debug from 'debug'; import * as humanInterval from 'human-interval'; -import { Db, MongoClientOptions } from 'mongodb'; +import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb'; import { Job } from './Job'; import { JobProcessor } from './JobProcessor'; import type { IJobDefinition } from './types/JobDefinition'; import { IAgendaConfig } from './types/AgendaConfig'; import { JobDbRepository } from './JobDbRepository'; import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions'; -import { filterUndef } from './utils/helpers'; -import { parsePriority } from './utils/priority'; +import { filterUndefined } from './utils/filterUndefined'; +import { JobPriority, parsePriority } from './utils/priority'; +import { IAgendaStatus } from './types/AgendaStatus'; +import { IJobParameters } from './types/JobParameters'; const log = debug('agenda'); @@ -51,7 +53,7 @@ export class Agenda extends EventEmitter { on(event: string, listener: (err: Error, job: Job) => void): this; on(event: 'ready', listener: () => void): this; on(event: 'error', listener: (err: Error) => void): this; - on(event, listener) { + on(event: string, listener: (...args: any[]) => void): this { return super.on(event, listener); } @@ -63,7 +65,7 @@ export class Agenda extends EventEmitter { private ready: Promise; - getRunningStats(fullDetails = false) { + async getRunningStats(fullDetails = false): Promise { if (!this.jobProcessor) { throw new Error('agenda not running!'); } @@ -126,7 +128,7 @@ export class Agenda extends EventEmitter { return this; } - sort(query): Agenda { + sort(query: SortOptionObject): Agenda { log('Agenda.sort([Object])'); this.attrs.sort = query; return this; @@ -136,19 +138,19 @@ export class Agenda extends EventEmitter { return !!(config.db?.address || config.mongo); } - async cancel(query: any): Promise { + async cancel(query: FilterQuery): Promise { log('attempting to cancel all Agenda jobs', query); try { - const { result } = await this.db.removeJobs(query); - log('%s jobs cancelled', result.n); - return result.n || 0; + const amountOfRemovedJobs = await this.db.removeJobs(query); + log('%s jobs cancelled', amountOfRemovedJobs); + return amountOfRemovedJobs; } catch (error) { log('error trying to delete jobs from MongoDB'); throw error; } } - name(name): Agenda { + name(name: string): Agenda { log('Agenda.name(%s)', name); this.attrs.name = name; return this; @@ -190,7 +192,12 @@ export class Agenda extends EventEmitter { return this; } - async jobs(query: any = {}, sort: any = {}, limit = 0, skip = 0): Promise { + async jobs( + query: FilterQuery = {}, + sort: FilterQuery = {}, + limit = 0, + skip = 0 + ): Promise { const result = await this.db.getJobs(query, sort, limit, skip); return result.map(job => new Job(this, job)); @@ -208,30 +215,30 @@ export class Agenda extends EventEmitter { /** BREAKING CHANGE: options moved from 2nd to 3rd parameter! */ define( name: string, - processor: (agendaJob: Job, done: (err?) => void) => void, + processor: (agendaJob: Job, done: (err?: Error) => void) => void, options?: Partial> & { - priority?: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number; + priority?: JobPriority; } - ); + ): void; define( name: string, processor: (agendaJob: Job) => Promise, options?: Partial> & { - priority?: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number; + priority?: JobPriority; } - ); + ): void; define( name: string, processor, options?: Partial> & { - priority?: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number; + priority?: JobPriority; } ): void { this.definitions[name] = { fn: processor, concurrency: options?.concurrency || this.attrs.defaultConcurrency, lockLimit: options?.lockLimit || this.attrs.defaultLockLimit, - priority: parsePriority(options?.priority) || 0, + priority: parsePriority(options?.priority), lockLifetime: options?.lockLifetime || this.attrs.defaultLockLifetime }; log('job [%s] defined with following options: \n%O', name, this.definitions[name]); @@ -247,7 +254,7 @@ export class Agenda extends EventEmitter { */ private async createJobs( names: string[], - createJob: (name) => Promise> + createJob: (name: string) => Promise> ): Promise[]> { try { const jobs = await Promise.all(names.map(name => createJob(name))); @@ -263,7 +270,7 @@ export class Agenda extends EventEmitter { create(name: string): Job; create(name: string, data: DATA): Job; - create(name: string, data?: any) { + create(name: string, data?: any): Job { log('Agenda.create(%s, [Object])', name); const priority = this.definitions[name] ? this.definitions[name].priority : 0; const job = new Job(this, { name, data, type: 'normal', priority }); @@ -408,7 +415,7 @@ export class Agenda extends EventEmitter { const lockedJobs = this.jobProcessor?.stop(); log('Agenda._unlockJobs()'); - const jobIds = filterUndef(lockedJobs?.map(job => job.attrs._id) || []); + const jobIds = filterUndefined(lockedJobs?.map(job => job.attrs._id) || []); if (jobIds.length > 0) { log('about to unlock jobs with ids: %O', jobIds); diff --git a/src/types/AgendaStatus.ts b/src/types/AgendaStatus.ts new file mode 100644 index 0000000..4093b4b --- /dev/null +++ b/src/types/AgendaStatus.ts @@ -0,0 +1,18 @@ +import { Job } from '../Job'; + +export interface IAgendaStatus { + version: string; + queueName: string | undefined; + totalQueueSizeDB: number; + config: { + totalLockLimit: number; + maxConcurrency: number; + processEvery: string | number; + }; + jobStatus: { [name: string]: { running: number; locked: number } | undefined }; + queuedJobs: number; + runningJobs: number | Job[]; + lockedJobs: number | Job[]; + jobsToLock: number | Job[]; + isLockingOnTheFly: boolean; +} diff --git a/src/types/DbOptions.ts b/src/types/DbOptions.ts index e150880..c3cefe3 100644 --- a/src/types/DbOptions.ts +++ b/src/types/DbOptions.ts @@ -1,4 +1,5 @@ -import { Db, MongoClientOptions } from 'mongodb'; +import { Db, MongoClientOptions, SortOptionObject } from 'mongodb'; +import { IJobParameters } from './JobParameters'; export interface IDatabaseOptions { db: { @@ -17,5 +18,5 @@ export interface IMongoOptions { export interface IDbConfig { ensureIndex?: boolean; - sort?: any; // { priority?: number; nextRunAt: number }; + sort?: SortOptionObject; } diff --git a/src/types/JobParameters.ts b/src/types/JobParameters.ts index 8e528a0..bae5629 100644 --- a/src/types/JobParameters.ts +++ b/src/types/JobParameters.ts @@ -1,4 +1,4 @@ -import { ObjectId } from 'mongodb'; +import { FilterQuery, ObjectId } from 'mongodb'; export interface IJobParameters { _id?: ObjectId; @@ -27,7 +27,7 @@ export interface IJobParameters { progress?: number; // unique query object - unique?: any; + unique?: FilterQuery>; uniqueOpts?: { insertOnly: boolean; }; diff --git a/src/utils/date.ts b/src/utils/date.ts index d0135b1..5d4f52b 100644 --- a/src/utils/date.ts +++ b/src/utils/date.ts @@ -1,6 +1,6 @@ -export const isValidDate = function (date: Date) { +export function isValidDate(date: Date): boolean { // An invalid date object returns NaN for getTime() and NaN is the only // object not strictly equal to itself. // eslint-disable-next-line no-self-compare return new Date(date).getTime() === new Date(date).getTime(); -}; +} diff --git a/src/utils/filterUndefined.ts b/src/utils/filterUndefined.ts new file mode 100644 index 0000000..737d007 --- /dev/null +++ b/src/utils/filterUndefined.ts @@ -0,0 +1,3 @@ +export function filterUndefined(ts: (T | undefined)[]): T[] { + return ts.filter((t: T | undefined): t is T => !!t); +} diff --git a/src/utils/hasMongoProtocol.ts b/src/utils/hasMongoProtocol.ts new file mode 100644 index 0000000..7a8a23f --- /dev/null +++ b/src/utils/hasMongoProtocol.ts @@ -0,0 +1,3 @@ +export const hasMongoProtocol = (url: string): boolean => { + return /mongodb(?:\+srv)?:\/\/.*/.test(url); +}; diff --git a/src/utils/helpers.ts b/src/utils/helpers.ts deleted file mode 100644 index bf34215..0000000 --- a/src/utils/helpers.ts +++ /dev/null @@ -1,3 +0,0 @@ -export function filterUndef(ts: (T | undefined)[]): T[] { - return ts.filter((t: T | undefined): t is T => !!t); -} diff --git a/src/utils/mongodb.ts b/src/utils/mongodb.ts deleted file mode 100644 index c2c1201..0000000 --- a/src/utils/mongodb.ts +++ /dev/null @@ -1,3 +0,0 @@ -export const hasMongoProtocol = function (url) { - return url.match(/mongodb(?:\+srv)?:\/\/.*/) !== null; -}; diff --git a/src/utils/nextRunAt.ts b/src/utils/nextRunAt.ts index 8d2935f..ba311b3 100644 --- a/src/utils/nextRunAt.ts +++ b/src/utils/nextRunAt.ts @@ -68,7 +68,7 @@ export const computeFromInterval = (attrs: IJobParameters): Date => { * Internal method to compute next run time from the repeat string * @returns {undefined} */ -export const computeFromRepeatAt = (attrs: IJobParameters): Date => { +export function computeFromRepeatAt(attrs: IJobParameters): Date { const lastRun = attrs.lastRunAt || new Date(); const nextDate = date(attrs.repeatAt).valueOf(); @@ -84,4 +84,4 @@ export const computeFromRepeatAt = (attrs: IJobParameters): Date => { } else { return date(attrs.repeatAt); } -}; +} diff --git a/src/utils/priority.ts b/src/utils/priority.ts index 32673ee..61fac28 100644 --- a/src/utils/priority.ts +++ b/src/utils/priority.ts @@ -5,17 +5,25 @@ * @param {String|Number} priority string to parse into number * @returns {Number} priority that was parsed */ -export const parsePriority = priority => { - const priorityMap = { - lowest: -20, - low: -10, - normal: 0, - high: 10, - highest: 20 - }; - if (typeof priority === 'number' || priority instanceof Number) { + +export type JobPriority = number | keyof typeof priorityMap; + +const priorityMap = { + lowest: -20, + low: -10, + normal: 0, + high: 10, + highest: 20 +}; + +export function parsePriority(priority?: JobPriority): number { + if (typeof priority === 'number') { return priority; } - return priorityMap[priority]; -}; + if (typeof priority === 'string' && priorityMap[priority]) { + return priorityMap[priority]; + } + + return priorityMap.normal; +} diff --git a/test/agenda.test.ts b/test/agenda.test.ts index e0d4eb0..0ca0eab 100644 --- a/test/agenda.test.ts +++ b/test/agenda.test.ts @@ -4,7 +4,7 @@ import { expect } from 'chai'; import { mockMongo } from './helpers/mock-mongodb'; import { Agenda } from '../src'; -import { hasMongoProtocol } from '../src/utils/mongodb'; +import { hasMongoProtocol } from '../src/utils/hasMongoProtocol'; import { Job } from '../src/Job'; // agenda instances @@ -14,9 +14,9 @@ let mongoCfg: string; // mongo db connection db instance let mongoDb: Db; -const clearJobs = () => { +const clearJobs = async (): Promise => { if (mongoDb) { - return mongoDb.collection('agendaJobs').deleteMany({}); + await mongoDb.collection('agendaJobs').deleteMany({}); } }; @@ -25,7 +25,7 @@ const jobTimeout = 500; const jobType = 'do work'; const jobProcessor = () => {}; -describe('Agenda', function () { +describe('Agenda', () => { beforeEach(async () => { if (!mongoDb) { const mockedMongo = await mockMongo(); @@ -463,7 +463,7 @@ describe('Agenda', function () { it('runs the job immediately', async () => { globalAgenda.define('immediateJob', async job => { - expect(job.isRunning()).to.be.true; + expect(job.isRunning()).to.be.equal(true); await globalAgenda.stop(); }); await globalAgenda.now('immediateJob'); @@ -517,7 +517,7 @@ describe('Agenda', function () { const job = globalAgenda.create('someJob', {}); await job.save(); - expect(job.attrs._id).to.not.be.undefined; + expect(job.attrs._id).to.not.be.equal(undefined); await clearJobs(); }); @@ -528,7 +528,7 @@ describe('Agenda', function () { beforeEach(async () => { let remaining = 3; const checkDone = () => { - remaining--; + remaining -= 1; }; await globalAgenda.create('jobA').save().then(checkDone); @@ -609,13 +609,12 @@ describe('Agenda', function () { }); }); - describe('process jobs', function () { + describe('process jobs', () => { // eslint-disable-line prefer-arrow-callback - it('should not cause unhandledRejection', async function () { + it('should not cause unhandledRejection', async () => { // This unit tests if for this bug [https://github.com/agenda/agenda/issues/884] // which is not reproducible with default agenda config on shorter processEvery. // Thus we set the test timeout to 10000, and the delay below to 6000. - this.timeout(10000); const unhandledRejections: any[] = []; const rejectionsHandler = error => unhandledRejections.push(error); @@ -651,6 +650,6 @@ describe('Agenda', function () { expect(j3processes).to.equal(1); expect(unhandledRejections).to.have.length(0); - }); + }).timeout(10000); }); }); diff --git a/test/fixtures/add-tests.ts b/test/fixtures/add-tests.ts index 32fc4d0..a2b121f 100644 --- a/test/fixtures/add-tests.ts +++ b/test/fixtures/add-tests.ts @@ -1,6 +1,6 @@ /* eslint-disable unicorn/no-process-exit */ export default { - none: () => {}, + none: (): void => {}, daily: agenda => { agenda.define('once a day test job', (job, done) => { process.send!('ran'); diff --git a/test/jobprocessor.test.ts b/test/jobprocessor.test.ts index 716eb9c..63743aa 100644 --- a/test/jobprocessor.test.ts +++ b/test/jobprocessor.test.ts @@ -15,7 +15,7 @@ const clearJobs = async () => { } }; -describe('JobProcessor', function () { +describe('JobProcessor', () => { // this.timeout(1000000); beforeEach(async () => { @@ -62,20 +62,20 @@ describe('JobProcessor', function () { await agenda.start(); // queue up long ones - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 100; i += 1) { agenda.now('test long'); } await new Promise(resolve => setTimeout(resolve, 1000)); // queue more short ones (they should complete first!) - for (let j = 0; j < 100; j++) { + for (let j = 0; j < 100; j += 1) { agenda.now('test short'); } await new Promise(resolve => setTimeout(resolve, 1000)); - expect(shortOneFinished).to.be.true; + expect(shortOneFinished).to.be.equal(true); }); it('ensure slow jobs time out', async () => { @@ -109,7 +109,7 @@ describe('JobProcessor', function () { agenda.define( 'test long', async job => { - for (let i = 0; i < 10; i++) { + for (let i = 0; i < 10; i += 1) { await new Promise(resolve => setTimeout(resolve, 100)); await job.touch(); } @@ -141,10 +141,10 @@ describe('JobProcessor', function () { agenda.defaultLockLimit(20); agenda.defaultConcurrency(10); - for (let jobI = 0; jobI < 10; jobI++) { + for (let jobI = 0; jobI < 10; jobI += 1) { agenda.define( `test job ${jobI}`, - async job => { + async () => { await new Promise(resolve => setTimeout(resolve, 5000)); }, { lockLifetime: 10000 } @@ -152,8 +152,8 @@ describe('JobProcessor', function () { } // queue up jobs - for (let jobI = 0; jobI < 10; jobI++) { - for (let jobJ = 0; jobJ < 25; jobJ++) { + for (let jobI = 0; jobI < 10; jobI += 1) { + for (let jobJ = 0; jobJ < 25; jobJ += 1) { agenda.now(`test job ${jobI}`); } } diff --git a/test/retry.test.ts b/test/retry.test.ts index abc5f8e..372963f 100644 --- a/test/retry.test.ts +++ b/test/retry.test.ts @@ -9,9 +9,9 @@ let agenda: Agenda; // mongo db connection db instance let mongoDb: Db; -const clearJobs = () => { +const clearJobs = async (): Promise => { if (mongoDb) { - return mongoDb.collection('agendaJobs').deleteMany({}); + await mongoDb.collection('agendaJobs').deleteMany({}); } }; @@ -62,6 +62,7 @@ describe('Retry', () => { } done(); + return undefined; }); agenda.on('fail:a job', (err, job) => { diff --git a/test/retry.ts b/test/retry.ts deleted file mode 100644 index e7f2bd0..0000000 --- a/test/retry.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { Db, MongoClient } from 'mongodb'; -import * as delay from 'delay'; -import { IMockMongo, mockMongo } from './helpers/mock-mongodb'; - -import { Agenda } from '../src'; - -// Create agenda instances -let agenda: Agenda; -let mongoDb: Db; -let mongoClient: IMockMongo; - -const clearJobs = () => { - if (mongoDb) { - return mongoDb.collection('agendaJobs').deleteMany({}); - } -}; - -const jobType = 'do work'; -const jobProcessor = () => {}; - -describe('Retry', () => { - beforeEach(async () => { - if (!mongoDb) { - mongoClient = await mockMongo(); - mongoDb = mongoClient.mongo.db(); - } - - return new Promise(resolve => { - agenda = new Agenda( - { - mongo: mongoDb - }, - async () => { - await delay(50); - await clearJobs(); - agenda.define('someJob', jobProcessor); - agenda.define('send email', jobProcessor); - agenda.define('some job', jobProcessor); - agenda.define(jobType, jobProcessor); - return resolve(); - } - ); - }); - }); - - afterEach(async () => { - await delay(50); - await agenda.stop(); - await clearJobs(); - // await mongoClient.disconnect(); - // await jobs._db.close(); - }); - - it('should retry a job', async () => { - let shouldFail = true; - - agenda.processEvery(100); // Shave 5s off test runtime :grin: - agenda.define('a job', (job, done) => { - if (shouldFail) { - shouldFail = false; - return done(new Error('test failure')); - } - - done(); - }); - - agenda.on('fail:a job', (err, job) => { - if (err) { - // Do nothing as this is expected to fail. - } - - job.schedule('now').save(); - }); - - const successPromise = new Promise(resolve => agenda.on('success:a job', resolve)); - - await agenda.now('a job'); - - await agenda.start(); - await successPromise; - }); -});