diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..468ed284f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,7 @@ +version: '3.2' +services: + redis: + image: redis:6.2-alpine + container_name: cache + ports: + - 6379:6379 diff --git a/lib/commands/updateData-1.lua b/lib/commands/updateData-1.lua new file mode 100644 index 000000000..7e57bf87b --- /dev/null +++ b/lib/commands/updateData-1.lua @@ -0,0 +1,20 @@ +--[[ + Update job data + + Input: + KEYS[1] Job id key + + ARGV[1] data + + Output: + 0 - OK + -1 - Missing job. +]] +local rcall = redis.call + +if rcall("EXISTS",KEYS[1]) == 1 then -- // Make sure job exists + rcall("HSET", KEYS[1], "data", ARGV[1]) + return 0 +else + return -1 +end diff --git a/lib/commands/updateProgress-2.lua b/lib/commands/updateProgress-2.lua index 026ad87ba..1be47ae24 100644 --- a/lib/commands/updateProgress-2.lua +++ b/lib/commands/updateProgress-2.lua @@ -11,5 +11,11 @@ Event: progress(jobId, progress) ]] -redis.call("HSET", KEYS[1], "progress", ARGV[1]) -redis.call("PUBLISH", KEYS[2], ARGV[2]) +local rcall = redis.call +if rcall("EXISTS", KEYS[1]) == 1 then -- // Make sure job exists + rcall("HSET", KEYS[1], "progress", ARGV[1]) + rcall("PUBLISH", KEYS[2], ARGV[2]) + return 0 +else + return -1 +end diff --git a/lib/job.js b/lib/job.js index 6a3606974..246e00a3e 100644 --- a/lib/job.js +++ b/lib/job.js @@ -159,13 +159,13 @@ Job.prototype.progress = function(progress) { return scripts.updateProgress(this, progress); }; -Job.prototype.update = function(data) { +Job.prototype.update = async function(data) { this.data = data; - return this.queue.client.hset( - this.queue.toKey(this.id), - 'data', - JSON.stringify(data) - ); + const code = await scripts.updateData(this, data); + + if (code < 0) { + throw scripts.finishedErrors(code, this.id, 'updateData'); + } }; Job.prototype.toJSON = function() { diff --git a/lib/scripts.js b/lib/scripts.js index 6e116a81c..59ebcaa53 100644 --- a/lib/scripts.js +++ b/lib/scripts.js @@ -109,11 +109,27 @@ const scripts = { progressJson, JSON.stringify({ jobId: job.id, progress }) ]) - .then(() => { + .then((code) => { + if (code < 0) { + throw scripts.finishedErrors(code, job.id, 'updateProgress'); + } queue.emit('progress', job, progress); }); }, + updateData(job, data) { + const queue = job.queue; + const keys = [job.id].map(name => { + return queue.toKey(name); + }); + const dataJson = JSON.stringify(data); + + return queue.client + .updateData(keys, [ + dataJson + ]); + }, + retryJobsArgs(queue, count) { const keys = [ queue.toKey(''), diff --git a/package.json b/package.json index 7d381e643..922d222b6 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,8 @@ "sinon": "^7.5.0" }, "scripts": { + "dc:up": "docker-compose -f docker-compose.yml up -d", + "dc:down": "docker-compose -f docker-compose.yml down", "pretest": "npm run lint", "lint": "eslint lib test *.js", "test": "NODE_ENV=test nyc mocha -- 'test/test_*' --recursive --exit", diff --git a/test/test_job.js b/test/test_job.js index 18c8051d5..9a8b662c0 100644 --- a/test/test_job.js +++ b/test/test_job.js @@ -201,6 +201,16 @@ describe('Job', () => { }); }); }); + + describe('when job was removed', () => { + it('throws an error', async () => { + const job = await Job.create(queue, { foo: 'bar' }); + await job.remove(); + await job.update({baz: 'qux'}).catch(err => { + expect(err.message).to.be.equal('Missing key for job 1 updateData'); + }); + }); + }); }); describe('.remove', () => { @@ -520,7 +530,7 @@ describe('Job', () => { it('can set and get progress as number', () => { return Job.create(queue, { foo: 'bar' }).then(job => { return job.progress(42).then(() => { - return Job.fromId(queue, job.id).then(storedJob => { + return Job.fromId(queue, job.id).then(async storedJob => { expect(storedJob.progress()).to.be(42); }); }); @@ -532,6 +542,16 @@ describe('Job', () => { const storedJob = await Job.fromId(queue, job.id); expect(storedJob.progress()).to.eql({ total: 120, completed: 40 }); }); + + describe('when job was removed', () => { + it('throws an error', async () => { + const job = await Job.create(queue, { foo: 'bar' }); + await job.remove(); + await job.progress({ total: 120, completed: 40 }).catch(err => { + expect(err.message).to.be.equal('Missing key for job 1 updateProgress'); + }); + }); + }); }); describe('.log', () => {