diff --git a/docs/gitbook/README (1).md b/docs/gitbook/README (1).md index e849e2ce07..20de97fe4c 100644 --- a/docs/gitbook/README (1).md +++ b/docs/gitbook/README (1).md @@ -49,11 +49,15 @@ import IORedis from 'ioredis'; const connection = new IORedis({ maxRetriesPerRequest: null }); -const worker = new Worker('foo', async job => { - // Will print { foo: 'bar'} for the first job - // and { qux: 'baz' } for the second. - console.log(job.data); -}, { connection }); +const worker = new Worker( + 'foo', + async job => { + // Will print { foo: 'bar'} for the first job + // and { qux: 'baz' } for the second. + console.log(job.data); + }, + { connection }, +); ``` {% hint style="info" %} diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 73b1e37a81..dea69273f3 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -1,136 +1,136 @@ # Table of contents -* [What is BullMQ](README.md) -* [Quick Start]() -* [API Reference](https://api.docs.bullmq.io) -* [Changelogs](changelog.md) - * [v4](changelogs/changelog-v4.md) - * [v3](changelogs/changelog-v3.md) - * [v2](changelogs/changelog-v2.md) - * [v1](changelogs/changelog-v1.md) +- [What is BullMQ](README.md) +- [Quick Start]() +- [API Reference](https://api.docs.bullmq.io) +- [Changelogs](changelog.md) + - [v4](changelogs/changelog-v4.md) + - [v3](changelogs/changelog-v3.md) + - [v2](changelogs/changelog-v2.md) + - [v1](changelogs/changelog-v1.md) ## Guide -* [Introduction](guide/introduction.md) -* [Connections](guide/connections.md) -* [Queues](guide/queues/README.md) - * [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) - * [Adding jobs in bulk](guide/queues/adding-bulks.md) - * [Global Concurrency](guide/queues/global-concurrency.md) - * [Removing Jobs](guide/queues/removing-jobs.md) -* [Workers](guide/workers/README.md) - * [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) - * [Concurrency](guide/workers/concurrency.md) - * [Graceful shutdown](guide/workers/graceful-shutdown.md) - * [Stalled Jobs](guide/workers/stalled-jobs.md) - * [Sandboxed processors](guide/workers/sandboxed-processors.md) - * [Pausing queues](guide/workers/pausing-queues.md) -* [Jobs](guide/jobs/README.md) - * [FIFO](guide/jobs/fifo.md) - * [LIFO](guide/jobs/lifo.md) - * [Job Ids](guide/jobs/job-ids.md) - * [Job Data](guide/jobs/job-data.md) - * [Deduplication](guide/jobs/deduplication.md) - * [Delayed](guide/jobs/delayed.md) - * [Repeatable](guide/jobs/repeatable.md) - * [Prioritized](guide/jobs/prioritized.md) - * [Removing jobs](guide/jobs/removing-job.md) - * [Stalled](guide/jobs/stalled.md) - * [Getters](guide/jobs/getters.md) -* [Job Schedulers](guide/job-schedulers/README.md) - * [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) - * [Repeat options](guide/job-schedulers/repeat-options.md) - * [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) -* [Flows](guide/flows/README.md) - * [Adding flows in bulk](guide/flows/adding-bulks.md) - * [Get Flow Tree](guide/flows/get-flow-tree.md) - * [Fail Parent](guide/flows/fail-parent.md) - * [Remove Dependency](guide/flows/remove-dependency.md) - * [Ignore Dependency](guide/flows/ignore-dependency.md) - * [Remove Child Dependency](guide/flows/remove-child-dependency.md) -* [Metrics](guide/metrics/metrics.md) -* [Rate limiting](guide/rate-limiting.md) -* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) -* [Retrying failing jobs](guide/retrying-failing-jobs.md) -* [Returning job data](guide/returning-job-data.md) -* [Events](guide/events/README.md) - * [Create Custom Events](guide/events/create-custom-events.md) -* [Telemetry](guide/telemetry/README.md) - * [Getting started](guide/telemetry/getting-started.md) - * [Running Jaeger](guide/telemetry/running-jaeger.md) - * [Running a simple example](guide/telemetry/running-a-simple-example.md) -* [QueueScheduler](guide/queuescheduler.md) -* [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) - * [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) -* [Redis™ hosting](guide/redis-tm-hosting/README.md) - * [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) - * [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) -* [Architecture](guide/architecture.md) -* [NestJs](guide/nestjs/README.md) - * [Producers](guide/nestjs/producers.md) - * [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) -* [Going to production](guide/going-to-production.md) -* [Migration to newer versions](guide/migration-to-newer-versions.md) -* [Troubleshooting](guide/troubleshooting.md) +- [Introduction](guide/introduction.md) +- [Connections](guide/connections.md) +- [Queues](guide/queues/README.md) + - [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md) + - [Adding jobs in bulk](guide/queues/adding-bulks.md) + - [Global Concurrency](guide/queues/global-concurrency.md) + - [Removing Jobs](guide/queues/removing-jobs.md) +- [Workers](guide/workers/README.md) + - [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md) + - [Concurrency](guide/workers/concurrency.md) + - [Graceful shutdown](guide/workers/graceful-shutdown.md) + - [Stalled Jobs](guide/workers/stalled-jobs.md) + - [Sandboxed processors](guide/workers/sandboxed-processors.md) + - [Pausing queues](guide/workers/pausing-queues.md) +- [Jobs](guide/jobs/README.md) + - [FIFO](guide/jobs/fifo.md) + - [LIFO](guide/jobs/lifo.md) + - [Job Ids](guide/jobs/job-ids.md) + - [Job Data](guide/jobs/job-data.md) + - [Deduplication](guide/jobs/deduplication.md) + - [Delayed](guide/jobs/delayed.md) + - [Repeatable](guide/jobs/repeatable.md) + - [Prioritized](guide/jobs/prioritized.md) + - [Removing jobs](guide/jobs/removing-job.md) + - [Stalled](guide/jobs/stalled.md) + - [Getters](guide/jobs/getters.md) +- [Job Schedulers](guide/job-schedulers/README.md) + - [Repeat Strategies](guide/job-schedulers/repeat-strategies.md) + - [Repeat options](guide/job-schedulers/repeat-options.md) + - [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md) +- [Flows](guide/flows/README.md) + - [Adding flows in bulk](guide/flows/adding-bulks.md) + - [Get Flow Tree](guide/flows/get-flow-tree.md) + - [Fail Parent](guide/flows/fail-parent.md) + - [Remove Dependency](guide/flows/remove-dependency.md) + - [Ignore Dependency](guide/flows/ignore-dependency.md) + - [Remove Child Dependency](guide/flows/remove-child-dependency.md) +- [Metrics](guide/metrics/metrics.md) +- [Rate limiting](guide/rate-limiting.md) +- [Parallelism and Concurrency](guide/parallelism-and-concurrency.md) +- [Retrying failing jobs](guide/retrying-failing-jobs.md) +- [Returning job data](guide/returning-job-data.md) +- [Events](guide/events/README.md) + - [Create Custom Events](guide/events/create-custom-events.md) +- [Telemetry](guide/telemetry/README.md) + - [Getting started](guide/telemetry/getting-started.md) + - [Running Jaeger](guide/telemetry/running-jaeger.md) + - [Running a simple example](guide/telemetry/running-a-simple-example.md) +- [QueueScheduler](guide/queuescheduler.md) +- [Redis™ Compatibility](guide/redis-tm-compatibility/README.md) + - [Dragonfly](guide/redis-tm-compatibility/dragonfly.md) +- [Redis™ hosting](guide/redis-tm-hosting/README.md) + - [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md) + - [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md) +- [Architecture](guide/architecture.md) +- [NestJs](guide/nestjs/README.md) + - [Producers](guide/nestjs/producers.md) + - [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) +- [Going to production](guide/going-to-production.md) +- [Migration to newer versions](guide/migration-to-newer-versions.md) +- [Troubleshooting](guide/troubleshooting.md) ## Patterns -* [Adding jobs in bulk across different queues](patterns/adding-bulks.md) -* [Manually processing jobs](patterns/manually-fetching-jobs.md) -* [Named Processor](patterns/named-processor.md) -* [Flows](patterns/flows.md) -* [Idempotent jobs](patterns/idempotent-jobs.md) -* [Throttle jobs](patterns/throttle-jobs.md) -* [Process Step Jobs](patterns/process-step-jobs.md) -* [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) -* [Stop retrying jobs](patterns/stop-retrying-jobs.md) -* [Timeout jobs](patterns/timeout-jobs.md) -* [Redis Cluster](patterns/redis-cluster.md) +- [Adding jobs in bulk across different queues](patterns/adding-bulks.md) +- [Manually processing jobs](patterns/manually-fetching-jobs.md) +- [Named Processor](patterns/named-processor.md) +- [Flows](patterns/flows.md) +- [Idempotent jobs](patterns/idempotent-jobs.md) +- [Throttle jobs](patterns/throttle-jobs.md) +- [Process Step Jobs](patterns/process-step-jobs.md) +- [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md) +- [Stop retrying jobs](patterns/stop-retrying-jobs.md) +- [Timeout jobs](patterns/timeout-jobs.md) +- [Redis Cluster](patterns/redis-cluster.md) ## BullMQ Pro -* [Introduction](bullmq-pro/introduction.md) -* [Install](bullmq-pro/install.md) -* [Observables](bullmq-pro/observables/README.md) - * [Cancelation](bullmq-pro/observables/cancelation.md) -* [Groups](bullmq-pro/groups/README.md) - * [Getters](bullmq-pro/groups/getters.md) - * [Rate limiting](bullmq-pro/groups/rate-limiting.md) - * [Concurrency](bullmq-pro/groups/concurrency.md) - * [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) - * [Max group size](bullmq-pro/groups/max-group-size.md) - * [Pausing groups](bullmq-pro/groups/pausing-groups.md) - * [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) - * [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) -* [Telemetry](bullmq-pro/telemetry.md) -* [Batches](bullmq-pro/batches.md) -* [NestJs](bullmq-pro/nestjs/README.md) - * [Producers](bullmq-pro/nestjs/producers.md) - * [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) - * [API Reference](https://nestjs.bullmq.pro/) - * [Changelog](bullmq-pro/nestjs/changelog.md) -* [API Reference](https://api.bullmq.pro) -* [Changelog](bullmq-pro/changelog.md) -* [Support](bullmq-pro/support.md) +- [Introduction](bullmq-pro/introduction.md) +- [Install](bullmq-pro/install.md) +- [Observables](bullmq-pro/observables/README.md) + - [Cancelation](bullmq-pro/observables/cancelation.md) +- [Groups](bullmq-pro/groups/README.md) + - [Getters](bullmq-pro/groups/getters.md) + - [Rate limiting](bullmq-pro/groups/rate-limiting.md) + - [Concurrency](bullmq-pro/groups/concurrency.md) + - [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md) + - [Max group size](bullmq-pro/groups/max-group-size.md) + - [Pausing groups](bullmq-pro/groups/pausing-groups.md) + - [Prioritized intra-groups](bullmq-pro/groups/prioritized.md) + - [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md) +- [Telemetry](bullmq-pro/telemetry.md) +- [Batches](bullmq-pro/batches.md) +- [NestJs](bullmq-pro/nestjs/README.md) + - [Producers](bullmq-pro/nestjs/producers.md) + - [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md) + - [API Reference](https://nestjs.bullmq.pro/) + - [Changelog](bullmq-pro/nestjs/changelog.md) +- [API Reference](https://api.bullmq.pro) +- [Changelog](bullmq-pro/changelog.md) +- [Support](bullmq-pro/support.md) ## Bull -* [Introduction](bull/introduction.md) -* [Install](bull/install.md) -* [Quick Guide](bull/quick-guide.md) -* [Important Notes](bull/important-notes.md) -* [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) -* [Patterns](bull/patterns/README.md) - * [Persistent connections](bull/patterns/persistent-connections.md) - * [Message queue](bull/patterns/message-queue.md) - * [Returning Job Completions](bull/patterns/returning-job-completions.md) - * [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) - * [Redis cluster](bull/patterns/redis-cluster.md) - * [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) - * [Debugging](bull/patterns/debugging.md) - * [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) +- [Introduction](bull/introduction.md) +- [Install](bull/install.md) +- [Quick Guide](bull/quick-guide.md) +- [Important Notes](bull/important-notes.md) +- [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md) +- [Patterns](bull/patterns/README.md) + - [Persistent connections](bull/patterns/persistent-connections.md) + - [Message queue](bull/patterns/message-queue.md) + - [Returning Job Completions](bull/patterns/returning-job-completions.md) + - [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md) + - [Redis cluster](bull/patterns/redis-cluster.md) + - [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md) + - [Debugging](bull/patterns/debugging.md) + - [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md) ## Python -* [Introduction](python/introduction.md) -* [Changelog](python/changelog.md) +- [Introduction](python/introduction.md) +- [Changelog](python/changelog.md) diff --git a/docs/gitbook/bullmq-pro/telemetry.md b/docs/gitbook/bullmq-pro/telemetry.md index 3e40973f64..15c01adf32 100644 --- a/docs/gitbook/bullmq-pro/telemetry.md +++ b/docs/gitbook/bullmq-pro/telemetry.md @@ -3,46 +3,46 @@ In the same fashion we support telemetry in BullMQ open source edition, we also support telemetry for BullMQ Pro. It works basically the same, in fact you can just the same integrations available for BullMQ in the Pro version. So in order to enable it you would do something like this: ```typescript -import { QueuePro } from "@taskforcesh/bullmq-pro"; -import { BullMQOtel } from "bullmq-otel"; +import { QueuePro } from '@taskforcesh/bullmq-pro'; +import { BullMQOtel } from 'bullmq-otel'; // Initialize a Pro queue using BullMQ-Otel -const queue = new QueuePro("myProQueue", { +const queue = new QueuePro('myProQueue', { connection, - telemetry: new BullMQOtel("guide"), + telemetry: new BullMQOtel('guide'), }); await queue.add( - "myJob", - { data: "myData" }, + 'myJob', + { data: 'myData' }, { attempts: 2, backoff: 1000, group: { - id: "myGroupId", + id: 'myGroupId', }, - } + }, ); ``` For the Worker we will do it in a similar way: ```typescript -import { WorkerPro } from "@taskforcesh/bullmq-pro"; -import { BullMQOtel } from "bullmq-otel"; +import { WorkerPro } from '@taskforcesh/bullmq-pro'; +import { BullMQOtel } from 'bullmq-otel'; const worker = new WorkerPro( - "myProQueue", - async (job) => { - console.log("processing job", job.id); + 'myProQueue', + async job => { + console.log('processing job', job.id); }, { - name: "myWorker", + name: 'myWorker', connection, - telemetry: new BullMQOtel("guide"), + telemetry: new BullMQOtel('guide'), concurrency: 10, batch: { size: 10 }, - } + }, ); ``` diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index f732fd4e11..a513275f5b 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -332,11 +332,27 @@ export class FlowProducer extends EventEmitter { node.name, 'addNode', node.queueName, - async (span, dstPropagationMetadata) => { + async (span, srcPropagationMedatada) => { span?.setAttributes({ [TelemetryAttributes.JobName]: node.name, [TelemetryAttributes.JobId]: jobId, }); + const opts = node.opts; + let telemetry = opts?.telemetry; + + if (srcPropagationMedatada && opts) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } const job = new this.Job( queue, @@ -344,9 +360,9 @@ export class FlowProducer extends EventEmitter { node.data, { ...jobsOpts, - ...node.opts, + ...opts, parent: parent?.parentOpts, - telemetryMetadata: dstPropagationMetadata, + telemetry, }, jobId, ); diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index c728f50834..2e41591e45 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -15,7 +15,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { array2obj, optsAsJSON, optsFromJSON } from '../utils'; +import { array2obj } from '../utils'; export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; @@ -104,7 +104,7 @@ export class JobScheduler extends QueueBase { jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), - optsAsJSON(opts), + Job.optsAsJSON(opts), { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, @@ -126,6 +126,22 @@ export class JobScheduler extends QueueBase { 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => { + let telemetry = opts.telemetry; + + if (srcPropagationMedatada) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + const job = this.createNextJob( (multi) as RedisClient, jobName, @@ -134,7 +150,7 @@ export class JobScheduler extends QueueBase { { ...opts, repeat: filteredRepeatOpts, - telemetryMetadata: srcPropagationMedatada, + telemetry, }, jobData, iterationCount, @@ -275,7 +291,7 @@ export class JobScheduler extends QueueBase { template.data = JSON.parse(rawData); } if (rawOpts) { - template.opts = optsFromJSON(rawOpts); + template.opts = Job.optsFromJSON(rawOpts); } return template; } diff --git a/src/classes/job.ts b/src/classes/job.ts index 9e71bb45e5..780005fc0a 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -19,6 +19,8 @@ import { JobState, JobJsonSandbox, MinimalQueue, + RedisJobOptions, + CompressableJobOptions, } from '../types'; import { errorObject, @@ -28,8 +30,7 @@ import { parseObjectValues, tryCatch, removeUndefinedFields, - optsAsJSON, - optsFromJSON, + invertObject, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -39,6 +40,20 @@ import { SpanKind } from '../enums'; const logger = debuglog('bull'); +// Simple options decode map. +const optsDecodeMap = { + de: 'deduplication', + fpof: 'failParentOnFailure', + idof: 'ignoreDependencyOnFailure', + kl: 'keepLogs', + rdof: 'removeDependencyOnFailure', +} as const; + +const optsEncodeMap = { + ...invertObject(optsDecodeMap), + /*/ Legacy for backwards compatibility */ debounce: 'de', +} as const; + export const PRIORITY_LIMIT = 2 ** 21; /** @@ -312,7 +327,7 @@ export class Job< jobId?: string, ): Job { const data = JSON.parse(json.data || '{}'); - const opts = optsFromJSON(json.opts); + const opts = Job.optsFromJSON(json.opts); const job = new this( queue, @@ -376,6 +391,33 @@ export class Job< this.scripts = new Scripts(this.queue); } + static optsFromJSON(rawOpts?: string): JobsOptions { + const opts = JSON.parse(rawOpts || '{}'); + + const optionEntries = Object.entries(opts) as Array< + [keyof RedisJobOptions, any] + >; + + const options: Partial> = {}; + for (const item of optionEntries) { + const [attributeName, value] = item; + if ((optsDecodeMap as Record)[attributeName]) { + options[(optsDecodeMap as Record)[attributeName]] = + value; + } else { + if (attributeName === 'tm') { + options.telemetry = { ...options.telemetry, metadata: value }; + } else if (attributeName === 'omc') { + options.telemetry = { ...options.telemetry, omitContext: value }; + } else { + options[attributeName] = value; + } + } + } + + return options as JobsOptions; + } + /** * Fetches a Job from the queue given the passed job id. * @@ -436,7 +478,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: optsAsJSON(this.opts), + opts: Job.optsAsJSON(this.opts), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -454,6 +496,37 @@ export class Job< }); } + static optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { + const optionEntries = Object.entries(opts) as Array< + [keyof JobsOptions, any] + >; + const options: Record = {}; + + for (const [attributeName, value] of optionEntries) { + if (typeof value === 'undefined') { + continue; + } + if (attributeName in optsEncodeMap) { + const compressableAttribute = attributeName as keyof Omit< + CompressableJobOptions, + 'debounce' | 'telemetry' + >; + + const key = optsEncodeMap[compressableAttribute]; + options[key] = value; + } else { + // Handle complex compressable fields separately + if (attributeName === 'telemetry') { + options.tm = value.metadata; + options.omc = value.omitContext; + } else { + options[attributeName] = value; + } + } + } + return options as RedisJobOptions; + } + /** * Prepares a job to be passed to Sandbox. * @returns @@ -650,6 +723,10 @@ export class Job< this.getSpanOperation('moveToFailed'), this.queue.name, async (span, dstPropagationMedatadata) => { + let tm; + if (!this.opts?.telemetry?.omitContext && dstPropagationMedatadata) { + tm = dstPropagationMedatadata; + } let result; this.updateStacktrace(err); @@ -657,7 +734,7 @@ export class Job< const fieldsToUpdate = { failedReason: this.failedReason, stacktrace: JSON.stringify(this.stacktrace), - tm: dstPropagationMedatadata, + tm, }; // diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index b355f90014..bdd9da7911 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -1,6 +1,7 @@ import { EventEmitter } from 'events'; import { QueueBaseOptions, RedisClient, Span } from '../interfaces'; import { MinimalQueue } from '../types'; + import { delay, DELAY_TIME_5, diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 34cb1a4bd8..092ebd2218 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -313,8 +313,11 @@ export class Queue< 'add', `${this.name}.${name}`, async (span, srcPropagationMedatada) => { - if (srcPropagationMedatada) { - opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; + if (srcPropagationMedatada && !opts?.telemetry?.omitContext) { + const telemetry = { + metadata: srcPropagationMedatada, + }; + opts = { ...opts, telemetry }; } const job = await this.addJob(name, data, opts); @@ -403,16 +406,33 @@ export class Queue< return await this.Job.createBulk( this as MinimalQueue, - jobs.map(job => ({ - name: job.name, - data: job.data, - opts: { - ...this.jobsOpts, - ...job.opts, - jobId: job.opts?.jobId, - tm: span && srcPropagationMedatada, - }, - })), + jobs.map(job => { + let telemetry = job.opts?.telemetry; + if (srcPropagationMedatada) { + const omitContext = job.opts?.telemetry?.omitContext; + const telemetryMetadata = + job.opts?.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + + return { + name: job.name, + data: job.data, + opts: { + ...this.jobsOpts, + ...job.opts, + jobId: job.opts?.jobId, + telemetry, + }, + }; + }), ); }, ); diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 0c2ae6d284..f68811c589 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -569,7 +569,7 @@ export class Worker< return nextJob; }, - nextJob?.opts.telemetryMetadata, + nextJob?.opts?.telemetry?.metadata, ); } @@ -821,7 +821,7 @@ will never work with more accuracy than 1ms. */ return; } - const { telemetryMetadata: srcPropagationMedatada } = job.opts; + const srcPropagationMedatada = job.opts?.telemetry?.metadata; return this.trace>( SpanKind.CONSUMER, diff --git a/src/interfaces/base-job-options.ts b/src/interfaces/base-job-options.ts index 269309263b..7ad62439cc 100644 --- a/src/interfaces/base-job-options.ts +++ b/src/interfaces/base-job-options.ts @@ -114,9 +114,4 @@ export interface BaseJobOptions extends DefaultJobOptions { * Internal property used by repeatable jobs. */ prevMillis?: number; - - /** - * TelemetryMetadata, provide for context propagation. - */ - telemetryMetadata?: string; } diff --git a/src/types/job-options.ts b/src/types/job-options.ts index fb8b74d264..3c8c094d9c 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,6 +1,10 @@ import { BaseJobOptions, DebounceOptions } from '../interfaces'; -export type JobsOptions = BaseJobOptions & { +/** + * These options will be stored in Redis with smaller + * keys for compactness. + */ +export type CompressableJobOptions = { /** * Debounce options. * @deprecated use deduplication option @@ -26,8 +30,26 @@ export type JobsOptions = BaseJobOptions & { * If true, removes the job from its parent dependencies when it fails after all attempts. */ removeDependencyOnFailure?: boolean; + + /** + * Telemetry options + */ + telemetry?: { + /** + * Metadata, used for context propagation. + */ + metadata?: string; + + /** + * If `true` telemetry will omit the context propagation + * @default false + */ + omitContext?: boolean; + }; }; +export type JobsOptions = BaseJobOptions & CompressableJobOptions; + /** * These fields are the ones stored in Redis with smaller keys for compactness. */ @@ -61,4 +83,15 @@ export type RedisJobOptions = BaseJobOptions & { * TelemetryMetadata, provide for context propagation. */ tm?: string; + + /** + * Omit Context Propagation + */ + omc?: boolean; + + /** + * Deduplication identifier. + * @deprecated use deid + */ + de?: string; }; diff --git a/src/utils.ts b/src/utils.ts index 71c2d5e1da..38999e56b5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,7 +17,6 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; -import { JobsOptions, RedisJobOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -98,16 +97,21 @@ export function increaseMaxListeners( emitter.setMaxListeners(maxListeners + count); } -export const invertObject = (obj: Record) => { - return Object.entries(obj).reduce>( - (encodeMap, [key, value]) => { - encodeMap[value] = key; - return encodeMap; - }, - {}, - ); +type Invert> = { + [V in T[keyof T]]: { + [K in keyof T]: T[K] extends V ? K : never; + }[keyof T]; }; +export function invertObject>( + obj: T, +): Invert { + return Object.entries(obj).reduce((result, [key, value]) => { + (result as Record)[value] = key; + return result; + }, {} as Invert); +} + export function isRedisInstance(obj: any): obj is Redis | Cluster { if (!obj) { return false; @@ -285,57 +289,6 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; -const optsDecodeMap = { - de: 'deduplication', - fpof: 'failParentOnFailure', - idof: 'ignoreDependencyOnFailure', - kl: 'keepLogs', - rdof: 'removeDependencyOnFailure', - tm: 'telemetryMetadata', -}; - -const optsEncodeMap = invertObject(optsDecodeMap); -optsEncodeMap.debounce = 'de'; - -export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { - const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if (value !== undefined) { - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - } - - return options as RedisJobOptions; -} - -export function optsFromJSON(rawOpts?: string): JobsOptions { - const opts = JSON.parse(rawOpts || '{}'); - - const optionEntries = Object.entries(opts) as Array< - [keyof RedisJobOptions, any] - >; - - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsDecodeMap as Record)[attributeName]) { - options[(optsDecodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as JobsOptions; -} - export function removeUndefinedFields>( obj: Record, ) { diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index b598475d86..fa6ff4290a 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -2,7 +2,7 @@ import { expect, assert } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { FlowProducer, JobScheduler, Queue, Worker } from '../src/classes'; +import { FlowProducer, Job, JobScheduler, Queue, Worker } from '../src/classes'; import { removeAllQueueData } from '../src/utils'; import { Telemetry, @@ -93,7 +93,7 @@ describe('Telemetry', () => { this.options = options; } - setSpanOnContext(ctx: any): any { + setSpanOnContext(ctx: any, omitContext?: boolean): any { context['getSpan'] = () => this; return { ...context, getMetadata_span: this['name'] }; } @@ -260,6 +260,7 @@ describe('Telemetry', () => { }); it('should correctly handle errors and record them in telemetry for upsertJobScheduler', async () => { + const originalCreateNextJob = JobScheduler.prototype.createNextJob; const recordExceptionSpy = sinon.spy( MockSpan.prototype, 'recordException', @@ -283,6 +284,7 @@ describe('Telemetry', () => { const recordedError = recordExceptionSpy.firstCall.args[0]; assert.equal(recordedError.message, errMessage); } finally { + JobScheduler.prototype.createNextJob = originalCreateNextJob; recordExceptionSpy.restore(); } }); @@ -296,6 +298,7 @@ describe('Telemetry', () => { connection, telemetry: telemetryClient, name: 'testWorker', + prefix, }); await worker.waitUntilReady(); @@ -328,6 +331,7 @@ describe('Telemetry', () => { const worker = new Worker(queueName, async () => 'some result', { connection, telemetry: telemetryClient, + prefix, }); await worker.waitUntilReady(); @@ -349,17 +353,20 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); const testFlow = { name: 'parentJob', queueName, + prefix, data: { foo: 'bar' }, children: [ { name: 'childJob', queueName, + prefix, data: { baz: 'qux' }, }, ], @@ -385,6 +392,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -420,6 +428,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -472,6 +481,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -511,4 +521,137 @@ describe('Telemetry', () => { } }); }); + + describe('Omit Propagation', () => { + let fromMetadataSpy; + + beforeEach(() => { + fromMetadataSpy = sinon.spy( + telemetryClient.contextManager, + 'fromMetadata', + ); + }); + + afterEach(() => fromMetadataSpy.restore()); + + it('should omit propagation on queue add', async () => { + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + prefix, + }); + }); + + const job = await queue.add( + 'testJob', + { foo: 'bar' }, + { telemetry: { omitContext: true } }, + ); + + await processing; + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on queue addBulk', async () => { + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + prefix, + }); + }); + + const jobs = [ + { + name: 'job1', + data: { foo: 'bar' }, + opts: { telemetry: { omitContext: true } }, + }, + ]; + const addedJos = await queue.addBulk(jobs); + expect(addedJos).to.have.length(1); + + await processing; + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on job scheduler', async () => { + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + prefix, + }); + }); + + const jobSchedulerId = 'testJobScheduler'; + const data = { foo: 'bar' }; + + const job = await queue.upsertJobScheduler( + jobSchedulerId, + { every: 1000, endDate: Date.now() + 1000, limit: 1 }, + { + name: 'repeatable-job', + data, + opts: { telemetry: { omitContext: true } }, + }, + ); + + await processing; + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on flow producer', async () => { + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + prefix, + }); + }); + + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + prefix, + }); + + const testFlow = { + name: 'parentJob', + queueName, + data: { foo: 'bar' }, + children: [ + { + name: 'childJob', + queueName, + data: { baz: 'qux' }, + opts: { telemetry: { omitContext: true } }, + }, + ], + opts: { telemetry: { omitContext: true } }, + }; + + const jobNode = await flowProducer.add(testFlow); + const jobs = jobNode.children + ? [jobNode.job, ...jobNode.children.map(c => c.job)] + : [jobNode.job]; + + await processing; + + expect(fromMetadataSpy.callCount).to.equal(0); + await flowProducer.close(); + await worker.close(); + }); + }); });