Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(marker): add base markers while consuming jobs to get workers busy #2904

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
opts - limiter
]]

-- Includes
--- @include "addBaseMarkerIfNeeded"

local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
jobId, processedOn, maxJobs, opts)
jobId, processedOn, maxJobs, markerKey, opts)
local jobKey = keyPrefix .. jobId

-- Check if we need to perform rate limiting.
Expand Down Expand Up @@ -41,5 +44,7 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey
rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
rcall("HINCRBY", jobKey, "ats", 1)

addBaseMarkerIfNeeded(markerKey, false)

return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
end
4 changes: 2 additions & 2 deletions src/commands/moveToActive-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ end

if jobId then
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
maxJobs, markerKey, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
if jobId then
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
maxJobs, markerKey, opts)
end
end

Expand Down
9 changes: 5 additions & 4 deletions src/commands/moveToFinished-14.lua
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists

local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8])

local markerKey = KEYS[14]
-- Check if there are delayed jobs that can be promoted
promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix,
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix,
timestamp, KEYS[10], isPausedOrMaxed)

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
Expand All @@ -233,19 +234,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2],
KEYS[10])
return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
timestamp, maxJobs, markerKey,
opts)
end
else
return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
timestamp, maxJobs, markerKey,
opts)
end
else
jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
if jobId then
return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
timestamp, maxJobs,
timestamp, maxJobs, markerKey,
opts)
end
end
Expand Down
47 changes: 45 additions & 2 deletions tests/test_bulk.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { expect } from 'chai';
import { default as IORedis } from 'ioredis';
import { after as afterNumExecutions } from 'lodash';
import { after, beforeEach, describe, it, before } from 'mocha';
import { v4 } from 'uuid';
import { Queue, Worker, Job } from '../src/classes';
import { removeAllQueueData } from '../src/utils';
import { Queue, QueueEvents, Worker, Job } from '../src/classes';
import { removeAllQueueData, delay } from '../src/utils';

describe('bulk jobs', () => {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -119,6 +120,48 @@ describe('bulk jobs', () => {
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});

it('should keep workers busy', async () => {
const numJobs = 6;
const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix });

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

const worker = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
const worker2 = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
await worker.waitUntilReady();
await worker2.waitUntilReady();

const completed = new Promise(resolve => {
queueEvents.on('completed', afterNumExecutions(numJobs, resolve));
});

const jobs = Array.from(Array(numJobs).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue2.addBulk(jobs);

await completed;
await queue2.close();
await worker.close();
await worker2.close();
await queueEvents.close();
});

it('should process jobs with custom ids', async () => {
const name = 'test';
let processor;
Expand Down
Loading