Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(move-to-finished): throw error when job not in active state #2739

Merged
merged 8 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 6 additions & 20 deletions lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ local jobId
local jobIdKey
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

local jobCounter = rcall("INCR", KEYS[4])

if ARGV[2] == "" then
Expand Down Expand Up @@ -67,33 +71,15 @@ else

-- Whe check for the meta-paused key to decide if we are paused or not
-- (since an empty list and !EXISTS are not really the same)
local paused
if rcall("EXISTS", KEYS[3]) ~= 1 then
target = KEYS[1]
paused = false
else
target = KEYS[2]
paused = true
end
local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

-- Standard or priority add
local priority = tonumber(ARGV[9])
if priority == 0 then
-- LIFO or FIFO
rcall(ARGV[10], target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[6], priority, jobId)
local count = rcall("ZCOUNT", KEYS[6], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count-1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end

addJobWithPriority(KEYS[6], priority, jobId, target)
end

-- Emit waiting event (wait..ing@token)
Expand Down
18 changes: 18 additions & 0 deletions lib/commands/includes/batches.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--[[
Function to loop in batches.
Just a bit of warning, some commands as ZREM
could receive a maximum of 7000 parameters per call.
]]

local function batches(n, batchSize)
local i = 0

return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end
30 changes: 7 additions & 23 deletions lib/commands/moveStalledJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

local rcall = redis.call

-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"

local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
rcall("DEL", jobKey, jobKey .. ':logs')
Expand All @@ -45,19 +49,6 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix)
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
end

local function batches(n, batchSize)
local i = 0

return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end

-- Check if we need to check for stalled jobs now.
if rcall("EXISTS", KEYS[5]) == 1 then
return {{}, {}}
Expand All @@ -70,15 +61,6 @@ local stalling = rcall('SMEMBERS', KEYS[1])
local stalled = {}
local failed = {}
if(#stalling > 0) then

local dst
-- wait or paused destination
if rcall("EXISTS", KEYS[6]) ~= 1 then
dst = KEYS[2]
else
dst = KEYS[7]
end

rcall('DEL', KEYS[1])

local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
Expand Down Expand Up @@ -129,8 +111,10 @@ if(#stalling > 0) then

table.insert(failed, jobId)
else
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[7])

-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", dst, jobId)
rcall("RPUSH", target, jobId)
rcall('PUBLISH', KEYS[1] .. '@', jobId)
table.insert(stalled, jobId)
end
Expand Down
6 changes: 4 additions & 2 deletions lib/commands/moveToFinished-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
end
end

-- Remove from active list
rcall("LREM", KEYS[1], -1, ARGV[1])
-- Remove from active list (if not active we shall return error)
local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[1])

if numRemovedElements < 1 then return -3 end

-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
Expand Down
14 changes: 2 additions & 12 deletions lib/commands/retryJobs-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,8 @@ local maxCount = tonumber(ARGV[1])

local rcall = redis.call;

local function batches(n, batchSize)
local i = 0

return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end
-- Includes
--- @include "includes/batches"

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
Expand Down
23 changes: 6 additions & 17 deletions lib/commands/updateDelaySet-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
]]
local rcall = redis.call;

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

-- Try to get as much as 1000 jobs at once
local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1000)

if(#jobs > 0) then
rcall("ZREM", KEYS[1], unpack(jobs))

-- check if we need to use push in paused instead of waiting
local target;
if rcall("EXISTS", KEYS[6]) ~= 1 then
target = KEYS[3]
else
target = KEYS[5]
end
local target = getTargetQueueList(KEYS[6], KEYS[3], KEYS[5])

for _, jobId in ipairs(jobs) do
-- Is this really needed?
Expand All @@ -44,17 +43,7 @@ if(#jobs > 0) then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
else
-- Priority add
rcall("ZADD", KEYS[4], priority, jobId)
local count = rcall("ZCOUNT", KEYS[4], 0, priority)

local len = rcall("LLEN", target)
local id = rcall("LINDEX", target, len - (count-1))
if id then
rcall("LINSERT", target, "BEFORE", id, jobId)
else
rcall("RPUSH", target, jobId)
end
addJobWithPriority(KEYS[4], priority, jobId, target)
end

-- Emit waiting event (wait..ing@token)
Expand Down
19 changes: 17 additions & 2 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -608,12 +608,15 @@ describe('Job', () => {
describe('.moveToCompleted', () => {
it('marks the job as completed and returns new job', () => {
return Job.create(queue, { foo: 'bar' }).then(job1 => {
return Job.create(queue, { foo: 'bar' }).then(job2 => {
return Job.create(queue, { foo: 'bar' }, { lifo: true }).then(job2 => {
return job2
.isCompleted()
.then(isCompleted => {
expect(isCompleted).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job2.moveToCompleted('succeeded', true);
})
Expand All @@ -637,6 +640,9 @@ describe('Job', () => {
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
Expand Down Expand Up @@ -702,6 +708,9 @@ describe('Job', () => {
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
Expand Down Expand Up @@ -747,16 +756,22 @@ describe('Job', () => {

it('applies stacktrace limit on failure', () => {
const stackTraceLimit = 1;
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit }).then(
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then(
job => {
return job
.isFailed()
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job
.moveToFailed(new Error('test error'), true)
Expand Down
Loading