diff --git a/src/test/test_worker.ts b/src/test/test_worker.ts index fd03da53ad..df67f0d8a5 100644 --- a/src/test/test_worker.ts +++ b/src/test/test_worker.ts @@ -517,6 +517,27 @@ describe('workers', function() { await worker.close(); }); + it('processes jobs that were added before the worker started', async () => { + const jobs = [ + queue.add('test', { bar: 'baz' }), + queue.add('test', { bar1: 'baz1' }), + queue.add('test', { bar2: 'baz2' }), + queue.add('test', { bar3: 'baz3' }), + ]; + + await Promise.all(jobs); + + const worker = new Worker(queueName, async job => {}); + await worker.waitUntilReady(); + + await new Promise(resolve => { + const resolveAfterAllJobs = after(jobs.length, resolve); + worker.on('completed', resolveAfterAllJobs); + }); + + await worker.close(); + }); + it('process a job that returns data in the process handler', async () => { const worker = new Worker(queueName, async job => { expect(job.data.foo).to.be.equal('bar'); @@ -653,198 +674,6 @@ describe('workers', function() { await worker.close(); }); - /* - it.only('process stalled jobs when starting a queue', function(done) { - this.timeout(6000); - - const onceRunning = once(afterJobsRunning); - - const worker = new Worker(queueName, async job => { - onceRunning(); - return delay(150); - }, {settings: { - stalledInterval: 100, - }, - }); - - queue2.on('completed', doneAfterFour); - queue2.on('stalled', stalledCallback); - - await worker.waitUntilReady(); - - const jobs = await Promise.all([ - queue.add('test', { bar: 'baz' }), - queue.add('test', { bar1: 'baz1' }), - queue.add('test', { bar2: 'baz2' }), - queue.add('test', { bar3: 'baz3' }), - ]); - }); - - it.only('process stalled jobs when starting a queue', function(done) { - this.timeout(6000); - - const worker = new Worker(queueName, async job => { - }, {settings: { - stalledInterval: 100, - }, - }); - - await worker.waitUntilReady(); - const jobs = [ - queue.add('test', { bar: 'baz' }), - queue.add('test', { bar1: 'baz1' }), - queue.add('test', { bar2: 'baz2' }), - queue.add('test', { bar3: 'baz3' }), - ]; - Promise.all(jobs).then(() => { - const afterJobsRunning = function() { - const stalledCallback = sandbox.spy(); - await worker.close(); - - return new Promise((resolve, reject) => { - const worker2 = new Worker(queueName, async job => { - }, {settings: { - stalledInterval: 100, - }, - }); - - const doneAfterFour = _.after(4, () => { - try { - expect(stalledCallback.calledOnce).to.be.eql(true); - queue.close().then(resolve); - } catch (e) { - queue.close().then(() => { - reject(e); - }); - } - }); - queue2.on('completed', doneAfterFour); - queue2.on('stalled', stalledCallback); - - queue2.process((job, jobDone2) => { - jobDone2(); - }); - }); - }); - }) - .then(done, done); - }; - - const onceRunning = _.once(afterJobsRunning); - queueStalled.process(() => { - onceRunning(); - return delay(150); - }); - }); - }); - }); - - it('processes jobs that were added before the worker started', async () => { - const jobs = [ - queue.add('test', { bar: 'baz' }), - queue.add('test', { bar1: 'baz1' }), - queue.add('test', { bar2: 'baz2' }), - queue.add('test', { bar3: 'baz3' }), - ]; - - await Promise.all(jobs); - - const worker = new Worker(queueName, async job => {}); - await worker.waitUntilReady(); - - await new Promise(resolve => { - const resolveAfterAllJobs = after(jobs.length, resolve); - worker.on('completed', resolveAfterAllJobs); - }); - - await worker.close(); - }); -*/ - /* - it('processes several stalled jobs when starting several queues', function(done) { - const queueScheduler = new QueueScheduler(queueName, { - stalledInterval: 10, - }); - this.timeout(50000); - - const NUM_QUEUES = 10; - const NUM_JOBS_PER_QUEUE = 10; - const stalledQueues = []; - const jobs = []; - const redisOpts = { port: 6379, host: '127.0.0.1' }; - - for (let i = 0; i < NUM_QUEUES; i++) { - const queueStalled2 = new Queue('test queue stalled 2', { - redis: redisOpts, - settings: { - lockDuration: 30, - lockRenewTime: 10, - stalledInterval: 100, - }, - }); - - for (let j = 0; j < NUM_JOBS_PER_QUEUE; j++) { - jobs.push(queueStalled2.add({ job: j })); - } - - stalledQueues.push(queueStalled2); - } - - const closeStalledQueues = function() { - return Promise.all( - stalledQueues.map(queue => { - return queue.close(true); - }), - ); - }; - - Promise.all(jobs).then(() => { - let processed = 0; - const procFn = function() { - // instead of completing we just close the queue to simulate a crash. - utils.simulateDisconnect(this); - processed++; - if (processed === stalledQueues.length) { - setTimeout(() => { - const queue2 = new Queue('test queue stalled 2', { - redis: redisOpts, - settings: { stalledInterval: 100 }, - }); - queue2.on('error', err => { - done(err); - }); - queue2.process((job2, jobDone) => { - jobDone(); - }); - - let counter = 0; - queue2.on('completed', () => { - counter++; - if (counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) { - queue2.close().then(done); - - closeStalledQueues().then(() => { - // This can take long time since queues are disconnected. - }); - } - }); - }, 100); - } - }; - - const processes = []; - stalledQueues.forEach(queue => { - queue.on('error', (err) => { - // - // Swallow errors produced by the disconnect - // - }); - processes.push(queue.process(procFn)); - }); - return Promise.all(processes); - }); - }); - */ it('does not process a job that is being processed when a new queue starts', async () => { this.timeout(12000); @@ -881,89 +710,6 @@ describe('workers', function() { } }); - /* - it('process stalled jobs without requiring a queue restart', function(done) { - this.timeout(12000); - - const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), { - settings: { - lockRenewTime: 5000, - lockDuration: 500, - stalledInterval: 1000, - }, - }); - - const collect = _.after(2, () => { - queue2.close().then(done); - }); - - queue2.on('completed', () => { - const client = new redis(); - client - .multi() - .zrem(queue2.toKey('completed'), 1) - .lpush(queue2.toKey('active'), 1) - .exec(); - client.quit(); - collect(); - }); - - queue2.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - }); - - queue2 - .add({ foo: 'bar' }) - .then((job: Job) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - }) - .catch(done); - }); - */ - /* - it('failed stalled jobs that stall more than allowable stalled limit', function(done) { - const FAILED_MESSAGE = 'job stalled more than allowable limit'; - this.timeout(10000); - - const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), { - settings: { - lockRenewTime: 2500, - lockDuration: 250, - stalledInterval: 500, - maxStalledCount: 1, - }, - }); - - let processedCount = 0; - queue2.process( (job: Job) => { - processedCount++; - expect(job.data.foo).to.be.equal('bar'); - return delay(1500); - }); - - queue2.on('completed', () => { - done(new Error('should not complete')); - }); - - queue2.on('failed', (job: Job, err: Error) => { - expect(processedCount).to.be.eql(2); - expect(job.failedReason).to.be.eql(FAILED_MESSAGE); - expect(err.message).to.be.eql(FAILED_MESSAGE); - done(); - }); - - queue2 - .add({ foo: 'bar' }) - .then((job: Job) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - }) - .catch(done); - }); - */ - it('process a job that throws an exception', async () => { const jobError = new Error('Job Failed'); @@ -1076,10 +822,9 @@ describe('workers', function() { }); }); - await queue.add('test', { foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.eql('bar'); - }); + const job = await queue.add('test', { foo: 'bar' }); + expect(job.id).to.be.ok; + expect(job.data.foo).to.be.eql('bar'); await failing; await completing;