diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 241bdcfa60..0bb292068d 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -110,15 +110,17 @@ if (#stalling > 0) then jobKey, timestamp ) - elseif opts['idof'] then + elseif opts['idof'] or opts['rdof'] then local parentData = cjson.decode(rawParentData) local parentKey = parentData['queueKey'] .. ':' .. parentData['id'] local dependenciesSet = parentKey .. ":dependencies" if rcall("SREM", dependenciesSet, jobKey) == 1 then moveParentToWaitIfNeeded(parentData['queueKey'], dependenciesSet, parentKey, parentData['id'], timestamp) - local failedSet = parentKey .. ":failed" - rcall("HSET", failedSet, jobKey, failedReason) + if opts['idof'] then + local failedSet = parentKey .. ":failed" + rcall("HSET", failedSet, jobKey, failedReason) + end end end end diff --git a/tests/test_stalled_jobs.ts b/tests/test_stalled_jobs.ts index c040496ef0..2ccdf5c8a4 100644 --- a/tests/test_stalled_jobs.ts +++ b/tests/test_stalled_jobs.ts @@ -468,6 +468,100 @@ describe('stalled jobs', function () { }); describe('when ignoreDependencyOnFailure is provided as true', function () { + it('should move parent to waiting when child is moved to failed and save child failedReason', async function () { + this.timeout(6000); + const concurrency = 4; + const parentQueueName = `parent-queue-${v4()}`; + + const parentQueue = new Queue(parentQueueName, { + connection, + prefix, + }); + + const flow = new FlowProducer({ connection, prefix }); + + const worker = new Worker( + queueName, + async () => { + return delay(10000); + }, + { + connection, + prefix, + lockDuration: 1000, + stalledInterval: 100, + maxStalledCount: 0, + concurrency, + }, + ); + + const allActive = new Promise(resolve => { + worker.on('active', after(concurrency, resolve)); + }); + + await worker.waitUntilReady(); + + const { job: parent, children } = await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { + name: 'test', + data: { foo: 'bar' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + }, + ], + }); + + const jobs = Array.from(Array(3).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue.addBulk(jobs); + await allActive; + await worker.close(true); + + const worker2 = new Worker(queueName, async job => {}, { + connection, + prefix, + stalledInterval: 100, + maxStalledCount: 0, + concurrency, + }); + + const errorMessage = 'job stalled more than allowable limit'; + const allFailed = new Promise(resolve => { + worker2.on( + 'failed', + after(concurrency, async (job, failedReason, prev) => { + const parentState = await parent.getState(); + + expect(parentState).to.be.equal('waiting'); + expect(prev).to.be.equal('active'); + expect(failedReason.message).to.be.equal(errorMessage); + resolve(); + }), + ); + }); + + await allFailed; + const failedChildrenValues = await parent.getFailedChildrenValues(); + expect(failedChildrenValues).to.deep.equal({ + [`${queue.qualifiedName}:${children[0].job.id}`]: + 'job stalled more than allowable limit', + }); + + await worker2.close(); + await parentQueue.close(); + await flow.close(); + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + }); + }); + + describe('when removeDependencyOnFailure is provided as true', function () { it('should move parent to waiting when child is moved to failed', async function () { this.timeout(6000); const concurrency = 4; @@ -510,7 +604,7 @@ describe('stalled jobs', function () { name: 'test', data: { foo: 'bar' }, queueName, - opts: { ignoreDependencyOnFailure: true }, + opts: { removeDependencyOnFailure: true }, }, ], });