-
Notifications
You must be signed in to change notification settings - Fork 391
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
128 additions
and
179 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,202 +1,149 @@ | ||
/* | ||
import { Queue, QueueEvents, Job, Worker, QueueScheduler } from '@src/classes'; | ||
import { describe, beforeEach, it } from 'mocha'; | ||
import { expect } from 'chai'; | ||
import IORedis from 'ioredis'; | ||
import { v4 } from 'node-uuid'; | ||
import { delay } from 'bluebird'; | ||
import { after, times, once } from 'lodash'; | ||
import { RetryErrors } from '@src/enums'; | ||
|
||
describe('Cleaner', () => { | ||
let queue; | ||
let queue: Queue; | ||
let queueEvents: QueueEvents; | ||
let queueName: string; | ||
let client: IORedis.Redis; | ||
|
||
beforeEach(() => { | ||
queue = utils.buildQueue('cleaner' + uuid()); | ||
}); | ||
beforeEach(function() { | ||
client = new IORedis(); | ||
return client.flushdb(); | ||
}); | ||
|
||
afterEach(function() { | ||
this.timeout( | ||
queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount) | ||
); | ||
return queue.close(); | ||
}); | ||
beforeEach(async function() { | ||
queueName = 'test-' + v4(); | ||
queue = new Queue(queueName); | ||
queueEvents = new QueueEvents(queueName); | ||
return queueEvents.init(); | ||
}); | ||
|
||
it('should reject the cleaner with no grace', done => { | ||
queue.clean().then( | ||
() => { | ||
done(new Error('Promise should not resolve')); | ||
}, | ||
err => { | ||
expect(err).to.be.instanceof(Error); | ||
done(); | ||
} | ||
); | ||
}); | ||
afterEach(async function() { | ||
await queue.close(); | ||
await queueEvents.close(); | ||
return client.quit(); | ||
}); | ||
|
||
it('should reject the cleaner an unknown type', done => { | ||
queue.clean(0, 'bad').then( | ||
() => { | ||
done(new Error('Promise should not resolve')); | ||
}, | ||
e => { | ||
expect(e).to.be.instanceof(Error); | ||
done(); | ||
} | ||
); | ||
}); | ||
it('should clean an empty queue', async () => { | ||
await queue.waitUntilReady(); | ||
|
||
it('should clean an empty queue', done => { | ||
const testQueue = utils.buildQueue('cleaner' + uuid()); | ||
testQueue.isReady().then(() => { | ||
return testQueue.clean(0); | ||
}); | ||
testQueue.on('error', err => { | ||
utils.cleanupQueue(testQueue); | ||
done(err); | ||
}); | ||
testQueue.on('cleaned', (jobs, type) => { | ||
const waitCleaned = new Promise(resolve => { | ||
queue.on('cleaned', (jobs, type) => { | ||
expect(type).to.be.eql('completed'); | ||
expect(jobs.length).to.be.eql(0); | ||
utils.cleanupQueue(testQueue); | ||
done(); | ||
resolve(); | ||
}); | ||
}); | ||
|
||
it('should clean two jobs from the queue', done => { | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process((job, jobDone) => { | ||
jobDone(); | ||
}); | ||
const jobs = await queue.clean(0, 0); | ||
|
||
queue.on( | ||
'completed', | ||
_.after(2, () => { | ||
queue.clean(0).then(jobs => { | ||
expect(jobs.length).to.be.eql(2); | ||
done(); | ||
}, done); | ||
}) | ||
); | ||
}); | ||
expect(jobs.length).to.be.eql(0); | ||
|
||
it('should only remove a job outside of the grace period', done => { | ||
queue.process((job, jobDone) => { | ||
jobDone(); | ||
}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
delay(200) | ||
.then(() => { | ||
queue.add({ some: 'data' }); | ||
queue.clean(100); | ||
return null; | ||
}) | ||
.then(() => { | ||
return delay(100); | ||
}) | ||
.then(() => { | ||
return queue.getCompleted(); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(1); | ||
return queue.drain(); | ||
}) | ||
.then(() => { | ||
done(); | ||
}); | ||
}); | ||
await waitCleaned; | ||
}); | ||
|
||
it('should clean all failed jobs', done => { | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process((job, jobDone) => { | ||
jobDone(new Error('It failed')); | ||
}); | ||
delay(100) | ||
.then(() => { | ||
return queue.clean(0, 'failed'); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(2); | ||
return queue.count(); | ||
}) | ||
.then(len => { | ||
expect(len).to.be.eql(0); | ||
done(); | ||
}); | ||
}); | ||
it('should clean two jobs from the queue', async () => { | ||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
|
||
it('should clean all waiting jobs', done => { | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
delay(100) | ||
.then(() => { | ||
return queue.clean(0, 'wait'); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(2); | ||
return queue.count(); | ||
}) | ||
.then(len => { | ||
expect(len).to.be.eql(0); | ||
done(); | ||
}); | ||
}); | ||
const worker = new Worker(queueName, async job => {}); | ||
await worker.waitUntilReady(); | ||
|
||
it('should clean all delayed jobs', done => { | ||
queue.add({ some: 'data' }, { delay: 5000 }); | ||
queue.add({ some: 'data' }, { delay: 5000 }); | ||
delay(100) | ||
.then(() => { | ||
return queue.clean(0, 'delayed'); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(2); | ||
return queue.count(); | ||
}) | ||
.then(len => { | ||
expect(len).to.be.eql(0); | ||
done(); | ||
}); | ||
}); | ||
queue.on( | ||
'completed', | ||
after(2, async () => { | ||
const jobs = await queue.clean(0, 0); | ||
expect(jobs.length).to.be.eql(2); | ||
}), | ||
); | ||
|
||
await worker.close(); | ||
}); | ||
|
||
it('should clean the number of jobs requested', done => { | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
delay(100) | ||
.then(() => { | ||
return queue.clean(0, 'wait', 1); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(1); | ||
return queue.count(); | ||
}) | ||
.then(len => { | ||
expect(len).to.be.eql(2); | ||
done(); | ||
}); | ||
it('should only remove a job outside of the grace period', async () => { | ||
const worker = new Worker(queueName, async job => {}); | ||
await worker.waitUntilReady(); | ||
|
||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
await delay(200); | ||
await queue.add('test', { some: 'data' }); | ||
await queue.clean(100, 100); | ||
await delay(100); | ||
const jobs = await queue.getCompleted(); | ||
expect(jobs.length).to.be.eql(1); | ||
}); | ||
|
||
it('should clean all failed jobs', async () => { | ||
const worker = new Worker(queueName, async job => { | ||
throw new Error('It failed'); | ||
}); | ||
await worker.waitUntilReady(); | ||
|
||
it('should clean a job without a timestamp', done => { | ||
const client = new redis(6379, '127.0.0.1', {}); | ||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
|
||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process((job, jobDone) => { | ||
jobDone(new Error('It failed')); | ||
}); | ||
await delay(100); | ||
const jobs = await queue.clean(0, 0, 'failed'); | ||
expect(jobs.length).to.be.eql(2); | ||
const count = await queue.count(); | ||
expect(count).to.be.eql(0); | ||
}); | ||
|
||
delay(100) | ||
.then(() => { | ||
return new Promise(resolve => { | ||
client.hdel('bull:' + queue.name + ':1', 'timestamp', resolve); | ||
}); | ||
}) | ||
.then(() => { | ||
return queue.clean(0, 'failed'); | ||
}) | ||
.then(jobs => { | ||
expect(jobs.length).to.be.eql(2); | ||
return queue.getFailed(); | ||
}) | ||
.then(failed => { | ||
expect(failed.length).to.be.eql(0); | ||
done(); | ||
}); | ||
it('should clean all waiting jobs', async () => { | ||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
await delay(100); | ||
const jobs = await queue.clean(0, 0, 'wait'); | ||
expect(jobs.length).to.be.eql(2); | ||
const count = await queue.count(); | ||
expect(count).to.be.eql(0); | ||
}); | ||
|
||
it('should clean all delayed jobs', async () => { | ||
await queue.add('test', { some: 'data' }, { delay: 5000 }); | ||
await queue.add('test', { some: 'data' }, { delay: 5000 }); | ||
await delay(100); | ||
const jobs = await queue.clean(0, 0, 'delayed'); | ||
expect(jobs.length).to.be.eql(2); | ||
const count = await queue.count(); | ||
expect(count).to.be.eql(0); | ||
}); | ||
|
||
it('should clean the number of jobs requested', async () => { | ||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
await delay(100); | ||
const jobs = await queue.clean(0, 1, 'wait'); | ||
expect(jobs.length).to.be.eql(1); | ||
const count = await queue.count(); | ||
expect(count).to.be.eql(2); | ||
}); | ||
|
||
it('should clean a job without a timestamp', async () => { | ||
const worker = new Worker(queueName, async job => { | ||
throw new Error('It failed'); | ||
}); | ||
await worker.waitUntilReady(); | ||
|
||
const client = new IORedis(); | ||
|
||
await queue.add('test', { some: 'data' }); | ||
await queue.add('test', { some: 'data' }); | ||
|
||
await delay(100); | ||
await client.hdel(`bull:${queueName}:1`, 'timestamp'); | ||
const jobs = await queue.clean(0, 0, 'failed'); | ||
expect(jobs.length).to.be.eql(2); | ||
const failed = await queue.getFailed(); | ||
expect(failed.length).to.be.eql(0); | ||
}); | ||
*/ | ||
}); |