Skip to content

Commit

Permalink
fix: fix several floating promises
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 29, 2019
1 parent 279bbba commit 590a4a9
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 93 deletions.
1 change: 0 additions & 1 deletion src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,6 @@ export class Queue3<T = any> extends EventEmitter {
private getQueueEvents() {
if (!this.queueEvents) {
this.queueEvents = new QueueEvents(this.name, this.opts);
this.queueEvents.waitUntilReady();
}
return this.queueEvents;
}
Expand Down
1 change: 1 addition & 0 deletions src/test/test_compat.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*eslint-env node */
/* tslint:disable: no-floating-promises */
'use strict';

import { Job, Worker } from '@src/classes';
Expand Down
4 changes: 2 additions & 2 deletions src/test/test_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,15 @@ describe('connection', () => {
const worker = new Worker(queueName, processor);
await worker.waitUntilReady();

worker.on('completed', () => {
worker.on('completed', async () => {
if (count === 1) {
(<any>queue.client).stream.end();
queue.client.emit('error', new Error('ECONNRESET'));

(<any>worker.client).stream.end();
worker.client.emit('error', new Error('ECONNRESET'));

queue.add('test', { foo: 'bar' });
await queue.add('test', { foo: 'bar' });
}
});

Expand Down
22 changes: 12 additions & 10 deletions src/test/test_delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,18 @@ describe('Delayed jobs', function() {
err.job = job;
});

queue.add('test', { order: 1 }, { delay: 100 });
queue.add('test', { order: 6 }, { delay: 600 });
queue.add('test', { order: 10 }, { delay: 1000 });
queue.add('test', { order: 2 }, { delay: 200 });
queue.add('test', { order: 9 }, { delay: 900 });
queue.add('test', { order: 5 }, { delay: 500 });
queue.add('test', { order: 3 }, { delay: 300 });
queue.add('test', { order: 7 }, { delay: 700 });
queue.add('test', { order: 4 }, { delay: 400 });
queue.add('test', { order: 8 }, { delay: 800 });
await Promise.all([
queue.add('test', { order: 1 }, { delay: 100 }),
queue.add('test', { order: 6 }, { delay: 600 }),
queue.add('test', { order: 10 }, { delay: 1000 }),
queue.add('test', { order: 2 }, { delay: 200 }),
queue.add('test', { order: 9 }, { delay: 900 }),
queue.add('test', { order: 5 }, { delay: 500 }),
queue.add('test', { order: 3 }, { delay: 300 }),
queue.add('test', { order: 7 }, { delay: 700 }),
queue.add('test', { order: 4 }, { delay: 400 }),
queue.add('test', { order: 8 }, { delay: 800 }),
]);

await processing;

Expand Down
53 changes: 33 additions & 20 deletions src/test/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ describe('events', function() {
return client.quit();
});

it('should emit waiting when a job has been added', function(done) {
queue.on('waiting', function() {
done();
it('should emit waiting when a job has been added', async function() {
const waiting = new Promise(resolve => {
queue.on('waiting', resolve);
});

queue.add('test', { foo: 'bar' });
await queue.add('test', { foo: 'bar' });

await waiting;
});

it('should emit global waiting event when a job has been added', function(done) {
queueEvents.on('waiting', function() {
done();
it('should emit global waiting event when a job has been added', async function() {
const waiting = new Promise(resolve => {
queue.on('waiting', resolve);
});

queue.add('test', { foo: 'bar' });
await queue.add('test', { foo: 'bar' });

await waiting;
});

it('emits drained global drained event when all jobs have been processed', async function() {
Expand Down Expand Up @@ -87,20 +91,24 @@ describe('events', function() {
await worker.close();
});

it('should emit an event when a job becomes active', function(done) {
it('should emit an event when a job becomes active', async () => {
const worker = new Worker(queueName, async job => {});

queue.add('test', {});
await queue.add('test', {});

worker.once('active', function() {
worker.once('completed', async function() {
await worker.close();
done();
const completed = new Promise(resolve => {
worker.once('active', function() {
worker.once('completed', async function() {
await worker.close();
resolve();
});
});
});

await completed;
});

it('should listen to global events', function(done) {
it('should listen to global events', async () => {
const worker = new Worker(queueName, async job => {});

let state: string;
Expand All @@ -112,13 +120,18 @@ describe('events', function() {
expect(state).to.be.equal('waiting');
state = 'active';
});
queueEvents.once('completed', async function() {
expect(state).to.be.equal('active');
await worker.close();
done();

const completed = new Promise(resolve => {
queueEvents.once('completed', async function() {
expect(state).to.be.equal('active');
resolve();
});
});

queue.add('test', {});
await queue.add('test', {});

await completed;
await worker.close();
});

it('should trim events automatically', async () => {
Expand Down
57 changes: 33 additions & 24 deletions src/test/test_getters.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*eslint-env node */
/* tslint:disable: no-floating-promises */
'use strict';

import { Queue, Job } from '@src/classes';
Expand Down Expand Up @@ -88,48 +89,56 @@ describe('Jobs getters', function() {
});
*/

it('should get completed jobs', function(done) {
it('should get completed jobs', async () => {
const worker = new Worker(queueName, async job => {});
let counter = 2;

worker.on('completed', async function() {
counter--;
const completed = new Promise(resolve => {
worker.on('completed', async function() {
counter--;

if (counter === 0) {
const jobs = await queue.getCompleted();
expect(jobs).to.be.a('array');
if (counter === 0) {
const jobs = await queue.getCompleted();
expect(jobs).to.be.a('array');

// We need a "empty completed" kind of function.
//expect(jobs.length).to.be.equal(2);
await worker.close();
done();
}
// We need a "empty completed" kind of function.
//expect(jobs.length).to.be.equal(2);
await worker.close();
resolve();
}
});
});

queue.add('test', { foo: 'bar' });
queue.add('test', { baz: 'qux' });
await queue.add('test', { foo: 'bar' });
await queue.add('test', { baz: 'qux' });

await completed;
});

it('should get failed jobs', function(done) {
it('should get failed jobs', async () => {
const worker = new Worker(queueName, async job => {
throw new Error('Forced error');
});

let counter = 2;

worker.on('failed', async function() {
counter--;
const failed = new Promise(resolve => {
worker.on('failed', async function() {
counter--;

if (counter === 0) {
const jobs = await queue.getFailed();
expect(jobs).to.be.a('array');
await worker.close();
done();
}
if (counter === 0) {
const jobs = await queue.getFailed();
expect(jobs).to.be.a('array');
await worker.close();
resolve();
}
});
});

queue.add('test', { foo: 'bar' });
queue.add('test', { baz: 'qux' });
await queue.add('test', { foo: 'bar' });
await queue.add('test', { baz: 'qux' });

await failed;
});

/*
Expand Down
14 changes: 8 additions & 6 deletions src/test/test_pause.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ describe('Pause', function() {
const worker = new Worker(queueName, process);
await worker.waitUntilReady();

queue.add('test', { foo: 'paused' });
queue.add('test', { foo: 'paused' });
await queue.add('test', { foo: 'paused' });
await queue.add('test', { foo: 'paused' });

queueEvents.on('paused', async () => {
isPaused = false;
Expand Down Expand Up @@ -243,10 +243,12 @@ describe('Pause', function() {
const worker2 = new Worker(queueName, process2);
await worker2.waitUntilReady();

queue.add('test', 1);
queue.add('test', 2);
queue.add('test', 3);
queue.add('test', 4);
await Promise.all([
queue.add('test', 1),
queue.add('test', 2),
queue.add('test', 3),
queue.add('test', 4),
]);

await Promise.all([startProcessing1, startProcessing2]);
await Promise.all([worker1.pause(), worker2.pause()]);
Expand Down
4 changes: 2 additions & 2 deletions src/test/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ describe('repeat', function() {
});

const worker = new Worker(queueName, processor);
worker.waitUntilReady();
await worker.waitUntilReady();

worker.on('completed', job => {
this.clock.tick(nextTick);
Expand Down Expand Up @@ -558,7 +558,7 @@ describe('repeat', function() {
};

const worker = new Worker(queueName, NoopProc);
worker.waitUntilReady();
await worker.waitUntilReady();

await queue.add(
'myTestJob',
Expand Down
Loading

0 comments on commit 590a4a9

Please sign in to comment.