Skip to content

Commit

Permalink
fix(stalled): support removeDependencyOnFailure option when job is st…
Browse files Browse the repository at this point in the history
…alled (#2708)
  • Loading branch information
roggervalf authored Aug 14, 2024
1 parent de094a3 commit e0d3790
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
8 changes: 5 additions & 3 deletions src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,17 @@ if (#stalling > 0) then
jobKey,
timestamp
)
elseif opts['idof'] then
elseif opts['idof'] or opts['rdof'] then
local parentData = cjson.decode(rawParentData)
local parentKey = parentData['queueKey'] .. ':' .. parentData['id']
local dependenciesSet = parentKey .. ":dependencies"
if rcall("SREM", dependenciesSet, jobKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], dependenciesSet,
parentKey, parentData['id'], timestamp)
local failedSet = parentKey .. ":failed"
rcall("HSET", failedSet, jobKey, failedReason)
if opts['idof'] then
local failedSet = parentKey .. ":failed"
rcall("HSET", failedSet, jobKey, failedReason)
end
end
end
end
Expand Down
96 changes: 95 additions & 1 deletion tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,100 @@ describe('stalled jobs', function () {
});

describe('when ignoreDependencyOnFailure is provided as true', function () {
it('should move parent to waiting when child is moved to failed and save child failedReason', async function () {
this.timeout(6000);
const concurrency = 4;
const parentQueueName = `parent-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});

const flow = new FlowProducer({ connection, prefix });

const worker = new Worker(
queueName,
async () => {
return delay(10000);
},
{
connection,
prefix,
lockDuration: 1000,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
},
);

const allActive = new Promise(resolve => {
worker.on('active', after(concurrency, resolve));
});

await worker.waitUntilReady();

const { job: parent, children } = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{
name: 'test',
data: { foo: 'bar' },
queueName,
opts: { ignoreDependencyOnFailure: true },
},
],
});

const jobs = Array.from(Array(3).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue.addBulk(jobs);
await allActive;
await worker.close(true);

const worker2 = new Worker(queueName, async job => {}, {
connection,
prefix,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
});

const errorMessage = 'job stalled more than allowable limit';
const allFailed = new Promise<void>(resolve => {
worker2.on(
'failed',
after(concurrency, async (job, failedReason, prev) => {
const parentState = await parent.getState();

expect(parentState).to.be.equal('waiting');
expect(prev).to.be.equal('active');
expect(failedReason.message).to.be.equal(errorMessage);
resolve();
}),
);
});

await allFailed;
const failedChildrenValues = await parent.getFailedChildrenValues();
expect(failedChildrenValues).to.deep.equal({
[`${queue.qualifiedName}:${children[0].job.id}`]:
'job stalled more than allowable limit',
});

await worker2.close();
await parentQueue.close();
await flow.close();
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});
});

describe('when removeDependencyOnFailure is provided as true', function () {
it('should move parent to waiting when child is moved to failed', async function () {
this.timeout(6000);
const concurrency = 4;
Expand Down Expand Up @@ -510,7 +604,7 @@ describe('stalled jobs', function () {
name: 'test',
data: { foo: 'bar' },
queueName,
opts: { ignoreDependencyOnFailure: true },
opts: { removeDependencyOnFailure: true },
},
],
});
Expand Down

0 comments on commit e0d3790

Please sign in to comment.