Skip to content

Commit

Permalink
fix: check if job has expired before we run it
Browse files Browse the repository at this point in the history
it can happen that the process is so long in the queue, that it gets expired in the meantime.
if it is expired, it is put again in the queue, and then it could happen that a job that should
run with a concurrency of e.g. 1 runs in parallel twice.
  • Loading branch information
simllll committed Dec 3, 2021
1 parent 544e948 commit e301511
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 54 deletions.
4 changes: 4 additions & 0 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ export class Job<DATA = unknown | void> {
await this.fetchStatus();
}

return this.isExpired();
}

isExpired(): boolean {
const definition = this.agenda.definitions[this.attrs.name];

const lockDeadline = new Date(Date.now() - definition.lockLifetime);
Expand Down
2 changes: 1 addition & 1 deletion src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class JobDbRepository {
}

async unlockJob(job: Job): Promise<void> {
// only unlock jobs which are not currently procsesed (nextRunAT is not null)
// only unlock jobs which are not currently processed (nextRunAT is not null)
await this.collection.updateOne(
{ _id: job.attrs._id, nextRunAt: { $ne: null } },
{ $unset: { lockedAt: true } }
Expand Down
111 changes: 58 additions & 53 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ export class JobProcessor {
if (job) {
if (job.attrs.name !== name) {
throw new Error(
`got different job name: ${job.attrs.name} (acutal) !== ${name} (expected)`
`got different job name: ${job.attrs.name} (actual) !== ${name} (expected)`
);
}

Expand Down Expand Up @@ -393,66 +393,71 @@ export class JobProcessor {
return;
}

log.extend('jobProcessing')(
'[%s:%s] there is a job to process (priority = %d)',
job.attrs.name,
job.attrs._id,
job.attrs.priority,
job.gotTimerToExecute
);

this.jobQueue.remove(job);

// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt <= now) {
if (!job.isExpired()) {
// check if job has expired (and therefore probably got picked up again by another queue in the meantime)
// before it even has started to run

log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the past, run the job immediately',
'[%s:%s] there is a job to process (priority = %d)',
job.attrs.name,
job.attrs._id
job.attrs._id,
job.attrs.priority,
job.gotTimerToExecute
);
this.runOrRetry(job);
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
if (runIn > this.processEvery) {
// this job is not in the near future, remove it (it will be picked up later)
log.extend('runOrRetry')(
'[%s:%s] job is too far away, freeing it up',
job.attrs.name,
job.attrs._id
);
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (lockedJobIndex === -1) {
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}

this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
} else {
// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt <= now) {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
'[%s:%s] nextRunAt is in the past, run the job immediately',
job.attrs.name,
job.attrs._id,
runIn
job.attrs._id
);
// re add to queue (puts it at the right position in the queue)
this.jobQueue.insert(job);
// ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
if (!job.gotTimerToExecute) {
job.gotTimerToExecute = true;
setTimeout(
() => {
this.jobProcessing();
},
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
// because setTimeout will run in an overflow otherwise and reprocesses immediately
this.runOrRetry(job);
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
if (runIn > this.processEvery) {
// this job is not in the near future, remove it (it will be picked up later)
log.extend('runOrRetry')(
'[%s:%s] job is too far away, freeing it up',
job.attrs.name,
job.attrs._id
);
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
lockedJobIndex = this.lockedJobs.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (lockedJobIndex === -1) {
throw new Error(`cannot find job ${job.attrs._id} in locked jobs queue?`);
}

this.lockedJobs.splice(lockedJobIndex, 1);
this.updateStatus(job.attrs.name, 'locked', -1);
} else {
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
job.attrs.name,
job.attrs._id,
runIn
);
// re add to queue (puts it at the right position in the queue)
this.jobQueue.insert(job);
// ensure every job gets a timer to run at the near future time (but also ensure this time is set only once)
if (!job.gotTimerToExecute) {
job.gotTimerToExecute = true;
setTimeout(
() => {
this.jobProcessing();
},
runIn > MAX_SAFE_32BIT_INTEGER ? MAX_SAFE_32BIT_INTEGER : runIn
); // check if runIn is higher than unsined 32 bit int, if so, use this time to recheck,
// because setTimeout will run in an overflow otherwise and reprocesses immediately
}
}
}
}
Expand Down Expand Up @@ -508,7 +513,7 @@ export class JobProcessor {
return;
}

if (await job.isDead()) {
if (job.isExpired()) {
reject(
new Error(
`execution of '${job.attrs.name}' canceled, execution took more than ${
Expand Down

0 comments on commit e301511

Please sign in to comment.