From a9752e9771cf9dd46757cad7b6510dc2dc1d5222 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 2 Oct 2024 23:58:19 -0500 Subject: [PATCH 1/3] refactor: add deduplication logic --- docs/gitbook/SUMMARY.md | 2 +- docs/gitbook/guide/jobs/debouncing.md | 38 ++++++++++++------------ src/classes/job.ts | 12 +++++++- src/classes/queue-events.ts | 8 +++++ src/classes/queue-getters.ts | 12 ++++++++ src/classes/queue-keys.ts | 2 +- src/classes/queue.ts | 12 ++++++++ src/classes/scripts.ts | 2 +- src/commands/addDelayedJob-6.lua | 14 ++++----- src/commands/addParentJob-4.lua | 14 ++++----- src/commands/addPrioritizedJob-8.lua | 14 ++++----- src/commands/addStandardJob-8.lua | 14 ++++----- src/commands/includes/debounceJob.lua | 23 -------------- src/commands/includes/deduplicateJob.lua | 25 ++++++++++++++++ src/interfaces/job-json.ts | 1 + src/types/job-options.ts | 6 ++++ 16 files changed, 125 insertions(+), 74 deletions(-) delete mode 100644 src/commands/includes/debounceJob.lua create mode 100644 src/commands/includes/deduplicateJob.lua diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 8770547c8c..5df7ef2634 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -30,7 +30,7 @@ * [LIFO](guide/jobs/lifo.md) * [Job Ids](guide/jobs/job-ids.md) * [Job Data](guide/jobs/job-data.md) - * [Debouncing](guide/jobs/debouncing.md) + * [Deduplication](guide/jobs/deduplication.md) * [Delayed](guide/jobs/delayed.md) * [Repeatable](guide/jobs/repeatable.md) * [Prioritized](guide/jobs/prioritized.md) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/debouncing.md index ce87711d3c..0b37e43355 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/debouncing.md @@ -1,66 +1,66 @@ -# Debouncing +# Deduplication -Debouncing in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a debounced event. +Deduplication in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a deduplicated event. ## Fixed Mode -In the Fixed Mode, debouncing works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique debouncer ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. +In the Fixed Mode, deduplication works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique deduplication ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. ```typescript import { Queue } from 'bullmq'; const myQueue = new Queue('Paint'); -// Add a job that will be debounced for 5 seconds. +// Add a job that will be deduplicated for 5 seconds. await myQueue.add( 'house', { color: 'white' }, - { debounce: { id: 'customValue', ttl: 5000 } }, + { deduplication: { id: 'customValue', ttl: 5000 } }, ); ``` -In this example, after adding the house painting job with the debouncing parameters (id and ttl), any subsequent job with the same debouncing ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. +In this example, after adding the house painting job with the deduplicated parameters (id and ttl), any subsequent job with the same deduplication ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. -Note that you must provide a debounce id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. +Note that you must provide a deduplication id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. ## Extended Mode -The Extended Mode takes a different approach by extending the debouncing duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same debouncer ID will be ignored. +The Extended Mode takes a different approach by extending the deduplication duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same deduplication ID will be ignored. ```typescript -// Add a job that will be debounced as this record is not finished (completed or failed). +// Add a job that will be deduplicated as this record is not finished (completed or failed). await myQueue.add( 'house', { color: 'white' }, - { debounce: { id: 'customValue' } }, + { deduplication: { id: 'customValue' } }, ); ``` -While this job is not moved to completed or failed state, next jobs added with same **debounce id** will be ignored and a _debounced_ event will be triggered by our QueueEvent class. +While this job is not moved to completed or failed state, next jobs added with same **deduplication id** will be ignored and a _deduplicated_ event will be triggered by our QueueEvent class. This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. {% hint style="warning" %} -Any manual deletion will disable the debouncing. For example, when calling _job.remove_ method. +Any manual deletion will disable the deduplication. For example, when calling _job.remove_ method. {% endhint %} -## Get Debounce Job Id +## Get Deduplication Job Id -If you need to know which is the job id that started the debounce state. You can call **getDebounceJobId** method. +If you need to know which is the job id that started the deduplicated state. You can call **getDeduplicationJobId** method. ```typescript -const jobId = await myQueue.getDebounceJobId('customValue'); +const jobId = await myQueue.getDeduplicationJobId('customValue'); ``` -## Remove Debounce Key +## Remove Deduplication Key -If you need to stop debouncing before ttl finishes or before finishing a job. You can call **removeDebounceKey** method. +If you need to stop deduplication before ttl finishes or before finishing a job. You can call **removeDeduplicationKey** method. ```typescript -await myQueue.removeDebounceKey('customValue'); +await myQueue.removeDeduplicationKey('customValue'); ``` ## Read more: - 💡 [Add Job API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#add) -- 💡 [Remove Debounce Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeDebounceKey) +- 💡 [Remove Deduplication Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeDeduplicationKey) diff --git a/src/classes/job.ts b/src/classes/job.ts index 4d335f6c11..0f99864170 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,7 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { - de: 'debounce', + de: 'deduplication', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', @@ -46,6 +46,7 @@ const optsDecodeMap = { }; const optsEncodeMap = invertObject(optsDecodeMap); +optsEncodeMap.debounce = 'de'; export const PRIORITY_LIMIT = 2 ** 21; @@ -139,9 +140,15 @@ export class Job< /** * Debounce identifier. + * @deprecated use deduplicationId */ debounceId?: string; + /** + * Deduplication identifier. + */ + deduplicationId?: string; + /** * Base repeat job key. */ @@ -206,6 +213,7 @@ export class Job< : undefined; this.debounceId = opts.debounce ? opts.debounce.id : undefined; + this.deduplicationId = opts.deduplication ? opts.deduplication.id : this.debounceId; this.toKey = queue.toKey.bind(queue); this.setScripts(); @@ -332,6 +340,7 @@ export class Job< if (json.deid) { job.debounceId = json.deid; + job.deduplicationId = json.deid; } job.failedReason = json.failedReason; @@ -458,6 +467,7 @@ export class Job< failedReason: JSON.stringify(this.failedReason), stacktrace: JSON.stringify(this.stacktrace), debounceId: this.debounceId, + deduplicationId: this.deduplicationId, repeatJobKey: this.repeatJobKey, returnvalue: JSON.stringify(this.returnvalue), }; diff --git a/src/classes/queue-events.ts b/src/classes/queue-events.ts index 1fdb028f50..1f5c6c41ee 100644 --- a/src/classes/queue-events.ts +++ b/src/classes/queue-events.ts @@ -47,11 +47,19 @@ export interface QueueEventsListener extends IoredisListener { /** * Listen to 'debounced' event. + * @deprecated use deduplicated event * * This event is triggered when a job is debounced because debounceId still existed. */ debounced: (args: { jobId: string; debounceId: string }, id: string) => void; + /** + * Listen to 'deduplicated' event. + * + * This event is triggered when a job is deduplicated because deduplicatedId still existed. + */ + deduplicated: (args: { jobId: string; deduplicationId: string }, id: string) => void; + /** * Listen to 'delayed' event. * diff --git a/src/classes/queue-getters.ts b/src/classes/queue-getters.ts index 584dc2f799..66b094f89f 100644 --- a/src/classes/queue-getters.ts +++ b/src/classes/queue-getters.ts @@ -115,6 +115,7 @@ export class QueueGetters< /** * Get jobId that starts debounced state. + * @deprecated use getDeduplicationJobId method * * @param id - debounce identifier */ @@ -124,6 +125,17 @@ export class QueueGetters< return client.get(`${this.keys.de}:${id}`); } + /** + * Get jobId from deduplicated state. + * + * @param id - deduplication identifier + */ + async getDeduplicationJobId(id: string): Promise { + const client = await this.client; + + return client.get(`${this.keys.de}:${id}`); + } + /** * Job counts by type * diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 9109ae8dba..aaea64a130 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -24,7 +24,7 @@ export class QueueKeys { 'events', 'pc', // priority counter key 'marker', // marker key - 'de', // debounce key + 'de', // deduplication key ].forEach(key => { keys[key] = this.toKey(name, key); }); diff --git a/src/classes/queue.ts b/src/classes/queue.ts index db29ed03e1..60f005605f 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -375,6 +375,7 @@ export class Queue< /** * Removes a debounce key. + * @deprecated use removeDeduplicationKey * * @param id - identifier */ @@ -384,6 +385,17 @@ export class Queue< return client.del(`${this.keys.de}:${id}`); } + /** + * Removes a deduplication key. + * + * @param id - identifier + */ + async removeDeduplicationKey(id: string): Promise { + const client = await this.client; + + return client.del(`${this.keys.de}:${id}`); + } + /** * Removes a repeatable job by its key. Note that the key is the one used * to store the repeatable job metadata and not one of the job iterations diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 0c21578640..07a7adbf02 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -183,7 +183,7 @@ export class Scripts { parentOpts.parentDependenciesKey || null, parent, job.repeatJobKey, - job.debounceId ? `${queueKeys.de}:${job.debounceId}` : null, + job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null, ]; let encodedOpts; diff --git a/src/commands/addDelayedJob-6.lua b/src/commands/addDelayedJob-6.lua index cbec0dcf63..de2c0f764e 100644 --- a/src/commands/addDelayedJob-6.lua +++ b/src/commands/addDelayedJob-6.lua @@ -25,7 +25,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - [10] debounce key + [10] deduplication key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -52,12 +52,12 @@ local data = ARGV[2] local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] -local debounceKey = args[10] +local deduplicationKey = args[10] local parentData -- Includes --- @include "includes/addDelayMarkerIfNeeded" ---- @include "includes/debounceJob" +--- @include "includes/deduplicateJob" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -90,10 +90,10 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['de'], - jobId, debounceKey, eventsKey, maxEvents) -if debouncedJobId then - return debouncedJobId +local deduplicationJobId = deduplicateJob(args[1], opts['de'], + jobId, deduplicationKey, eventsKey, maxEvents) +if deduplicationJobId then + return deduplicationJobId end -- Store the job. diff --git a/src/commands/addParentJob-4.lua b/src/commands/addParentJob-4.lua index 0cb89372dc..0840037e5f 100644 --- a/src/commands/addParentJob-4.lua +++ b/src/commands/addParentJob-4.lua @@ -20,7 +20,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - [10] debounce key + [10] deduplication key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -47,11 +47,11 @@ local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] -local debounceKey = args[10] +local deduplicationKey = args[10] local parentData -- Includes ---- @include "includes/debounceJob" +--- @include "includes/deduplicateJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" --- @include "includes/storeJob" @@ -81,10 +81,10 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['de'], - jobId, debounceKey, eventsKey, maxEvents) -if debouncedJobId then - return debouncedJobId +local deduplicationJobId = deduplicateJob(args[1], opts['de'], + jobId, deduplicationKey, eventsKey, maxEvents) +if deduplicationJobId then + return deduplicationJobId end -- Store the job. diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 288e779154..4fa6dbc616 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -24,7 +24,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - [10] debounce key + [10] deduplication key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -54,12 +54,12 @@ local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] -local debounceKey = args[10] +local deduplicationKey = args[10] local parentData -- Includes --- @include "includes/addJobWithPriority" ---- @include "includes/debounceJob" +--- @include "includes/deduplicateJob" --- @include "includes/storeJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/handleDuplicatedJob" @@ -90,10 +90,10 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['de'], - jobId, debounceKey, eventsKey, maxEvents) -if debouncedJobId then - return debouncedJobId +local deduplicationJobId = deduplicateJob(args[1], opts['de'], + jobId, deduplicationKey, eventsKey, maxEvents) +if deduplicationJobId then + return deduplicationJobId end -- Store the job. diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 54fb363d73..7005e91af7 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -34,7 +34,7 @@ [7] parent dependencies key. [8] parent? {id, queueKey} [9] repeat job key - [10] debounce key + [10] deduplication key ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -57,12 +57,12 @@ local opts = cmsgpack.unpack(ARGV[3]) local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] -local debounceKey = args[10] +local deduplicationKey = args[10] local parentData -- Includes --- @include "includes/addJobInTargetList" ---- @include "includes/debounceJob" +--- @include "includes/deduplicateJob" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" --- @include "includes/handleDuplicatedJob" @@ -94,10 +94,10 @@ else end end -local debouncedJobId = debounceJob(args[1], opts['de'], - jobId, debounceKey, eventsKey, maxEvents) -if debouncedJobId then - return debouncedJobId +local deduplicationJobId = deduplicateJob(args[1], opts['de'], + jobId, deduplicationKey, eventsKey, maxEvents) +if deduplicationJobId then + return deduplicationJobId end -- Store the job. diff --git a/src/commands/includes/debounceJob.lua b/src/commands/includes/debounceJob.lua deleted file mode 100644 index aa4cf089dd..0000000000 --- a/src/commands/includes/debounceJob.lua +++ /dev/null @@ -1,23 +0,0 @@ ---[[ - Function to debounce a job. -]] - -local function debounceJob(prefixKey, debounceOpts, jobId, debounceKey, eventsKey, maxEvents) - local debounceId = debounceOpts and debounceOpts['id'] - if debounceId then - local ttl = debounceOpts['ttl'] - local debounceKeyExists - if ttl then - debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX') - else - debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX') - end - if debounceKeyExists then - local currentDebounceJobId = rcall('GET', debounceKey) - rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", - "debounced", "jobId", currentDebounceJobId, "debounceId", debounceId) - return currentDebounceJobId - end - end -end - \ No newline at end of file diff --git a/src/commands/includes/deduplicateJob.lua b/src/commands/includes/deduplicateJob.lua new file mode 100644 index 0000000000..07d785437a --- /dev/null +++ b/src/commands/includes/deduplicateJob.lua @@ -0,0 +1,25 @@ +--[[ + Function to debounce a job. +]] + +local function deduplicateJob(prefixKey, deduplicationOpts, jobId, deduplicationKey, eventsKey, maxEvents) + local deduplicationId = deduplicationOpts and deduplicationOpts['id'] + if deduplicationId then + local ttl = deduplicationOpts['ttl'] + local deduplicationKeyExists + if ttl then + deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX') + else + deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX') + end + if deduplicationKeyExists then + local currentDebounceJobId = rcall('GET', deduplicationKey) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "debounced", "jobId", currentDebounceJobId, "debounceId", deduplicationId) + rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", + "deduplicated", "jobId", currentDebounceJobId, "deduplicationId", deduplicationId) + return currentDebounceJobId + end + end +end + \ No newline at end of file diff --git a/src/interfaces/job-json.ts b/src/interfaces/job-json.ts index 79f104d934..25ad335145 100644 --- a/src/interfaces/job-json.ts +++ b/src/interfaces/job-json.ts @@ -19,6 +19,7 @@ export interface JobJson { parentKey?: string; repeatJobKey?: string; debounceId?: string; + deduplicationId?: string; processedBy?: string; } diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 4b0eea7b78..15cdd8ad16 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -3,9 +3,15 @@ import { BaseJobOptions, DebounceOptions } from '../interfaces'; export type JobsOptions = BaseJobOptions & { /** * Debounce options. + * @deprecated use deduplication option */ debounce?: DebounceOptions; + /** + * Deduplication options. + */ + deduplication?: DebounceOptions; + /** * If true, moves parent to failed. */ From 12a567d7510aa3cbfa1f1fe4e10b1df800581d05 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Thu, 3 Oct 2024 00:04:43 -0500 Subject: [PATCH 2/3] test: add some extra tests for deduplication --- tests/test_events.ts | 212 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/tests/test_events.ts b/tests/test_events.ts index 66a664df00..a77ca2f985 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -602,6 +602,218 @@ describe('events', function () { }); }); + describe('when job is deduplicated when added again with same debounce id', function () { + describe('when ttl is provided', function () { + it('used a fixed time period and emits debounced event', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + + let deduplicatedCounter = 0; + let secondJob; + queueEvents.on('deduplicated', ({ jobId, deduplicationId }) => { + if (deduplicatedCounter > 1) { + expect(jobId).to.be.equal(secondJob.id); + expect(deduplicationId).to.be.equal('a1'); + } else { + expect(jobId).to.be.equal(job.id); + expect(deduplicationId).to.be.equal('a1'); + } + deduplicatedCounter++; + }); + + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + secondJob = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(deduplicatedCounter).to.be.equal(4); + }); + + describe('when removing deduplicated job', function () { + it('removes deduplication key', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + + let deduplicatedCounter = 0; + queueEvents.on('deduplicated', ({ jobId }) => { + deduplicatedCounter++; + }); + await job.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await delay(1000); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await delay(1100); + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await secondJob.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1', ttl: 2000 } }, + ); + await delay(100); + + expect(deduplicatedCounter).to.be.equal(2); + }); + }); + }); + + describe('when ttl is not provided', function () { + it('waits until job is finished before removing debounce key', async function () { + const testName = 'test'; + + const worker = new Worker( + queueName, + async () => { + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + await delay(100); + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + await delay(100); + }, + { + autorun: false, + connection, + prefix, + }, + ); + await worker.waitUntilReady(); + + let deduplicatedCounter = 0; + + const completing = new Promise(resolve => { + queueEvents.once('completed', ({ jobId }) => { + expect(jobId).to.be.equal('1'); + resolve(); + }); + + queueEvents.on('deduplicated', ({ jobId }) => { + deduplicatedCounter++; + }); + }); + + worker.run(); + + await queue.add(testName, { foo: 'bar' }, { debounce: { id: 'a1' } }); + + await completing; + + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + + const count = await queue.getJobCountByTypes(); + + expect(count).to.be.eql(2); + + expect(deduplicatedCounter).to.be.equal(2); + expect(secondJob.id).to.be.equal('4'); + await worker.close(); + }); + + describe('when removing deduplicated job', function () { + it('removes deduplication key', async function () { + const testName = 'test'; + + const job = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + + let deduplicatedCounter = 0; + queueEvents.on('deduplicated', ({ jobId }) => { + deduplicatedCounter++; + }); + await job.remove(); + + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + + await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + await delay(100); + const secondJob = await queue.add( + testName, + { foo: 'bar' }, + { deduplication: { id: 'a1' } }, + ); + await secondJob.remove(); + + expect(deduplicatedCounter).to.be.equal(2); + }); + }); + }); + }); + it('should emit an event when a job becomes active', async () => { const worker = new Worker(queueName, async job => {}, { connection, From fd4139e49e2aee778c21a4efeb0319409183f694 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 4 Oct 2024 22:35:31 -0500 Subject: [PATCH 3/3] docs: rename modes --- .../jobs/{debouncing.md => deduplication.md} | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) rename docs/gitbook/guide/jobs/{debouncing.md => deduplication.md} (79%) diff --git a/docs/gitbook/guide/jobs/debouncing.md b/docs/gitbook/guide/jobs/deduplication.md similarity index 79% rename from docs/gitbook/guide/jobs/debouncing.md rename to docs/gitbook/guide/jobs/deduplication.md index 0b37e43355..6356d37f51 100644 --- a/docs/gitbook/guide/jobs/debouncing.md +++ b/docs/gitbook/guide/jobs/deduplication.md @@ -2,43 +2,43 @@ Deduplication in BullMQ is a process where job execution is delayed and deduplicated based on specific identifiers. It ensures that within a specified period, or until a specific job is completed or failed, no new jobs with the same identifier will be added to the queue. Instead, these attempts will trigger a deduplicated event. -## Fixed Mode +## Simple Mode -In the Fixed Mode, deduplication works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique deduplication ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. +The Simple Mode takes a different approach by extending the deduplication duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same deduplication ID will be ignored. ```typescript -import { Queue } from 'bullmq'; - -const myQueue = new Queue('Paint'); - -// Add a job that will be deduplicated for 5 seconds. +// Add a job that will be deduplicated as this record is not finished (completed or failed). await myQueue.add( 'house', { color: 'white' }, - { deduplication: { id: 'customValue', ttl: 5000 } }, + { deduplication: { id: 'customValue' } }, ); ``` -In this example, after adding the house painting job with the deduplicated parameters (id and ttl), any subsequent job with the same deduplication ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. +While this job is not moved to completed or failed state, next jobs added with same **deduplication id** will be ignored and a _deduplicated_ event will be triggered by our QueueEvent class. -Note that you must provide a deduplication id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. +This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. -## Extended Mode +## Throttle Mode -The Extended Mode takes a different approach by extending the deduplication duration until the job's completion or failure. This means as long as the job remains in an incomplete state (neither succeeded nor failed), any subsequent job with the same deduplication ID will be ignored. +In the Throttle Mode, deduplication works by assigning a delay (Time to Live, TTL) to a job upon its creation. If a similar job (identified by a unique deduplication ID) is added during this delay period, it is ignored. This prevents the queue from being overwhelmed with multiple instances of the same task, thus optimizing the processing time and resource utilization. ```typescript -// Add a job that will be deduplicated as this record is not finished (completed or failed). +import { Queue } from 'bullmq'; + +const myQueue = new Queue('Paint'); + +// Add a job that will be deduplicated for 5 seconds. await myQueue.add( 'house', { color: 'white' }, - { deduplication: { id: 'customValue' } }, + { deduplication: { id: 'customValue', ttl: 5000 } }, ); ``` -While this job is not moved to completed or failed state, next jobs added with same **deduplication id** will be ignored and a _deduplicated_ event will be triggered by our QueueEvent class. +In this example, after adding the house painting job with the deduplicated parameters (id and ttl), any subsequent job with the same deduplication ID customValue added within 5 seconds will be ignored. This is useful for scenarios where rapid, repetitive requests are made, such as multiple users or processes attempting to trigger the same job. -This mode is particularly useful for jobs that have a long running time or those that must not be duplicated until they are resolved, such as processing a file upload or performing a critical update that should not be repeated if the initial attempt is still in progress. +Note that you must provide a deduplication id that should represent your job. You can hash your entire job data or a subset of attributes for creating this identifier. {% hint style="warning" %} Any manual deletion will disable the deduplication. For example, when calling _job.remove_ method.