Skip to content

Commit

Permalink
feat(queue): add rateLimit method
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Nov 18, 2024
1 parent 8204ea3 commit 73e8c43
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
28 changes: 28 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,34 @@ export class Queue<
await super.close();
});
}

/**
* Overrides the rate limit to be active for the next jobs.
*
* @param expireTimeMs - expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number): Promise<void> {
await this.trace<void>(
SpanKind.INTERNAL,
'rateLimit',
this.name,
async span => {
span?.setAttributes({
[TelemetryAttributes.QueueRateLimit]: expireTimeMs,
});

await this.client.then(client =>
client.set(
this.keys.limiter,
Number.MAX_SAFE_INTEGER,
'PX',
expireTimeMs,
),
);
},
);
}

/**
* Resumes the processing of this queue globally.
*
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ export class Worker<

/**
* Overrides the rate limit to be active for the next jobs.
*
* @deprecated This method is deprecated and will be removed in v6. Use queue.rateLimit method instead.
* @param expireTimeMs - expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions src/enums/telemetry-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export enum TelemetryAttributes {
QueueDrainDelay = 'bullmq.queue.drain.delay',
QueueGrace = 'bullmq.queue.grace',
QueueCleanLimit = 'bullmq.queue.clean.limit',
QueueRateLimit = 'bullmq.queue.rate.limit',
JobType = 'bullmq.job.type',
QueueOptions = 'bullmq.queue.options',
QueueEventMaxLength = 'bullmq.queue.event.max.length',
Expand Down
16 changes: 11 additions & 5 deletions tests/test_concurrency.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { default as IORedis } from 'ioredis';
import { FlowProducer, QueueEvents, Queue, Worker } from '../src/classes';
import {
FlowProducer,
QueueEvents,
Queue,
Worker,
RateLimitError,
} from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';
import { beforeEach, describe, it, after as afterAll } from 'mocha';
import { v4 } from 'uuid';
Expand Down Expand Up @@ -156,8 +162,8 @@ describe('Concurrency', () => {
queueName,
async job => {
if (job.attemptsStarted === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
await queue.rateLimit(dynamicLimit);
throw new RateLimitError();
}
},
{
Expand Down Expand Up @@ -240,8 +246,8 @@ describe('Concurrency', () => {
queueName,
async job => {
if (job.attemptsStarted === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
await queue.rateLimit(dynamicLimit);
throw new RateLimitError();
}
},
{
Expand Down

0 comments on commit 73e8c43

Please sign in to comment.