Skip to content

Commit

Permalink
feat(job): allow passing a debounce as option (#2666)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 29, 2024
1 parent 64f58a9 commit 163ccea
Show file tree
Hide file tree
Showing 33 changed files with 443 additions and 47 deletions.
1 change: 1 addition & 0 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [LIFO](guide/jobs/lifo.md)
* [Job Ids](guide/jobs/job-ids.md)
* [Job Data](guide/jobs/job-data.md)
* [Debouncing](guide/jobs/debouncing.md)
* [Delayed](guide/jobs/delayed.md)
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
Expand Down
49 changes: 49 additions & 0 deletions docs/gitbook/guide/jobs/debouncing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Debouncing

Debouncing in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a debounced event.

## Fixed Mode

In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization.

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint');

// Add a job that will be debounced for 5 seconds.
await myQueue.add(
'house',
{ color: 'white' },
{ debounce: { id: 'customValue', ttl: 5000 } },
);
```

In this example, after adding the house painting job with the debouncing parameters (id and ttl), any subsequent job with the same debouncing ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job.

Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier.

## Extended Mode

The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored.

```typescript
// Add a job that will be debounced as this record is not finished (completed or failed).
await myQueue.add(
'house',
{ color: 'white' },
{ debounce: { id: 'customValue' } },
);
```

While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class.

This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress.

{% hint style="warning" %}
Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method.
{% endhint %}

## Read more:

- 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add)
13 changes: 13 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events';
const logger = debuglog('bull');

const optsDecodeMap = {
de: 'debounce',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
Expand Down Expand Up @@ -136,6 +137,11 @@ export class Job<
*/
parent?: ParentKeys;

/**
* Debounce identifier.
*/
debounceId?: string;

/**
* Base repeat job key.
*/
Expand Down Expand Up @@ -199,6 +205,8 @@ export class Job<
? { id: opts.parent.id, queueKey: opts.parent.queue }
: undefined;

this.debounceId = opts.debounce ? opts.debounce.id : undefined;

this.toKey = queue.toKey.bind(queue);
this.setScripts();

Expand Down Expand Up @@ -322,6 +330,10 @@ export class Job<
job.repeatJobKey = json.rjk;
}

if (json.deid) {
job.debounceId = json.deid;
}

job.failedReason = json.failedReason;

job.attemptsStarted = parseInt(json.ats || '0');
Expand Down Expand Up @@ -445,6 +457,7 @@ export class Job<
timestamp: this.timestamp,
failedReason: JSON.stringify(this.failedReason),
stacktrace: JSON.stringify(this.stacktrace),
debounceId: this.debounceId,
repeatJobKey: this.repeatJobKey,
returnvalue: JSON.stringify(this.returnvalue),
};
Expand Down
7 changes: 7 additions & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export interface QueueEventsListener extends IoredisListener {
id: string,
) => void;

/**
* Listen to 'debounced' event.
*
* This event is triggered when a job is debounced because debouncedId still existed.
*/
debounced: (args: { jobId: string }, id: string) => void;

/**
* Listen to 'delayed' event.
*
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class QueueKeys {
'events',
'pc', // priority counter key
'marker', // marker key
'de', // debounce key
].forEach(key => {
keys[key] = this.toKey(name, key);
});
Expand Down
26 changes: 18 additions & 8 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ export class Queue<
* Get global concurrency value.
* Returns null in case no value is set.
*/
async getGlobalConcurrency():Promise<number|null> {
async getGlobalConcurrency(): Promise<number | null> {
const client = await this.client;
const concurrency = await client.hget(this.keys.meta, 'concurrency');
if(concurrency){
if (concurrency) {
return Number(concurrency);
}
return null;
Expand All @@ -203,12 +203,11 @@ export class Queue<
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}


async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}

/**
* Adds a new job to the queue.
*
Expand Down Expand Up @@ -374,6 +373,17 @@ export class Queue<
return !removed;
}

/**
* Removes a debounce key.
*
* @param id - identifier
*/
async removeDebounceKey(id: string): Promise<number> {
const client = await this.client;

return client.del(`${this.keys.de}:${id}`);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export class Scripts {
parentOpts.parentDependenciesKey || null,
parent,
job.repeatJobKey,
job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null,
];

let encodedOpts;
Expand Down
12 changes: 11 additions & 1 deletion src/commands/addDelayedJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -49,12 +50,14 @@ local args = cmsgpack.unpack(ARGV[1])
local data = ARGV[2]

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/debounceJob"
--- @include "includes/getDelayedScore"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand All @@ -73,6 +76,7 @@ local opts = cmsgpack.unpack(ARGV[3])

local parentDependenciesKey = args[7]
local timestamp = args[4]

if args[2] == "" then
jobId = jobCounter
jobIdKey = args[1] .. jobId
Expand All @@ -86,6 +90,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
Expand Down
11 changes: 10 additions & 1 deletion src/commands/addParentJob-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -44,11 +45,13 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/debounceJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"
Expand Down Expand Up @@ -78,6 +81,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)
Expand Down
13 changes: 11 additions & 2 deletions src/commands/addPrioritizedJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -51,12 +52,14 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/debounceJob"
--- @include "includes/storeJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -87,6 +90,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
opts, timestamp, parentKey, parentData,
Expand Down
13 changes: 11 additions & 2 deletions src/commands/addStandardJob-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
[7] parent dependencies key.
[8] parent? {id, queueKey}
[9] repeat job key
[10] debounce key
ARGV[2] Json stringified job data
ARGV[3] msgpacked options
Expand All @@ -54,12 +55,14 @@ local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

local parentKey = args[5]
local repeatJobKey = args[9]
local parent = args[8]
local repeatJobKey = args[9]
local debounceKey = args[10]
local parentData

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/debounceJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/handleDuplicatedJob"
Expand Down Expand Up @@ -91,6 +94,12 @@ else
end
end

local debouncedJobId = debounceJob(args[1], opts['de'],
jobId, debounceKey, eventsKey, maxEvents)
if debouncedJobId then
return debouncedJobId
end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/cleanList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
-- replace the entry with a deletion marker; the actual deletion will
-- occur at the end of the script
rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
4 changes: 2 additions & 2 deletions src/commands/includes/cleanSet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ local function cleanSet(setKey, jobKeyPrefix, rangeEnd, timestamp, limit, attrib

local jobKey = jobKeyPrefix .. job
if isFinished then
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
else
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
jobTS = getTimestamp(jobKey, attributes)
if (not jobTS or jobTS <= timestamp) then
removeJob(job, true, jobKeyPrefix)
removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
deletedCount = deletedCount + 1
table.insert(deleted, job)
end
Expand Down
23 changes: 23 additions & 0 deletions src/commands/includes/debounceJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--[[
Function to debounce a job.
]]

local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKey, maxEvents)
local debounceId = debounceOpts and debounceOpts['id']
if debounceId then
local ttl = debounceOpts['ttl']
local debounceKeyExists
if ttl then
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
else
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
end
if debounceKeyExists then
local currentDebounceJobId = rcall('GET', debounceKey)
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
"debounced", "jobId", currentDebounceJobId)
return currentDebounceJobId
end
end
end

Loading

0 comments on commit 163ccea

Please sign in to comment.