Skip to content

Commit

Permalink
fix: only unlock jobs which have a nextRunAt jobs on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 30, 2020
1 parent fff42d5 commit 291f16e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
24 changes: 11 additions & 13 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ export class JobDbRepository {
}

async unlockJob(job: Job): Promise<void> {
await this.collection.updateOne({ _id: job.attrs._id }, { $unset: { lockedAt: true } });
// only unlock jobs which are not currently procsesed (nextRunAT is not null)
await this.collection.updateOne(
{ _id: job.attrs._id, nextRunAt: { $ne: null } },
{ $unset: { lockedAt: true } }
);
}

/**
Expand Down Expand Up @@ -119,21 +123,15 @@ export class JobDbRepository {
const JOB_PROCESS_WHERE_QUERY: FilterQuery<
Omit<IJobParameters, 'lockedAt'> & { lockedAt?: Date | null }
> = {
$and: [
name: jobName,
disabled: { $ne: true },
$or: [
{
name: jobName,
disabled: { $ne: true }
lockedAt: { $eq: null },
nextRunAt: { $lte: nextScanAt }
},
{
$or: [
{
lockedAt: { $eq: null },
nextRunAt: { $lte: nextScanAt }
},
{
lockedAt: { $lte: lockDeadline }
}
]
lockedAt: { $lte: lockDeadline }
}
]
};
Expand Down
17 changes: 14 additions & 3 deletions test/agenda.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -670,31 +670,42 @@ describe('Agenda', () => {
const rejectionsHandler = error => unhandledRejections.push(error);
process.on('unhandledRejection', rejectionsHandler);

/*
let j0processes = 0;
globalAgenda.define('j0', (_job, done) => {
j0processes += 1;
done();
}); */

let j1processes = 0;
globalAgenda.define('j1', (job, done) => {
globalAgenda.define('j1', (_job, done) => {
j1processes += 1;
done();
});

let j2processes = 0;
globalAgenda.define('j2', (job, done) => {
globalAgenda.define('j2', (_job, done) => {
j2processes += 1;
done();
});

let j3processes = 0;
globalAgenda.define('j3', async job => {
globalAgenda.define('j3', async _job => {
j3processes += 1;
});

await globalAgenda.start();

// await globalAgenda.every('1 seconds', 'j0');
await globalAgenda.every('5 seconds', 'j1');
await globalAgenda.every('10 seconds', 'j2');
await globalAgenda.every('15 seconds', 'j3');

await delay(6000);

process.removeListener('unhandledRejection', rejectionsHandler);

// expect(j0processes).to.equal(5);
expect(j1processes).to.equal(2);
expect(j2processes).to.equal(1);
expect(j3processes).to.equal(1);
Expand Down

0 comments on commit 291f16e

Please sign in to comment.