Skip to content

Commit

Permalink
fix: job processor localQueueProcessing flag
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 25, 2020
1 parent d0ff5b3 commit 413f673
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ export class JobDbRepository {
*/
async saveJob<DATA = unknown | void>(job: Job<DATA>): Promise<Job<DATA>> {
try {
log('attempting to save a job into Agenda instance');
log('attempting to save a job');

// Grab information needed to save job but that we don't want to persist in MongoDB
const id = job.attrs._id;
Expand Down
3 changes: 2 additions & 1 deletion src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
/*
push(job: Job): void {
this._queue.push(job);
}
} */

remove(job: Job): void {
let removeJobIndex = this._queue.indexOf(job);
Expand Down
120 changes: 61 additions & 59 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,86 +363,88 @@ export class JobProcessor {
* handledJobs keeps list of already processed jobs
* @returns {undefined}
*/
private async jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
private jobProcessing(handledJobs: IJobParameters['_id'][] = []) {
// Ensure we have jobs
if (this.jobQueue.length === 0) {
return;
}

this.localQueueProcessing += 1;

const now = new Date();

// Check if there is any job that is not blocked by concurrency
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);
try {
const now = new Date();

if (!job) {
log.extend('jobProcessing')('[%s:%s] there is no job to process');
return;
}
// Check if there is any job that is not blocked by concurrency
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus, handledJobs);

log.extend('jobProcessing')(
'[%s:%s] there is a job to process (priority = %d)',
job.attrs.name,
job.attrs._id,
job.attrs.priority
);

this.jobQueue.remove(job);
if (!job) {
log.extend('jobProcessing')('[%s:%s] there is no job to process');
return;
}

// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt <= now) {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the past, run the job immediately',
'[%s:%s] there is a job to process (priority = %d)',
job.attrs.name,
job.attrs._id
job.attrs._id,
job.attrs.priority
);
this.runOrRetry(job);
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
if (runIn > this.processEvery) {
// this job is not in the near future, remove it (it will be picked up later)
log.extend('runOrRetry')(
'[%s:%s] job is too far away, freeing it up',

this.jobQueue.remove(job);

// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt <= now) {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the past, run the job immediately',
job.attrs.name,
job.attrs._id
);
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
this.runOrRetry(job);
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
if (runIn > this.processEvery) {
// this job is not in the near future, remove it (it will be picked up later)
log.extend('runOrRetry')(
'[%s:%s] job is too far away, freeing it up',
job.attrs.name,
job.attrs._id
);
}
if (lockedJobIndex === -1) {
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (lockedJobIndex === -1) {
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}

this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
} else {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
job.attrs.name,
job.attrs._id,
runIn
);
// re add to queue (puts it at the ned of the queue)
this.jobQueue.push(job);
setTimeout(() => {
this.jobProcessing();
}, runIn);
this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
} else {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
job.attrs.name,
job.attrs._id,
runIn
);
// re add to queue (puts it at the right position in the queue)
this.jobQueue.insert(job);
setTimeout(() => {
this.jobProcessing();
}, runIn);
}
}
}

handledJobs.push(job.attrs._id);

this.localQueueProcessing -= 1;
handledJobs.push(job.attrs._id);

if (job && this.localQueueProcessing < this.maxConcurrency) {
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
setImmediate(() => this.jobProcessing(handledJobs));
if (job && this.localQueueProcessing < this.maxConcurrency) {
// additionally run again and check if there are more jobs that we can process right now (as long concurrency not reached)
setImmediate(() => this.jobProcessing(handledJobs));
}
} finally {
this.localQueueProcessing -= 1;
}
}

Expand Down

0 comments on commit 413f673

Please sign in to comment.