diff --git a/src/JobDbRepository.ts b/src/JobDbRepository.ts index 2fb7d85..fb19fa0 100644 --- a/src/JobDbRepository.ts +++ b/src/JobDbRepository.ts @@ -57,6 +57,13 @@ export class JobDbRepository { return this.collection.deleteMany(query); } + async getQueueSize(): Promise { + return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } }); + } + + /** + * Internal method to unlock jobs so that they can be re-run + */ async unlockJobs(jobIds: ObjectId[]) { await this.collection.updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } }); } diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index 0f6e072..5089f12 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -23,6 +23,7 @@ export class JobProcessor { async getStatus() { return { + queueSize: await this.agenda.db.getQueueSize(), jobStatus: this.jobStatus, runningJobs: this.runningJobs.length, lockedJobs: this.lockedJobs.length, diff --git a/src/index.ts b/src/index.ts index b3d8530..e476e0d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -50,7 +50,10 @@ export class Agenda extends EventEmitter { private ready: Promise; getRunningStats() { - return this.jobProcessor?.getStatus(); + if (!this.jobProcessor) { + throw new Error('agenda not running!'); + } + return this.jobProcessor.getStatus(); } constructor(