diff --git a/src/classes/compat.ts b/src/classes/compat.ts index 56296980c9..2251580854 100644 --- a/src/classes/compat.ts +++ b/src/classes/compat.ts @@ -554,7 +554,6 @@ export class Queue3 extends EventEmitter { private getQueueEvents() { if (!this.queueEvents) { this.queueEvents = new QueueEvents(this.name, this.opts); - this.queueEvents.waitUntilReady(); } return this.queueEvents; } diff --git a/src/test/test_compat.ts b/src/test/test_compat.ts index 6917647a92..a76689e0aa 100644 --- a/src/test/test_compat.ts +++ b/src/test/test_compat.ts @@ -1,4 +1,5 @@ /*eslint-env node */ +/* tslint:disable: no-floating-promises */ 'use strict'; import { Job, Worker } from '@src/classes'; diff --git a/src/test/test_connection.ts b/src/test/test_connection.ts index dcfe56719c..953aa187ff 100644 --- a/src/test/test_connection.ts +++ b/src/test/test_connection.ts @@ -81,7 +81,7 @@ describe('connection', () => { const worker = new Worker(queueName, processor); await worker.waitUntilReady(); - worker.on('completed', () => { + worker.on('completed', async () => { if (count === 1) { (queue.client).stream.end(); queue.client.emit('error', new Error('ECONNRESET')); @@ -89,7 +89,7 @@ describe('connection', () => { (worker.client).stream.end(); worker.client.emit('error', new Error('ECONNRESET')); - queue.add('test', { foo: 'bar' }); + await queue.add('test', { foo: 'bar' }); } }); diff --git a/src/test/test_delay.ts b/src/test/test_delay.ts index 54157b08ca..1be2e60c48 100644 --- a/src/test/test_delay.ts +++ b/src/test/test_delay.ts @@ -98,16 +98,18 @@ describe('Delayed jobs', function() { err.job = job; }); - queue.add('test', { order: 1 }, { delay: 100 }); - queue.add('test', { order: 6 }, { delay: 600 }); - queue.add('test', { order: 10 }, { delay: 1000 }); - queue.add('test', { order: 2 }, { delay: 200 }); - queue.add('test', { order: 9 }, { delay: 900 }); - queue.add('test', { order: 5 }, { delay: 500 }); - queue.add('test', { order: 3 }, { delay: 300 }); - queue.add('test', { order: 7 }, { delay: 700 }); - queue.add('test', { order: 4 }, { delay: 400 }); - queue.add('test', { order: 8 }, { delay: 800 }); + await Promise.all([ + queue.add('test', { order: 1 }, { delay: 100 }), + queue.add('test', { order: 6 }, { delay: 600 }), + queue.add('test', { order: 10 }, { delay: 1000 }), + queue.add('test', { order: 2 }, { delay: 200 }), + queue.add('test', { order: 9 }, { delay: 900 }), + queue.add('test', { order: 5 }, { delay: 500 }), + queue.add('test', { order: 3 }, { delay: 300 }), + queue.add('test', { order: 7 }, { delay: 700 }), + queue.add('test', { order: 4 }, { delay: 400 }), + queue.add('test', { order: 8 }, { delay: 800 }), + ]); await processing; diff --git a/src/test/test_events.ts b/src/test/test_events.ts index c37aa95b60..220e107e57 100644 --- a/src/test/test_events.ts +++ b/src/test/test_events.ts @@ -31,20 +31,24 @@ describe('events', function() { return client.quit(); }); - it('should emit waiting when a job has been added', function(done) { - queue.on('waiting', function() { - done(); + it('should emit waiting when a job has been added', async function() { + const waiting = new Promise(resolve => { + queue.on('waiting', resolve); }); - queue.add('test', { foo: 'bar' }); + await queue.add('test', { foo: 'bar' }); + + await waiting; }); - it('should emit global waiting event when a job has been added', function(done) { - queueEvents.on('waiting', function() { - done(); + it('should emit global waiting event when a job has been added', async function() { + const waiting = new Promise(resolve => { + queue.on('waiting', resolve); }); - queue.add('test', { foo: 'bar' }); + await queue.add('test', { foo: 'bar' }); + + await waiting; }); it('emits drained global drained event when all jobs have been processed', async function() { @@ -87,20 +91,24 @@ describe('events', function() { await worker.close(); }); - it('should emit an event when a job becomes active', function(done) { + it('should emit an event when a job becomes active', async () => { const worker = new Worker(queueName, async job => {}); - queue.add('test', {}); + await queue.add('test', {}); - worker.once('active', function() { - worker.once('completed', async function() { - await worker.close(); - done(); + const completed = new Promise(resolve => { + worker.once('active', function() { + worker.once('completed', async function() { + await worker.close(); + resolve(); + }); }); }); + + await completed; }); - it('should listen to global events', function(done) { + it('should listen to global events', async () => { const worker = new Worker(queueName, async job => {}); let state: string; @@ -112,13 +120,18 @@ describe('events', function() { expect(state).to.be.equal('waiting'); state = 'active'; }); - queueEvents.once('completed', async function() { - expect(state).to.be.equal('active'); - await worker.close(); - done(); + + const completed = new Promise(resolve => { + queueEvents.once('completed', async function() { + expect(state).to.be.equal('active'); + resolve(); + }); }); - queue.add('test', {}); + await queue.add('test', {}); + + await completed; + await worker.close(); }); it('should trim events automatically', async () => { diff --git a/src/test/test_getters.ts b/src/test/test_getters.ts index 2b9e7bda0a..8442d998f4 100644 --- a/src/test/test_getters.ts +++ b/src/test/test_getters.ts @@ -1,4 +1,5 @@ /*eslint-env node */ +/* tslint:disable: no-floating-promises */ 'use strict'; import { Queue, Job } from '@src/classes'; @@ -88,48 +89,56 @@ describe('Jobs getters', function() { }); */ - it('should get completed jobs', function(done) { + it('should get completed jobs', async () => { const worker = new Worker(queueName, async job => {}); let counter = 2; - worker.on('completed', async function() { - counter--; + const completed = new Promise(resolve => { + worker.on('completed', async function() { + counter--; - if (counter === 0) { - const jobs = await queue.getCompleted(); - expect(jobs).to.be.a('array'); + if (counter === 0) { + const jobs = await queue.getCompleted(); + expect(jobs).to.be.a('array'); - // We need a "empty completed" kind of function. - //expect(jobs.length).to.be.equal(2); - await worker.close(); - done(); - } + // We need a "empty completed" kind of function. + //expect(jobs.length).to.be.equal(2); + await worker.close(); + resolve(); + } + }); }); - queue.add('test', { foo: 'bar' }); - queue.add('test', { baz: 'qux' }); + await queue.add('test', { foo: 'bar' }); + await queue.add('test', { baz: 'qux' }); + + await completed; }); - it('should get failed jobs', function(done) { + it('should get failed jobs', async () => { const worker = new Worker(queueName, async job => { throw new Error('Forced error'); }); let counter = 2; - worker.on('failed', async function() { - counter--; + const failed = new Promise(resolve => { + worker.on('failed', async function() { + counter--; - if (counter === 0) { - const jobs = await queue.getFailed(); - expect(jobs).to.be.a('array'); - await worker.close(); - done(); - } + if (counter === 0) { + const jobs = await queue.getFailed(); + expect(jobs).to.be.a('array'); + await worker.close(); + resolve(); + } + }); }); - queue.add('test', { foo: 'bar' }); - queue.add('test', { baz: 'qux' }); + await queue.add('test', { foo: 'bar' }); + await queue.add('test', { baz: 'qux' }); + + await failed; }); /* diff --git a/src/test/test_pause.ts b/src/test/test_pause.ts index 24f6ec8d62..fed20026ef 100644 --- a/src/test/test_pause.ts +++ b/src/test/test_pause.ts @@ -126,8 +126,8 @@ describe('Pause', function() { const worker = new Worker(queueName, process); await worker.waitUntilReady(); - queue.add('test', { foo: 'paused' }); - queue.add('test', { foo: 'paused' }); + await queue.add('test', { foo: 'paused' }); + await queue.add('test', { foo: 'paused' }); queueEvents.on('paused', async () => { isPaused = false; @@ -243,10 +243,12 @@ describe('Pause', function() { const worker2 = new Worker(queueName, process2); await worker2.waitUntilReady(); - queue.add('test', 1); - queue.add('test', 2); - queue.add('test', 3); - queue.add('test', 4); + await Promise.all([ + queue.add('test', 1), + queue.add('test', 2), + queue.add('test', 3), + queue.add('test', 4), + ]); await Promise.all([startProcessing1, startProcessing2]); await Promise.all([worker1.pause(), worker2.pause()]); diff --git a/src/test/test_repeat.ts b/src/test/test_repeat.ts index 15cb1e6f59..6ff30cc3aa 100644 --- a/src/test/test_repeat.ts +++ b/src/test/test_repeat.ts @@ -486,7 +486,7 @@ describe('repeat', function() { }); const worker = new Worker(queueName, processor); - worker.waitUntilReady(); + await worker.waitUntilReady(); worker.on('completed', job => { this.clock.tick(nextTick); @@ -558,7 +558,7 @@ describe('repeat', function() { }; const worker = new Worker(queueName, NoopProc); - worker.waitUntilReady(); + await worker.waitUntilReady(); await queue.add( 'myTestJob', diff --git a/src/test/test_sandboxed_process.ts b/src/test/test_sandboxed_process.ts index af01fe315c..ad2f11deb9 100644 --- a/src/test/test_sandboxed_process.ts +++ b/src/test/test_sandboxed_process.ts @@ -46,16 +46,16 @@ describe('sandboxed process', () => { }); const completting = new Promise((resolve, reject) => { - worker.on('completed', (job, value) => { + worker.on('completed', async (job, value) => { try { expect(job.data).to.be.eql({ foo: 'bar' }); expect(value).to.be.eql(42); expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0); expect(worker['childPool'].free[processFile]).to.have.lengthOf(1); - worker.close(); + await worker.close(); resolve(); } catch (err) { - worker.close(); + await worker.close(); reject(err); } }); @@ -79,16 +79,16 @@ describe('sandboxed process', () => { }); const completting = new Promise((resolve, reject) => { - worker.on('completed', (job, value) => { + worker.on('completed', async (job, value) => { try { expect(job.data).to.be.eql({ foo: 'bar' }); expect(value).to.be.eql(42); expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0); expect(worker['childPool'].free[processFile]).to.have.lengthOf(1); - worker.close(); + await worker.close(); resolve(); } catch (err) { - worker.close(); + await worker.close(); reject(err); } }); @@ -127,7 +127,7 @@ describe('sandboxed process', () => { resolve(); }); - worker.on('completed', (job, value) => { + worker.on('completed', async (job, value) => { try { expect(value).to.be.eql(42); expect( @@ -136,7 +136,7 @@ describe('sandboxed process', () => { ).to.eql(4); after4(); } catch (err) { - worker.close(); + await worker.close(); reject(err); } }); @@ -168,29 +168,28 @@ describe('sandboxed process', () => { ]); const completting = new Promise((resolve, reject) => { - const after4 = after(4, () => { + const after4 = after(4, async () => { expect(worker['childPool'].getAllFree().length).to.eql(1); - worker.close(); + await worker.close(); resolve(); }); - worker.on('completed', (job, value) => { + worker.on('completed', async (job, value) => { try { expect(value).to.be.eql(42); expect( Object.keys(worker['childPool'].retained).length + worker['childPool'].getAllFree().length, ).to.eql(1); - after4(); + await after4(); } catch (err) { - worker.close(); + await worker.close(); reject(err); } }); }); await completting; - await worker.close(); }); it('should process and update progress', async () => { @@ -217,7 +216,6 @@ describe('sandboxed process', () => { expect(worker['childPool'].getAllFree()).to.have.lengthOf(1); resolve(); } catch (err) { - worker.close(); reject(err); } }); @@ -230,7 +228,6 @@ describe('sandboxed process', () => { await queue.add('test', { foo: 'bar' }); await completting; - await worker.close(); }); @@ -246,7 +243,7 @@ describe('sandboxed process', () => { }); const failing = new Promise((resolve, reject) => { - worker.on('failed', (job, err) => { + worker.on('failed', async (job, err) => { try { expect(job.data).eql({ foo: 'bar' }); expect(job.failedReason).eql('Manually failed processor'); @@ -257,7 +254,7 @@ describe('sandboxed process', () => { resolve(); } catch (err) { - worker.close(); + await worker.close(); reject(err); } }); diff --git a/src/test/test_stalled_jobs.ts b/src/test/test_stalled_jobs.ts index 790afea2a8..79bfbeedbc 100644 --- a/src/test/test_stalled_jobs.ts +++ b/src/test/test_stalled_jobs.ts @@ -65,7 +65,7 @@ describe('stalled jobs', function() { const queueScheduler = new QueueScheduler(queueName, { stalledInterval: 100, }); - queueScheduler.waitUntilReady(); + await queueScheduler.waitUntilReady(); const allStalled = new Promise(resolve => { queueScheduler.on('stalled', after(concurrency, resolve)); @@ -131,7 +131,7 @@ describe('stalled jobs', function() { stalledInterval: 100, maxStalledCount: 0, }); - queueScheduler.waitUntilReady(); + await queueScheduler.waitUntilReady(); const allFailed = new Promise(resolve => { queueScheduler.on('failed', after(concurrency, resolve)); diff --git a/src/test/test_worker.ts b/src/test/test_worker.ts index 0e240e4edf..92f8a16f26 100644 --- a/src/test/test_worker.ts +++ b/src/test/test_worker.ts @@ -191,8 +191,8 @@ describe('workers', function() { } catch (err) { reject(err); } finally { - worker.close(); - newQueue.close(); + await worker.close(); + await newQueue.close(); } resolve(); @@ -217,8 +217,8 @@ describe('workers', function() { expect(job.data.foo).to.be.eql('bar'); return new Promise((resolve, reject) => { - worker.on('failed', jobId => { - queue + worker.on('failed', async jobId => { + await queue .getJob(jobId) .then(job => { expect(job).to.be.equal(null); @@ -477,7 +477,7 @@ describe('workers', function() { await worker.waitUntilReady(); for (let i = 1; i <= maxJobs; i++) { - queue.add('test', { foo: 'bar', num: i }); + await queue.add('test', { foo: 'bar', num: i }); } await processing; @@ -502,7 +502,7 @@ describe('workers', function() { processor = async (job: Job) => { try { expect(job.data.foo).to.be.equal('bar'); - job.updateProgress(42); + await job.updateProgress(42); } catch (err) { reject(err); } @@ -894,7 +894,7 @@ describe('workers', function() { await Promise.all(added); const count = await queue.count(); expect(count).to.be.eql(maxJobs); - queue.drain(); + await queue.drain(); const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); }); diff --git a/tslint.json b/tslint.json index cc2fd0bc19..6d0c973815 100644 --- a/tslint.json +++ b/tslint.json @@ -17,6 +17,7 @@ "limit": 120, "ignore-pattern": "^import |^export {(.*?)}" } - ] + ], + "no-floating-promises": true } }