Skip to content

Commit

Permalink
fix: more typings (#5)
Browse files Browse the repository at this point in the history
* add more typings

* refactor nextRunAt and processEvery

* reduce complexity of calculateProcessEvery

* minor changes in interfaces

Co-authored-by: Aras Abbasi <a.abbasi@cognigy.com>
  • Loading branch information
Uzlopak and Aras Abbasi committed Oct 22, 2020
1 parent 9eca3de commit 8d6e137
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 62 deletions.
11 changes: 5 additions & 6 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as date from 'date.js';
import * as debug from 'debug';
import { JobPriority, parsePriority } from './utils/priority';
import type { Agenda } from './index';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { DefinitionProcessor } from './types/JobDefinition';
import { JobPriority, parsePriority } from './utils/priority';
import { computeFromInterval, computeFromRepeatAt } from './utils/nextRunAt';

const log = debug('agenda:job');

Expand Down Expand Up @@ -56,7 +56,7 @@ export class Job<DATA = unknown | void> {
};
}

toJson(): Partial<IJobParameters> {
toJson(): IJobParameters {
const attrs = this.attrs || {};
const result = {};

Expand All @@ -74,8 +74,7 @@ export class Job<DATA = unknown | void> {
}
});

// console.log('toJson', this.attrs, result);
return result;
return result as IJobParameters;
}

repeatEvery(
Expand Down
50 changes: 29 additions & 21 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import {
MongoClientOptions,
UpdateQuery,
ObjectId,
SortOptionObject
SortOptionObject,
FindOneAndUpdateOption
} from 'mongodb';
import type { Job } from './Job';
import { hasMongoProtocol } from './utils/hasMongoProtocol';
import type { Agenda } from './index';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { IJobParameters } from './types/JobParameters';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IJobParameters } from './types/JobParameters';
import { hasMongoProtocol } from './utils/hasMongoProtocol';

const log = debug('agenda:db');

Expand Down Expand Up @@ -81,7 +82,7 @@ export class JobDbRepository {

async lockJob(job: Job): Promise<IJobParameters | undefined> {
// Query to run against collection to see if we need to lock it
const criteria = {
const criteria: FilterQuery<Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }> = {
_id: job.attrs._id,
name: job.attrs.name,
lockedAt: null,
Expand All @@ -90,11 +91,16 @@ export class JobDbRepository {
};

// Update / options for the MongoDB query
const update = { $set: { lockedAt: new Date() } };
const options = { returnOriginal: false };
const update: UpdateQuery<IJobParameters> = { $set: { lockedAt: new Date() } };
const options: FindOneAndUpdateOption<IJobParameters> = { returnOriginal: false };

// Lock the job in MongoDB!
const resp = await this.collection.findOneAndUpdate(criteria, update, options);
const resp = await this.collection.findOneAndUpdate(
criteria as FilterQuery<IJobParameters>,
update,
options
);

return resp?.value;
}

Expand All @@ -104,11 +110,13 @@ export class JobDbRepository {
lockDeadline: Date,
now: Date = new Date()
): Promise<IJobParameters | undefined> {
// /**
// * Query used to find job to run
// * @type {{$and: [*]}}
// */
const JOB_PROCESS_WHERE_QUERY = {
/**
* Query used to find job to run
* @type {{$and: [*]}}
*/
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
> = {
$and: [
{
name: jobName,
Expand All @@ -132,13 +140,16 @@ export class JobDbRepository {
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
const JOB_PROCESS_SET_QUERY: UpdateQuery<IJobParameters> = { $set: { lockedAt: now } };

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this.connectOptions.sort };
const JOB_RETURN_QUERY: FindOneAndUpdateOption<IJobParameters> = {
returnOriginal: false,
sort: this.connectOptions.sort
};

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this.collection.findOneAndUpdate(
Expand Down Expand Up @@ -208,7 +219,7 @@ export class JobDbRepository {

private processDbResult<DATA = unknown | void>(
job: Job<DATA>,
res: IJobParameters<DATA>
res?: IJobParameters<DATA>
): Job<DATA> {
log(
'processDbResult() called with success, checking whether to process job immediately or not'
Expand Down Expand Up @@ -243,7 +254,6 @@ export class JobDbRepository {

// Grab information needed to save job but that we don't want to persist in MongoDB
const id = job.attrs._id;
// const { unique, uniqueOpts } = job.attrs;

// Store job as JSON and remove props we don't want to store from object
// _id, unique, uniqueOpts
Expand Down Expand Up @@ -284,7 +294,7 @@ export class JobDbRepository {
if (props.nextRunAt && props.nextRunAt <= now) {
log('job has a scheduled nextRunAt time, protecting that field from upsert');
protect.nextRunAt = props.nextRunAt;
delete props.nextRunAt;
delete (props as Partial<IJobParameters>).nextRunAt;
}

// If we have things to protect, set them in MongoDB using $setOnInsert
Expand All @@ -309,7 +319,7 @@ export class JobDbRepository {
update,
{
upsert: true,
returnOriginal: false // same as new: true -> returns the final document
returnOriginal: false
}
);
log(
Expand All @@ -330,8 +340,6 @@ export class JobDbRepository {
update = { $setOnInsert: props };
}

// console.log('update', query, update, uniqueOpts);

// Use the 'unique' query object to find an existing job or create a new one
log('calling findOneAndUpdate() with unique object as query: \n%O', query);
const result = await this.collection.findOneAndUpdate(query, update, {
Expand Down
2 changes: 1 addition & 1 deletion src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// eslint-disable-next-line prettier/prettier
import type { Job } from './Job';
import { IJobParameters } from './types/JobParameters';
import type { IJobParameters } from './types/JobParameters';
import type { Agenda } from './index';
/**
* @class
Expand Down
8 changes: 4 additions & 4 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as debug from 'debug';
import { Job } from './Job';
import { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import { IJobDefinition } from './types/JobDefinition';
import { JobProcessingQueue } from './JobProcessingQueue';
import type { IAgendaJobStatus, IAgendaStatus } from './types/AgendaStatus';
import type { IJobDefinition } from './types/JobDefinition';
import type { Agenda } from './index';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobProcessingQueue } from './JobProcessingQueue';

const log = debug('agenda:jobProcessor');

Expand Down
16 changes: 8 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { EventEmitter } from 'events';
import * as debug from 'debug';

import { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import { Job } from './Job';
import { JobProcessor } from './JobProcessor';
import type { Db, FilterQuery, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobDefinition } from './types/JobDefinition';
import { IAgendaConfig } from './types/AgendaConfig';
import type { IAgendaConfig } from './types/AgendaConfig';
import type { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import type { IAgendaStatus } from './types/AgendaStatus';
import type { IJobParameters } from './types/JobParameters';
import { Job } from './Job';
import { JobDbRepository } from './JobDbRepository';
import { IDatabaseOptions, IDbConfig, IMongoOptions } from './types/DbOptions';
import { filterUndefined } from './utils/filterUndefined';
import { JobPriority, parsePriority } from './utils/priority';
import { IAgendaStatus } from './types/AgendaStatus';
import { IJobParameters } from './types/JobParameters';
import { JobProcessor } from './JobProcessor';
import { calculateProcessEvery } from './utils/processEvery';
import { filterUndefined } from './utils/filterUndefined';

const log = debug('agenda');

Expand Down
2 changes: 1 addition & 1 deletion src/types/AgendaStatus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IAgendaJobStatus {
[name: string]: { running: number; locked: number };
Expand Down
4 changes: 2 additions & 2 deletions src/types/DbOptions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import { IJobParameters } from './JobParameters';
import type { Db, MongoClientOptions, SortOptionObject } from 'mongodb';
import type { IJobParameters } from './JobParameters';

export interface IDatabaseOptions {
db: {
Expand Down
9 changes: 3 additions & 6 deletions src/types/JobDefinition.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Job } from '../Job';
import type { Job } from '../Job';

export interface IJobDefinition<DATA = unknown> {
/** max number of locked jobs of this kind */
Expand All @@ -7,13 +7,10 @@ export interface IJobDefinition<DATA = unknown> {
lockLifetime: number;
/** Higher priority jobs will run first. */
priority?: number;
/** how many jobs of this kind can run in parallel/simultanously */
/** how many jobs of this kind can run in parallel/simultanously per Agenda instance */
concurrency?: number;

// running: number;
// locked: number;

fn: DefinitionProcessor<DATA, void | ((err?) => void)>;
fn: DefinitionProcessor<DATA, void | ((err?: Error) => void)>;
}

export type DefinitionProcessor<DATA, CB> = (
Expand Down
3 changes: 1 addition & 2 deletions src/types/JobParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ export interface IJobParameters<DATA = unknown | void> {
priority: number;
nextRunAt: Date | null;
/**
* // once: the job is just queued in the database --> this does not really exists, it's just fallback
* normal: job is queued and will be processed (regular case when the user adds a new job)
* single: job with this name is only queued once, if there is an exisitn gentry in the database, the job is just updated, but not newly inserted (this is used for .every())
*/
type: /* 'once' | */ 'normal' | 'single';
type: 'normal' | 'single';

lockedAt?: Date;
lastFinishedAt?: Date;
Expand Down
15 changes: 7 additions & 8 deletions src/utils/priority.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/

export type JobPriority = number | keyof typeof priorityMap;

const priorityMap = {
Expand All @@ -16,6 +8,13 @@ const priorityMap = {
highest: 20
};

/**
* Internal method to turn priority into a number
* @name Job#priority
* @function
* @param {String|Number} priority string to parse into number
* @returns {Number} priority that was parsed
*/
export function parsePriority(priority?: JobPriority): number {
if (typeof priority === 'number') {
return priority;
Expand Down
2 changes: 1 addition & 1 deletion test/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ describe('Job', () => {
]);
expect(results).to.eql([10, 0, -10]);
} catch (err) {
console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
// console.log('stats', JSON.stringify(await agenda.getRunningStats(), undefined, 3));
}
});

Expand Down
4 changes: 2 additions & 2 deletions test/retry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const clearJobs = async (): Promise<void> => {
};

const jobType = 'do work';
const jobProcessor = () => {};
const jobProcessor = () => { };

describe('Retry', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -79,5 +79,5 @@ describe('Retry', () => {

await agenda.start();
await successPromise;
});
}).timeout(100000);
});

0 comments on commit 8d6e137

Please sign in to comment.