Skip to content

Commit

Permalink
fix: more typings, minor functionality changes (#2)
Browse files Browse the repository at this point in the history
* add more typings and streamline functions,
remoev unnused file

* use IJobParameters instead of Job,
fix unit test

Co-authored-by: Aras Abbasi <a.abbasi@cognigy.com>
  • Loading branch information
Uzlopak and Aras Abbasi committed Oct 20, 2020
1 parent 31f9b6e commit b13d054
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 210 deletions.
39 changes: 15 additions & 24 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { parsePriority } from './utils/priority';
import { JobPriority, parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
Expand Down Expand Up @@ -46,18 +46,13 @@ export class Job<DATA = any | void> {
data: any;
}
) {
// Remove special args

// Process args
args.priority = parsePriority(args.priority) || 0;

// Set attrs to args
this.attrs = {
...args,
// Set defaults if undefined
priority: args.priority || 0,
priority: parsePriority(args.priority),
nextRunAt: args.nextRunAt || new Date(),
type: args.type // || 'once'
type: args.type
};
}

Expand Down Expand Up @@ -86,7 +81,7 @@ export class Job<DATA = any | void> {
repeatEvery(
interval: string | number,
options: { timezone?: string; skipImmediate?: boolean } = {}
) {
): this {
this.attrs.repeatInterval = interval;
this.attrs.repeatTimezone = options.timezone;
if (options.skipImmediate) {
Expand All @@ -101,28 +96,28 @@ export class Job<DATA = any | void> {
return this;
}

repeatAt(time) {
repeatAt(time: string): this {
this.attrs.repeatAt = time;
return this;
}

disable() {
disable(): this {
this.attrs.disabled = true;
return this;
}

enable() {
enable(): this {
this.attrs.disabled = false;
return this;
}

unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']) {
unique(unique: IJobParameters['unique'], opts?: IJobParameters['uniqueOpts']): this {
this.attrs.unique = unique;
this.attrs.uniqueOpts = opts;
return this;
}

schedule(time) {
schedule(time: string | Date): this {
const d = new Date(time);

this.attrs.nextRunAt = Number.isNaN(d.getTime()) ? date(time) : d;
Expand All @@ -135,17 +130,13 @@ export class Job<DATA = any | void> {
* @param {String} priority priority of when job should be queued
* @returns {exports} instance of Job
*/
priority(priority: 'lowest' | 'low' | 'normal' | 'high' | 'highest' | number) {
priority(priority: JobPriority): this {
this.attrs.priority = parsePriority(priority);
return this;
}

fail(reason: Error | string) {
if (reason instanceof Error) {
reason = reason.message;
}

this.attrs.failReason = reason;
fail(reason: Error | string): this {
this.attrs.failReason = reason instanceof Error ? reason.message : reason;
this.attrs.failCount = (this.attrs.failCount || 0) + 1;
const now = new Date();
this.attrs.failedAt = now;
Expand All @@ -159,7 +150,7 @@ export class Job<DATA = any | void> {
return this;
}

isRunning() {
isRunning(): boolean {
if (!this.attrs.lastRunAt) {
return false;
}
Expand All @@ -178,7 +169,7 @@ export class Job<DATA = any | void> {
return false;
}

save() {
async save(): Promise<Job> {
return this.agenda.db.saveJob(this);
}

Expand Down Expand Up @@ -237,7 +228,7 @@ export class Job<DATA = any | void> {
return this;
}

async run() {
async run(): Promise<void> {
const definition = this.agenda.definitions[this.attrs.name];

this.attrs.lastRunAt = new Date();
Expand Down
37 changes: 23 additions & 14 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {
MongoClient,
MongoClientOptions,
UpdateQuery,
ObjectId
ObjectId,
SortOptionObject
} from 'mongodb';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/mongodb';
import { hasMongoProtocol } from './utils/hasMongoProtocol';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { IJobParameters } from './types/JobParameters';
Expand Down Expand Up @@ -49,26 +50,32 @@ export class JobDbRepository {
return !!connectOptions.db?.address;
}

async getJobs(query: any, sort: any = {}, limit = 0, skip = 0) {
async getJobs(
query: FilterQuery<IJobParameters>,
sort: SortOptionObject<IJobParameters> = {},
limit = 0,
skip = 0
): Promise<IJobParameters[]> {
return this.collection.find(query).sort(sort).limit(limit).skip(skip).toArray();
}

async removeJobs(query: any) {
return this.collection.deleteMany(query);
async removeJobs(query: FilterQuery<IJobParameters>): Promise<number> {
const result = await this.collection.deleteMany(query);
return result.result.n || 0;
}

async getQueueSize(): Promise<number> {
return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } });
}

async unlockJob(job) {
await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } });
async unlockJob(job: Job): Promise<void> {
await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } });
}

/**
* Internal method to unlock jobs so that they can be re-run
*/
async unlockJobs(jobIds: ObjectId[]) {
async unlockJobs(jobIds: ObjectId[]): Promise<void> {
await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } });
}

Expand Down Expand Up @@ -143,7 +150,7 @@ export class JobDbRepository {
return result.value;
}

async connect() {
async connect(): Promise<void> {
const db = await this.createConnection();
log('successful connection to MongoDB', db.options);

Expand Down Expand Up @@ -184,8 +191,10 @@ export class JobDbRepository {
}

private async database(url: string, options?: MongoClientOptions) {
if (!hasMongoProtocol(url)) {
url = `mongodb://${url}`;
let connectionString = url;

if (!hasMongoProtocol(connectionString)) {
connectionString = `mongodb://${connectionString}`;
}

const client = await MongoClient.connect(url, {
Expand All @@ -197,7 +206,7 @@ export class JobDbRepository {
return client.db();
}

private processDbResult(job: Job, res: IJobParameters) {
private processDbResult(job: Job, res: IJobParameters): Job {
log(
'processDbResult() called with success, checking whether to process job immediately or not'
);
Expand Down Expand Up @@ -245,7 +254,7 @@ export class JobDbRepository {
// Grab current time and set default query options for MongoDB
const now = new Date();
const protect: Partial<IJobParameters> = {};
let update: UpdateQuery<any> = { $set: props };
let update: UpdateQuery<IJobParameters> = { $set: props };
log('current time stored as %s', now.toISOString());

// If the job already had an ID, then update the properties of the job
Expand Down Expand Up @@ -310,7 +319,7 @@ export class JobDbRepository {

if (job.attrs.unique) {
// If we want the job to be unique, then we can upsert based on the 'unique' query object that was passed in
const query: FilterQuery<any> = job.attrs.unique;
const query = job.attrs.unique;
query.name = props.name;
if (job.attrs.uniqueOpts?.insertOnly) {
update = { $setOnInsert: props };
Expand Down
10 changes: 5 additions & 5 deletions src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ export class JobProcessingQueue {
this._queue = [];
}

get length() {
get length(): number {
return this._queue.length;
}

/**
* Pops and returns last queue element (next job to be processed) without checking concurrency.
* @returns {Job} Next Job to be processed
*/
pop() {
pop(): Job | undefined {
return this._queue.pop();
}

Expand All @@ -32,11 +32,11 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
push(job: Job) {
push(job: Job): void {
this._queue.push(job);
}

remove(job: Job) {
remove(job: Job): void {
let removeJobIndex = this._queue.indexOf(job);
if (removeJobIndex === -1) {
// lookup by id
Expand All @@ -58,7 +58,7 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
insert(job: Job) {
insert(job: Job): void {
const matchIndex = this._queue.findIndex(element => {
if (
element.attrs.nextRunAt &&
Expand Down
26 changes: 11 additions & 15 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as debug from 'debug';
import { Job } from './Job';
import { IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { Agenda } from './index';
Expand All @@ -15,23 +16,18 @@ const log = debug('agenda:jobProcessor');
*/
export class JobProcessor {
private jobStatus: {
[name: string]:
| {
running: number;
locked: number;
}
| undefined;
[name: string]: { running: number; locked: number } | undefined;
} = {};

private localQueueProcessing = 0;

async getStatus(fullDetails = false) {
async getStatus(fullDetails = false): Promise<IAgendaStatus> {
// eslint-disable-next-line @typescript-eslint/no-var-requires,global-require
const { version } = require('../package.json');

return {
version,
queueName: this.agenda.name,
queueName: this.agenda.attrs.name,
totalQueueSizeDB: await this.agenda.db.getQueueSize(),
config: {
totalLockLimit: this.totalLockLimit,
Expand All @@ -42,7 +38,7 @@ export class JobProcessor {
Object.keys(this.jobStatus).map(job => [
job,
{
...this.jobStatus[job],
...this.jobStatus[job]!,
config: this.agenda.definitions[job]
}
])
Expand Down Expand Up @@ -95,7 +91,7 @@ export class JobProcessor {
}

// processJobs
async process(extraJob?: Job) {
async process(extraJob?: Job): Promise<void> {
// Make sure an interval has actually been set
// Prevents race condition with 'Agenda.stop' and already scheduled run
if (!this.isRunning) {
Expand Down Expand Up @@ -138,7 +134,7 @@ export class JobProcessor {
* @param {String} name name of job to check if we should lock or not
* @returns {boolean} whether or not you should lock job
*/
shouldLock(name) {
shouldLock(name: string): boolean {
const jobDefinition = this.agenda.definitions[name];
let shouldLock = true;
// global lock limit
Expand Down Expand Up @@ -168,7 +164,7 @@ export class JobProcessor {
* @param {boolean} inFront puts the job in front of queue if true
* @returns {undefined}
*/
private enqueueJob(job: Job) {
private enqueueJob(job: Job): void {
this.jobQueue.insert(job);
}

Expand All @@ -178,7 +174,7 @@ export class JobProcessor {
* We do this because sometimes jobs are scheduled but will be run before the next process time
* @returns {undefined}
*/
async lockOnTheFly() {
async lockOnTheFly(): Promise<void> {
// Already running this? Return
if (this.isLockingOnTheFly) {
log.extend('lockOnTheFly')('already running, returning');
Expand Down Expand Up @@ -347,7 +343,7 @@ export class JobProcessor {
return;
}

this.localQueueProcessing++;
this.localQueueProcessing += 1;

let jobEnqueued = false;
try {
Expand Down Expand Up @@ -400,7 +396,7 @@ export class JobProcessor {
);
} */
} finally {
this.localQueueProcessing--;
this.localQueueProcessing -= 1;
}
}

Expand Down
Loading

0 comments on commit b13d054

Please sign in to comment.