test: add workers test
manast committed Sep 1, 2019
1 parent ba1e4a4 commit 74cf2d2
Showing 1 changed file with 24 additions and 279 deletions.
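The new test checks that a Worker drains jobs that were enqueued before it was started. A rough standalone sketch of that flow follows; the 'bullmq' import path, the drainPreAddedJobs helper name, and the explicit queue setup/teardown are illustrative assumptions, while after is the lodash helper the test file already uses:

import { after } from 'lodash';
import { Queue, Worker } from 'bullmq';

// Sketch only: enqueue a few jobs with no worker running, then start a
// worker and wait until it has completed all of them.
async function drainPreAddedJobs(queueName: string) {
  const queue = new Queue(queueName);

  // Jobs added while no worker exists sit in the waiting list.
  const jobs = await Promise.all([
    queue.add('test', { bar: 'baz' }),
    queue.add('test', { bar1: 'baz1' }),
  ]);

  // The worker started afterwards should pick up that backlog.
  const worker = new Worker(queueName, async () => {});
  await worker.waitUntilReady();

  // Resolve once a 'completed' event has fired for every pre-added job.
  await new Promise(resolve => {
    worker.on('completed', after(jobs.length, resolve));
  });

  await worker.close();
  await queue.close();
}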
303 changes: 24 additions & 279 deletions src/test/test_worker.ts
@@ -517,6 +517,27 @@ describe('workers', function() {
    await worker.close();
  });

  it('processes jobs that were added before the worker started', async () => {
    const jobs = [
      queue.add('test', { bar: 'baz' }),
      queue.add('test', { bar1: 'baz1' }),
      queue.add('test', { bar2: 'baz2' }),
      queue.add('test', { bar3: 'baz3' }),
    ];

    await Promise.all(jobs);

    const worker = new Worker(queueName, async job => {});
    await worker.waitUntilReady();

    await new Promise(resolve => {
      const resolveAfterAllJobs = after(jobs.length, resolve);
      worker.on('completed', resolveAfterAllJobs);
    });

    await worker.close();
  });

  it('process a job that returns data in the process handler', async () => {
    const worker = new Worker(queueName, async job => {
      expect(job.data.foo).to.be.equal('bar');
@@ -653,198 +674,6 @@ describe('workers', function() {

    await worker.close();
  });
/*
it.only('process stalled jobs when starting a queue', function(done) {
this.timeout(6000);
const onceRunning = once(afterJobsRunning);
const worker = new Worker(queueName, async job => {
onceRunning();
return delay(150);
}, {settings: {
stalledInterval: 100,
},
});
queue2.on('completed', doneAfterFour);
queue2.on('stalled', stalledCallback);
await worker.waitUntilReady();
const jobs = await Promise.all([
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
]);
});
it.only('process stalled jobs when starting a queue', function(done) {
this.timeout(6000);
const worker = new Worker(queueName, async job => {
}, {settings: {
stalledInterval: 100,
},
});
await worker.waitUntilReady();
const jobs = [
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
];
Promise.all(jobs).then(() => {
const afterJobsRunning = function() {
const stalledCallback = sandbox.spy();
await worker.close();
return new Promise((resolve, reject) => {
const worker2 = new Worker(queueName, async job => {
}, {settings: {
stalledInterval: 100,
},
});
const doneAfterFour = _.after(4, () => {
try {
expect(stalledCallback.calledOnce).to.be.eql(true);
queue.close().then(resolve);
} catch (e) {
queue.close().then(() => {
reject(e);
});
}
});
queue2.on('completed', doneAfterFour);
queue2.on('stalled', stalledCallback);
queue2.process((job, jobDone2) => {
jobDone2();
});
});
});
})
.then(done, done);
};
const onceRunning = _.once(afterJobsRunning);
queueStalled.process(() => {
onceRunning();
return delay(150);
});
});
});
});
it('processes jobs that were added before the worker started', async () => {
const jobs = [
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
];
await Promise.all(jobs);
const worker = new Worker(queueName, async job => {});
await worker.waitUntilReady();
await new Promise(resolve => {
const resolveAfterAllJobs = after(jobs.length, resolve);
worker.on('completed', resolveAfterAllJobs);
});
await worker.close();
});
*/
/*
it('processes several stalled jobs when starting several queues', function(done) {
const queueScheduler = new QueueScheduler(queueName, {
stalledInterval: 10,
});
this.timeout(50000);
const NUM_QUEUES = 10;
const NUM_JOBS_PER_QUEUE = 10;
const stalledQueues = [];
const jobs = [];
const redisOpts = { port: 6379, host: '127.0.0.1' };
for (let i = 0; i < NUM_QUEUES; i++) {
const queueStalled2 = new Queue('test queue stalled 2', {
redis: redisOpts,
settings: {
lockDuration: 30,
lockRenewTime: 10,
stalledInterval: 100,
},
});
for (let j = 0; j < NUM_JOBS_PER_QUEUE; j++) {
jobs.push(queueStalled2.add({ job: j }));
}
stalledQueues.push(queueStalled2);
}
const closeStalledQueues = function() {
return Promise.all(
stalledQueues.map(queue => {
return queue.close(true);
}),
);
};
Promise.all(jobs).then(() => {
let processed = 0;
const procFn = function() {
// instead of completing we just close the queue to simulate a crash.
utils.simulateDisconnect(this);
processed++;
if (processed === stalledQueues.length) {
setTimeout(() => {
const queue2 = new Queue('test queue stalled 2', {
redis: redisOpts,
settings: { stalledInterval: 100 },
});
queue2.on('error', err => {
done(err);
});
queue2.process((job2, jobDone) => {
jobDone();
});
let counter = 0;
queue2.on('completed', () => {
counter++;
if (counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
queue2.close().then(done);
closeStalledQueues().then(() => {
// This can take long time since queues are disconnected.
});
}
});
}, 100);
}
};
const processes = [];
stalledQueues.forEach(queue => {
queue.on('error', (err) => {
//
// Swallow errors produced by the disconnect
//
});
processes.push(queue.process(procFn));
});
return Promise.all(processes);
});
});
*/

  it('does not process a job that is being processed when a new queue starts', async () => {
    this.timeout(12000);
@@ -881,89 +710,6 @@ describe('workers', function() {
    }
  });

/*
it('process stalled jobs without requiring a queue restart', function(done) {
this.timeout(12000);
const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), {
settings: {
lockRenewTime: 5000,
lockDuration: 500,
stalledInterval: 1000,
},
});
const collect = _.after(2, () => {
queue2.close().then(done);
});
queue2.on('completed', () => {
const client = new redis();
client
.multi()
.zrem(queue2.toKey('completed'), 1)
.lpush(queue2.toKey('active'), 1)
.exec();
client.quit();
collect();
});
queue2.process((job, jobDone) => {
expect(job.data.foo).to.be.equal('bar');
jobDone();
});
queue2
.add({ foo: 'bar' })
.then((job: Job) => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
})
.catch(done);
});
*/
/*
it('failed stalled jobs that stall more than allowable stalled limit', function(done) {
const FAILED_MESSAGE = 'job stalled more than allowable limit';
this.timeout(10000);
const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), {
settings: {
lockRenewTime: 2500,
lockDuration: 250,
stalledInterval: 500,
maxStalledCount: 1,
},
});
let processedCount = 0;
queue2.process( (job: Job) => {
processedCount++;
expect(job.data.foo).to.be.equal('bar');
return delay(1500);
});
queue2.on('completed', () => {
done(new Error('should not complete'));
});
queue2.on('failed', (job: Job, err: Error) => {
expect(processedCount).to.be.eql(2);
expect(job.failedReason).to.be.eql(FAILED_MESSAGE);
expect(err.message).to.be.eql(FAILED_MESSAGE);
done();
});
queue2
.add({ foo: 'bar' })
.then((job: Job) => {
expect(job.id).to.be.ok;
expect(job.data.foo).to.be.eql('bar');
})
.catch(done);
});
*/

  it('process a job that throws an exception', async () => {
    const jobError = new Error('Job Failed');

@@ -1076,10 +822,9 @@ describe('workers', function() {
      });
    });

    await queue.add('test', { foo: 'bar' }).then(job => {
      expect(job.id).to.be.ok;
      expect(job.data.foo).to.be.eql('bar');
    });
    const job = await queue.add('test', { foo: 'bar' });
    expect(job.id).to.be.ok;
    expect(job.data.foo).to.be.eql('bar');

    await failing;
    await completing;