Skip to content

Commit

Permalink
fix: don't fail a job when the worker already lost the lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Embraser01 committed Jan 9, 2020
1 parent b4a7dfd commit 23c0bf7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,14 @@ export class Worker<T = any> extends QueueBase {
};

const handleFailed = async (err: Error) => {
await job.moveToFailed(err, token);
this.emit('failed', job, err, 'active');
try {
await job.moveToFailed(err, token);
this.emit('failed', job, err, 'active');
} catch (e) {
// It probably means that the job has lost the lock before completion
// The QueueScheduler will (or already has) move the job to the waiting list (as stalled)
this.emit('error', e);
}
};

// TODO: how to cancel the processing? (null -> job.cancel() => throw CancelError()void)
Expand Down
37 changes: 37 additions & 0 deletions src/test/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,43 @@ describe('workers', function() {
expect(countAfterEmpty).to.be.eql(0);
});

it('emit error if lock is lost', async function() {
this.timeout(10000);

const worker = new Worker(
queueName,
async job => {
return delay(2000);
},
{
lockDuration: 1000,
lockRenewTime: 3000, // The lock will not be updated
},
);
await worker.waitUntilReady();

const queueScheduler = new QueueScheduler(queueName, {
stalledInterval: 100,
});
await queueScheduler.waitUntilReady();

const job = await queue.add('test', { bar: 'baz' });

const workerError = new Promise(resolve => {
worker.on('error', resolve);
});

const error = await workerError;

expect(error).to.be.instanceOf(Error);
expect((error as Error).message).to.be.eql(
`Missing lock for job ${job.id} failed`,
);

await worker.close();
await queueScheduler.close();
});

describe('Concurrency process', () => {
it('should run job in sequence if I specify a concurrency of 1', async () => {
let processing = false;
Expand Down

0 comments on commit 23c0bf7

Please sign in to comment.