From 0ad63fc25699f97c9d4a3301830eee00d4d2d34f Mon Sep 17 00:00:00 2001 From: fgozdz Date: Fri, 29 Nov 2024 22:08:56 +0100 Subject: [PATCH 1/7] feat(telemetry): add option to omit context propagation --- src/interfaces/telemetry.ts | 16 ++++++++++++++++ src/utils.ts | 6 ++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/interfaces/telemetry.ts b/src/interfaces/telemetry.ts index e55c0990dc..c2f8b05595 100644 --- a/src/interfaces/telemetry.ts +++ b/src/interfaces/telemetry.ts @@ -24,6 +24,22 @@ export interface Telemetry { * it across the application. */ contextManager: ContextManager; + + /** + * Telemetry options + */ + options?: TelemetryOptions; +} + +/** + * Telemetry options + */ +export interface TelemetryOptions { + /** + * If `true` telemetry will omit the context propagation + * @defaultValue false + */ + omitContext?: boolean; } /** diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..5692fe3167 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -11,6 +11,7 @@ import { ContextManager, RedisClient, Span, + TelemetryOptions, Tracer, } from './interfaces'; import { EventEmitter } from 'events'; @@ -299,6 +300,7 @@ export async function trace( | { tracer: Tracer; contextManager: ContextManager; + options?: TelemetryOptions; } | undefined, spanKind: SpanKind, @@ -311,12 +313,12 @@ export async function trace( if (!telemetry) { return callback(); } else { - const { tracer, contextManager } = telemetry; + const { tracer, contextManager, options } = telemetry; const currentContext = contextManager.active(); let parentContext; - if (srcPropagationMetadata) { + if (!options?.omitContext && srcPropagationMetadata) { parentContext = contextManager.fromMetadata( currentContext, srcPropagationMetadata, From 6e6b8dc472b25bd551bc24b76a736839e97d0f7e Mon Sep 17 00:00:00 2001 From: fgozdz Date: Wed, 4 Dec 2024 14:40:43 +0100 Subject: [PATCH 2/7] feat(queue, job, worker, utils): pass omit as a job options for few methods --- src/classes/job.ts | 3 +++ src/classes/queue-base.ts | 4 +++- src/classes/queue.ts | 2 ++ src/classes/worker.ts | 1 + src/interfaces/base-job-options.ts | 6 ++++++ src/interfaces/telemetry.ts | 16 ---------------- src/types/job-options.ts | 21 +++++++++++++++++++++ src/utils.ts | 8 ++++---- 8 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index b63e5cdf49..87d3d2978e 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -46,6 +46,7 @@ const optsDecodeMap = { kl: 'keepLogs', rdof: 'removeDependencyOnFailure', tm: 'telemetryMetadata', + omc: 'omitContext', }; const optsEncodeMap = invertObject(optsDecodeMap); @@ -787,6 +788,8 @@ export class Job< return raw2NextJobData(result); } }, + undefined, + this.opts?.telemetry, ); } diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 9e2008f958..93719004a4 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; import { QueueBaseOptions, RedisClient, Span, Tracer } from '../interfaces'; -import { MinimalQueue } from '../types'; +import { JobsOptions, MinimalQueue } from '../types'; import { delay, DELAY_TIME_5, @@ -197,6 +197,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue { destination: string, callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, srcPropagationMetadata?: string, + jobsOptions?: JobsOptions['telemetry'], ) { return trace | T>( this.opts.telemetry, @@ -206,6 +207,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue { destination, callback, srcPropagationMetadata, + jobsOptions, ); } } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 89373010ae..37c6c96c92 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -320,6 +320,8 @@ export class Queue< return job; }, + undefined, + opts?.telemetry, ); } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index acc6f1d533..b3cf75ef59 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -916,6 +916,7 @@ will never work with more accuracy than 1ms. */ } }, srcPropagationMedatada, + job.opts?.telemetry, ); } diff --git a/src/interfaces/base-job-options.ts b/src/interfaces/base-job-options.ts index bb10f1caa3..ac1db6faf9 100644 --- a/src/interfaces/base-job-options.ts +++ b/src/interfaces/base-job-options.ts @@ -117,4 +117,10 @@ export interface BaseJobOptions extends DefaultJobOptions { * TelemetryMetadata, provide for context propagation. */ telemetryMetadata?: string; + + /** + * If `true` telemetry will omit the context propagation + * @default false + */ + omitContext?: boolean; } diff --git a/src/interfaces/telemetry.ts b/src/interfaces/telemetry.ts index c2f8b05595..e55c0990dc 100644 --- a/src/interfaces/telemetry.ts +++ b/src/interfaces/telemetry.ts @@ -24,22 +24,6 @@ export interface Telemetry { * it across the application. */ contextManager: ContextManager; - - /** - * Telemetry options - */ - options?: TelemetryOptions; -} - -/** - * Telemetry options - */ -export interface TelemetryOptions { - /** - * If `true` telemetry will omit the context propagation - * @defaultValue false - */ - omitContext?: boolean; } /** diff --git a/src/types/job-options.ts b/src/types/job-options.ts index fb8b74d264..89069430d6 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -26,6 +26,22 @@ export type JobsOptions = BaseJobOptions & { * If true, removes the job from its parent dependencies when it fails after all attempts. */ removeDependencyOnFailure?: boolean; + + /** + * Telemetry options + */ + telemetry?: { + /** + * TelemetryMetadata, provide for context propagation. + */ + metadata?: string; + + /** + * If `true` telemetry will omit the context propagation + * @default false + */ + omitContext?: boolean; + }; }; /** @@ -61,4 +77,9 @@ export type RedisJobOptions = BaseJobOptions & { * TelemetryMetadata, provide for context propagation. */ tm?: string; + + /** + * Omit Context Propagation + */ + omc?: boolean; }; diff --git a/src/utils.ts b/src/utils.ts index 5692fe3167..f1a122dc9c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -11,13 +11,13 @@ import { ContextManager, RedisClient, Span, - TelemetryOptions, Tracer, } from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; +import { JobsOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -300,7 +300,6 @@ export async function trace( | { tracer: Tracer; contextManager: ContextManager; - options?: TelemetryOptions; } | undefined, spanKind: SpanKind, @@ -309,16 +308,17 @@ export async function trace( destination: string, callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, srcPropagationMetadata?: string, + jobsOptions?: JobsOptions['telemetry'], ) { if (!telemetry) { return callback(); } else { - const { tracer, contextManager, options } = telemetry; + const { tracer, contextManager } = telemetry; const currentContext = contextManager.active(); let parentContext; - if (!options?.omitContext && srcPropagationMetadata) { + if (!jobsOptions?.omitContext && srcPropagationMetadata) { parentContext = contextManager.fromMetadata( currentContext, srcPropagationMetadata, From dec981b9ed1b3af1add393879fb9224fbec207bf Mon Sep 17 00:00:00 2001 From: fgozdz Date: Thu, 5 Dec 2024 16:25:35 +0100 Subject: [PATCH 3/7] feat(telemetry): add omit on job options and create test cases --- src/classes/flow-producer.ts | 5 +- src/classes/job-scheduler.ts | 3 +- src/classes/job.ts | 4 +- src/classes/queue-base.ts | 2 - src/classes/queue.ts | 9 +- src/classes/worker.ts | 1 - src/utils.ts | 4 +- tests/test_telemetry_interface.ts | 133 +++++++++++++++++++++++++++++- 8 files changed, 144 insertions(+), 17 deletions(-) diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index f732fd4e11..a5f6bd8a3d 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -346,7 +346,10 @@ export class FlowProducer extends EventEmitter { ...jobsOpts, ...node.opts, parent: parent?.parentOpts, - telemetryMetadata: dstPropagationMetadata, + telemetryMetadata: + span && + !node.opts?.telemetry?.omitContext && + dstPropagationMetadata, }, jobId, ); diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 63c0f8199c..c33f72022d 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -132,7 +132,8 @@ export class JobScheduler extends QueueBase { { ...opts, repeat: filteredRepeatOpts, - telemetryMetadata: srcPropagationMedatada, + telemetryMetadata: + span && !opts?.telemetry?.omitContext && srcPropagationMedatada, }, jobData, iterationCount, diff --git a/src/classes/job.ts b/src/classes/job.ts index 87d3d2978e..7424ed7178 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -748,7 +748,7 @@ export class Job< this.getSpanOperation(command), this.queue.name, async (span, dstPropagationMedatadata) => { - if (dstPropagationMedatadata) { + if (!this.opts?.telemetry?.omitContext && dstPropagationMedatadata) { this.scripts.execCommand(multi, 'updateJobOption', [ this.toKey(this.id), 'tm', @@ -788,8 +788,6 @@ export class Job< return raw2NextJobData(result); } }, - undefined, - this.opts?.telemetry, ); } diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 93719004a4..42d6f03518 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -197,7 +197,6 @@ export class QueueBase extends EventEmitter implements MinimalQueue { destination: string, callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, srcPropagationMetadata?: string, - jobsOptions?: JobsOptions['telemetry'], ) { return trace | T>( this.opts.telemetry, @@ -207,7 +206,6 @@ export class QueueBase extends EventEmitter implements MinimalQueue { destination, callback, srcPropagationMetadata, - jobsOptions, ); } } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 37c6c96c92..b570d324b8 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -307,7 +307,7 @@ export class Queue< 'add', `${this.name}.${name}`, async (span, srcPropagationMedatada) => { - if (srcPropagationMedatada) { + if (!opts?.telemetry?.omitContext && srcPropagationMedatada) { opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; } @@ -320,8 +320,6 @@ export class Queue< return job; }, - undefined, - opts?.telemetry, ); } @@ -406,7 +404,10 @@ export class Queue< ...this.jobsOpts, ...job.opts, jobId: job.opts?.jobId, - tm: span && srcPropagationMedatada, + tm: + span && + !job.opts?.telemetry?.omitContext && + srcPropagationMedatada, }, })), ); diff --git a/src/classes/worker.ts b/src/classes/worker.ts index b3cf75ef59..acc6f1d533 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -916,7 +916,6 @@ will never work with more accuracy than 1ms. */ } }, srcPropagationMedatada, - job.opts?.telemetry, ); } diff --git a/src/utils.ts b/src/utils.ts index f1a122dc9c..4efa2e3467 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,7 +17,6 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; -import { JobsOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -308,7 +307,6 @@ export async function trace( destination: string, callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, srcPropagationMetadata?: string, - jobsOptions?: JobsOptions['telemetry'], ) { if (!telemetry) { return callback(); @@ -318,7 +316,7 @@ export async function trace( const currentContext = contextManager.active(); let parentContext; - if (!jobsOptions?.omitContext && srcPropagationMetadata) { + if (srcPropagationMetadata) { parentContext = contextManager.fromMetadata( currentContext, srcPropagationMetadata, diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index b598475d86..6820b913ca 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -2,7 +2,7 @@ import { expect, assert } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { FlowProducer, JobScheduler, Queue, Worker } from '../src/classes'; +import { FlowProducer, Job, JobScheduler, Queue, Worker } from '../src/classes'; import { removeAllQueueData } from '../src/utils'; import { Telemetry, @@ -93,7 +93,7 @@ describe('Telemetry', () => { this.options = options; } - setSpanOnContext(ctx: any): any { + setSpanOnContext(ctx: any, omitContext?: boolean): any { context['getSpan'] = () => this; return { ...context, getMetadata_span: this['name'] }; } @@ -260,6 +260,7 @@ describe('Telemetry', () => { }); it('should correctly handle errors and record them in telemetry for upsertJobScheduler', async () => { + const originalCreateNextJob = JobScheduler.prototype.createNextJob; const recordExceptionSpy = sinon.spy( MockSpan.prototype, 'recordException', @@ -283,6 +284,7 @@ describe('Telemetry', () => { const recordedError = recordExceptionSpy.firstCall.args[0]; assert.equal(recordedError.message, errMessage); } finally { + JobScheduler.prototype.createNextJob = originalCreateNextJob; recordExceptionSpy.restore(); } }); @@ -511,4 +513,131 @@ describe('Telemetry', () => { } }); }); + + describe('Omit Propagation', () => { + let fromMetadataSpy; + + beforeEach(() => { + fromMetadataSpy = sinon.spy( + telemetryClient.contextManager, + 'fromMetadata', + ); + }); + + afterEach(() => fromMetadataSpy.restore()); + + it('should omit propagation on queue add', async () => { + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + }); + await worker.waitUntilReady(); + + const job = await queue.add( + 'testJob', + { foo: 'bar' }, + { telemetry: { omitContext: true } }, + ); + + await worker.processJob(job, 'some-token', () => false, new Set()); + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on queue addBulk', async () => { + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + }); + await worker.waitUntilReady(); + + const jobs = [ + { + name: 'job1', + data: { foo: 'bar' }, + opts: { telemetry: { omitContext: true } }, + }, + ]; + const jobArray = await queue.addBulk(jobs); + + await Promise.all( + jobArray.map(job => + worker.processJob(job, 'some-token', () => false, new Set()), + ), + ); + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on job scheduler', async () => { + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + }); + await worker.waitUntilReady(); + + const jobSchedulerId = 'testJobScheduler'; + const data = { foo: 'bar' }; + + const job = await queue.upsertJobScheduler( + jobSchedulerId, + { every: 1000, endDate: Date.now() + 1000, limit: 1 }, + { + name: 'repeatable-job', + data, + opts: { telemetry: { omitContext: true } }, + }, + ); + + await worker.processJob(job, 'some-token', () => false, new Set()); + + expect(fromMetadataSpy.callCount).to.equal(0); + await worker.close(); + }); + + it('should omit propagation on flow producer', async () => { + const worker = new Worker(queueName, async () => 'some result', { + connection, + telemetry: telemetryClient, + }); + await worker.waitUntilReady(); + + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + }); + + const testFlow = { + name: 'parentJob', + queueName, + data: { foo: 'bar' }, + children: [ + { + name: 'childJob', + queueName, + data: { baz: 'qux' }, + opts: { telemetry: { omitContext: true } }, + }, + ], + opts: { telemetry: { omitContext: true } }, + }; + + const jobNode = await flowProducer.add(testFlow); + const jobs = jobNode.children + ? [jobNode.job, ...jobNode.children.map(c => c.job)] + : [jobNode.job]; + + await Promise.all( + jobs.map(job => + worker.processJob(job, 'some-token', () => false, new Set()), + ), + ); + + expect(fromMetadataSpy.callCount).to.equal(0); + await flowProducer.close(); + await worker.close(); + }); + }); }); From 37cef68d6901dbef0445c220da5be0706941ab1f Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 6 Dec 2024 19:07:54 +0100 Subject: [PATCH 4/7] fix(telemetry): several fixes around omitContext --- src/classes/flow-producer.ts | 25 ++++++++++--- src/classes/job-scheduler.ts | 30 +++++++++++---- src/classes/job.ts | 49 ++++++++++++++++++------- src/classes/queue.ts | 57 +++++++++++++++++++++-------- src/classes/worker.ts | 4 +- src/interfaces/base-job-options.ts | 11 ------ src/types/job-options.ts | 16 +++++++- src/utils.ts | 21 +++++++---- tests/test_telemetry_interface.ts | 59 +++++++++++++++--------------- 9 files changed, 177 insertions(+), 95 deletions(-) diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index a5f6bd8a3d..a513275f5b 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -332,11 +332,27 @@ export class FlowProducer extends EventEmitter { node.name, 'addNode', node.queueName, - async (span, dstPropagationMetadata) => { + async (span, srcPropagationMedatada) => { span?.setAttributes({ [TelemetryAttributes.JobName]: node.name, [TelemetryAttributes.JobId]: jobId, }); + const opts = node.opts; + let telemetry = opts?.telemetry; + + if (srcPropagationMedatada && opts) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } const job = new this.Job( queue, @@ -344,12 +360,9 @@ export class FlowProducer extends EventEmitter { node.data, { ...jobsOpts, - ...node.opts, + ...opts, parent: parent?.parentOpts, - telemetryMetadata: - span && - !node.opts?.telemetry?.omitContext && - dstPropagationMetadata, + telemetry, }, jobId, ); diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index c33f72022d..22b174c702 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -124,6 +124,22 @@ export class JobScheduler extends QueueBase { 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => { + let telemetry = opts.telemetry; + + if (srcPropagationMedatada) { + const omitContext = opts.telemetry?.omitContext; + const telemetryMetadata = + opts.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + const job = this.createNextJob( (multi) as RedisClient, jobName, @@ -132,8 +148,7 @@ export class JobScheduler extends QueueBase { { ...opts, repeat: filteredRepeatOpts, - telemetryMetadata: - span && !opts?.telemetry?.omitContext && srcPropagationMedatada, + telemetry, }, jobData, iterationCount, @@ -279,12 +294,11 @@ export class JobScheduler extends QueueBase { return Promise.all(jobs); } - async getSchedulersCount( - client: RedisClient, - prefix: string, - queueName: string, - ): Promise { - return client.zcard(`${prefix}:${queueName}:repeat`); + async getSchedulersCount(): Promise { + const jobSchedulersKey = this.keys.repeat; + const client = await this.client; + + return client.zcard(jobSchedulersKey); } private getSchedulerNextJobId({ diff --git a/src/classes/job.ts b/src/classes/job.ts index 7424ed7178..8c62080347 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -20,6 +20,7 @@ import { JobJsonSandbox, MinimalQueue, RedisJobOptions, + CompressableJobOptions, } from '../types'; import { errorObject, @@ -39,18 +40,19 @@ import { SpanKind } from '../enums'; const logger = debuglog('bull'); +// Simple options decode map. const optsDecodeMap = { de: 'deduplication', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', rdof: 'removeDependencyOnFailure', - tm: 'telemetryMetadata', - omc: 'omitContext', -}; +} as const; -const optsEncodeMap = invertObject(optsDecodeMap); -optsEncodeMap.debounce = 'de'; +const optsEncodeMap = { + ...invertObject(optsDecodeMap), + /*/ Legacy for backwards compatibility */ debounce: 'de', +} as const; export const PRIORITY_LIMIT = 2 ** 21; @@ -403,7 +405,13 @@ export class Job< options[(optsDecodeMap as Record)[attributeName]] = value; } else { - options[attributeName] = value; + if (attributeName === 'tm') { + options.telemetry = { ...options.telemetry, metadata: value }; + } else if (attributeName === 'omc') { + options.telemetry = { ...options.telemetry, omitContext: value }; + } else { + options[attributeName] = value; + } } } @@ -492,17 +500,30 @@ export class Job< const optionEntries = Object.entries(opts) as Array< [keyof JobsOptions, any] >; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; + const options: Record = {}; + + for (const [attributeName, value] of optionEntries) { + if (typeof value === 'undefined') { + continue; + } + if (attributeName in optsEncodeMap) { + const compressableAttribute = attributeName as keyof Omit< + CompressableJobOptions, + 'debounce' | 'telemetry' + >; + + const key = optsEncodeMap[compressableAttribute]; + options[key] = value; } else { - options[attributeName] = value; + // Handle complex compressable fields separately + if (attributeName === 'telemetry') { + options.tm = value.metadata; + options.omc = value.omitContext; + } else { + options[attributeName] = value; + } } } - return options as RedisJobOptions; } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index b570d324b8..ea652d01a3 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -307,8 +307,11 @@ export class Queue< 'add', `${this.name}.${name}`, async (span, srcPropagationMedatada) => { - if (!opts?.telemetry?.omitContext && srcPropagationMedatada) { - opts = { ...opts, telemetryMetadata: srcPropagationMedatada }; + if (srcPropagationMedatada && !opts?.telemetry?.omitContext) { + const telemetry = { + metadata: srcPropagationMedatada, + }; + opts = { ...opts, telemetry }; } const job = await this.addJob(name, data, opts); @@ -397,19 +400,33 @@ export class Queue< return await this.Job.createBulk( this as MinimalQueue, - jobs.map(job => ({ - name: job.name, - data: job.data, - opts: { - ...this.jobsOpts, - ...job.opts, - jobId: job.opts?.jobId, - tm: - span && - !job.opts?.telemetry?.omitContext && - srcPropagationMedatada, - }, - })), + jobs.map(job => { + let telemetry = job.opts?.telemetry; + if (srcPropagationMedatada) { + const omitContext = job.opts?.telemetry?.omitContext; + const telemetryMetadata = + job.opts?.telemetry?.metadata || + (!omitContext && srcPropagationMedatada); + + if (telemetryMetadata || omitContext) { + telemetry = { + metadata: telemetryMetadata, + omitContext, + }; + } + } + + return { + name: job.name, + data: job.data, + opts: { + ...this.jobsOpts, + ...job.opts, + jobId: job.opts?.jobId, + telemetry, + }, + }; + }), ); }, ); @@ -594,6 +611,16 @@ export class Queue< return (await this.jobScheduler).getJobSchedulers(start, end, asc); } + /** + * + * Get the number of job schedulers. + * + * @returns The number of job schedulers. + */ + async getJobSchedulersCount(): Promise { + return (await this.jobScheduler).getSchedulersCount(); + } + /** * Removes a repeatable job. * diff --git a/src/classes/worker.ts b/src/classes/worker.ts index acc6f1d533..c1d648dcf7 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -569,7 +569,7 @@ export class Worker< return nextJob; }, - nextJob?.opts.telemetryMetadata, + nextJob?.opts?.telemetry?.metadata, ); } @@ -821,7 +821,7 @@ will never work with more accuracy than 1ms. */ return; } - const { telemetryMetadata: srcPropagationMedatada } = job.opts; + const srcPropagationMedatada = job.opts?.telemetry?.metadata; return this.trace>( SpanKind.CONSUMER, diff --git a/src/interfaces/base-job-options.ts b/src/interfaces/base-job-options.ts index ac1db6faf9..7d5d7d4c2a 100644 --- a/src/interfaces/base-job-options.ts +++ b/src/interfaces/base-job-options.ts @@ -112,15 +112,4 @@ export interface BaseJobOptions extends DefaultJobOptions { * Internal property used by repeatable jobs. */ prevMillis?: number; - - /** - * TelemetryMetadata, provide for context propagation. - */ - telemetryMetadata?: string; - - /** - * If `true` telemetry will omit the context propagation - * @default false - */ - omitContext?: boolean; } diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 89069430d6..3c8c094d9c 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -1,6 +1,10 @@ import { BaseJobOptions, DebounceOptions } from '../interfaces'; -export type JobsOptions = BaseJobOptions & { +/** + * These options will be stored in Redis with smaller + * keys for compactness. + */ +export type CompressableJobOptions = { /** * Debounce options. * @deprecated use deduplication option @@ -32,7 +36,7 @@ export type JobsOptions = BaseJobOptions & { */ telemetry?: { /** - * TelemetryMetadata, provide for context propagation. + * Metadata, used for context propagation. */ metadata?: string; @@ -44,6 +48,8 @@ export type JobsOptions = BaseJobOptions & { }; }; +export type JobsOptions = BaseJobOptions & CompressableJobOptions; + /** * These fields are the ones stored in Redis with smaller keys for compactness. */ @@ -82,4 +88,10 @@ export type RedisJobOptions = BaseJobOptions & { * Omit Context Propagation */ omc?: boolean; + + /** + * Deduplication identifier. + * @deprecated use deid + */ + de?: string; }; diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..61e7dfaca1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -83,16 +83,21 @@ export function increaseMaxListeners( emitter.setMaxListeners(maxListeners + count); } -export const invertObject = (obj: Record) => { - return Object.entries(obj).reduce>( - (encodeMap, [key, value]) => { - encodeMap[value] = key; - return encodeMap; - }, - {}, - ); +type Invert> = { + [V in T[keyof T]]: { + [K in keyof T]: T[K] extends V ? K : never; + }[keyof T]; }; +export function invertObject>( + obj: T, +): Invert { + return Object.entries(obj).reduce((result, [key, value]) => { + (result as Record)[value] = key; + return result; + }, {} as Invert); +} + export function isRedisInstance(obj: any): obj is Redis | Cluster { if (!obj) { return false; diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index 6820b913ca..1fffb9d9dd 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -527,11 +527,13 @@ describe('Telemetry', () => { afterEach(() => fromMetadataSpy.restore()); it('should omit propagation on queue add', async () => { - const worker = new Worker(queueName, async () => 'some result', { - connection, - telemetry: telemetryClient, + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + }); }); - await worker.waitUntilReady(); const job = await queue.add( 'testJob', @@ -539,18 +541,20 @@ describe('Telemetry', () => { { telemetry: { omitContext: true } }, ); - await worker.processJob(job, 'some-token', () => false, new Set()); + await processing; expect(fromMetadataSpy.callCount).to.equal(0); await worker.close(); }); it('should omit propagation on queue addBulk', async () => { - const worker = new Worker(queueName, async () => 'some result', { - connection, - telemetry: telemetryClient, + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + }); }); - await worker.waitUntilReady(); const jobs = [ { @@ -559,24 +563,23 @@ describe('Telemetry', () => { opts: { telemetry: { omitContext: true } }, }, ]; - const jobArray = await queue.addBulk(jobs); + const addedJos = await queue.addBulk(jobs); + expect(addedJos).to.have.length(1); - await Promise.all( - jobArray.map(job => - worker.processJob(job, 'some-token', () => false, new Set()), - ), - ); + await processing; expect(fromMetadataSpy.callCount).to.equal(0); await worker.close(); }); it('should omit propagation on job scheduler', async () => { - const worker = new Worker(queueName, async () => 'some result', { - connection, - telemetry: telemetryClient, + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + }); }); - await worker.waitUntilReady(); const jobSchedulerId = 'testJobScheduler'; const data = { foo: 'bar' }; @@ -591,18 +594,20 @@ describe('Telemetry', () => { }, ); - await worker.processJob(job, 'some-token', () => false, new Set()); + await processing; expect(fromMetadataSpy.callCount).to.equal(0); await worker.close(); }); it('should omit propagation on flow producer', async () => { - const worker = new Worker(queueName, async () => 'some result', { - connection, - telemetry: telemetryClient, + let worker; + const processing = new Promise(resolve => { + worker = new Worker(queueName, async () => resolve(), { + connection, + telemetry: telemetryClient, + }); }); - await worker.waitUntilReady(); const flowProducer = new FlowProducer({ connection, @@ -629,11 +634,7 @@ describe('Telemetry', () => { ? [jobNode.job, ...jobNode.children.map(c => c.job)] : [jobNode.job]; - await Promise.all( - jobs.map(job => - worker.processJob(job, 'some-token', () => false, new Set()), - ), - ); + await processing; expect(fromMetadataSpy.callCount).to.equal(0); await flowProducer.close(); From f960f40a0fd24dea98af0767936785eff00dd574 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Sat, 7 Dec 2024 11:01:09 +0100 Subject: [PATCH 5/7] refactor(job): opt from and as moved back to job class --- src/classes/job-scheduler.ts | 5 ++-- src/classes/job.ts | 11 ++++---- src/utils.ts | 52 ------------------------------------ 3 files changed, 7 insertions(+), 61 deletions(-) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index 31612ff563..0cd8148a64 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -11,7 +11,6 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; -import { optsAsJSON, optsFromJSON } from '../utils'; export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; @@ -100,7 +99,7 @@ export class JobScheduler extends QueueBase { jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), - optsAsJSON(opts), + Job.optsAsJSON(opts), { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, @@ -290,7 +289,7 @@ export class JobScheduler extends QueueBase { template.data = JSON.parse(rawData); } if (rawOpts) { - template.opts = optsFromJSON(rawOpts); + template.opts = Job.optsFromJSON(rawOpts); } return template; } diff --git a/src/classes/job.ts b/src/classes/job.ts index c11e8573f7..e2c1ef78b7 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -30,8 +30,7 @@ import { parseObjectValues, tryCatch, removeUndefinedFields, - optsAsJSON, - optsFromJSON, + invertObject, } from '../utils'; import { Backoffs } from './backoffs'; import { Scripts, raw2NextJobData } from './scripts'; @@ -328,7 +327,7 @@ export class Job< jobId?: string, ): Job { const data = JSON.parse(json.data || '{}'); - const opts = optsFromJSON(json.opts); + const opts = Job.optsFromJSON(json.opts); const job = new this( queue, @@ -392,7 +391,7 @@ export class Job< this.scripts = new Scripts(this.queue); } - private static optsFromJSON(rawOpts?: string): JobsOptions { + static optsFromJSON(rawOpts?: string): JobsOptions { const opts = JSON.parse(rawOpts || '{}'); const optionEntries = Object.entries(opts) as Array< @@ -479,7 +478,7 @@ export class Job< id: this.id, name: this.name, data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), - opts: optsAsJSON(this.opts), + opts: Job.optsAsJSON(this.opts), parent: this.parent ? { ...this.parent } : undefined, parentKey: this.parentKey, progress: this.progress, @@ -497,7 +496,7 @@ export class Job< }); } - private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { + static optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { const optionEntries = Object.entries(opts) as Array< [keyof JobsOptions, any] >; diff --git a/src/utils.ts b/src/utils.ts index 559739eb4b..61e7dfaca1 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,7 +17,6 @@ import { EventEmitter } from 'events'; import * as semver from 'semver'; import { SpanKind, TelemetryAttributes } from './enums'; -import { JobsOptions, RedisJobOptions } from './types'; export const errorObject: { [index: string]: any } = { value: null }; @@ -276,57 +275,6 @@ export const toString = (value: any): string => { export const QUEUE_EVENT_SUFFIX = ':qe'; -const optsDecodeMap = { - de: 'deduplication', - fpof: 'failParentOnFailure', - idof: 'ignoreDependencyOnFailure', - kl: 'keepLogs', - rdof: 'removeDependencyOnFailure', - tm: 'telemetryMetadata', -}; - -const optsEncodeMap = invertObject(optsDecodeMap); -optsEncodeMap.debounce = 'de'; - -export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions { - const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>; - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if (value !== undefined) { - if ((optsEncodeMap as Record)[attributeName]) { - options[(optsEncodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - } - - return options as RedisJobOptions; -} - -export function optsFromJSON(rawOpts?: string): JobsOptions { - const opts = JSON.parse(rawOpts || '{}'); - - const optionEntries = Object.entries(opts) as Array< - [keyof RedisJobOptions, any] - >; - - const options: Partial> = {}; - for (const item of optionEntries) { - const [attributeName, value] = item; - if ((optsDecodeMap as Record)[attributeName]) { - options[(optsDecodeMap as Record)[attributeName]] = - value; - } else { - options[attributeName] = value; - } - } - - return options as JobsOptions; -} - export function removeUndefinedFields>( obj: Record, ) { From 6350576f1a33172ed9b080672e3ed87d5e7015da Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 10 Dec 2024 14:49:09 +0100 Subject: [PATCH 6/7] fix(job-scheduler): add missing import --- src/classes/job-scheduler.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index c3a4ee9083..443cee4a3f 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -11,6 +11,7 @@ import { Job } from './job'; import { QueueBase } from './queue-base'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; +import { array2obj } from '../utils'; export class JobScheduler extends QueueBase { private repeatStrategy: RepeatStrategy; From c9a032277c637eb0afe1eeaf7423781a53911c13 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 10 Dec 2024 15:05:00 +0100 Subject: [PATCH 7/7] test(telemetry): add prefix where missing --- tests/test_telemetry_interface.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index 1fffb9d9dd..fa6ff4290a 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -298,6 +298,7 @@ describe('Telemetry', () => { connection, telemetry: telemetryClient, name: 'testWorker', + prefix, }); await worker.waitUntilReady(); @@ -330,6 +331,7 @@ describe('Telemetry', () => { const worker = new Worker(queueName, async () => 'some result', { connection, telemetry: telemetryClient, + prefix, }); await worker.waitUntilReady(); @@ -351,17 +353,20 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); const testFlow = { name: 'parentJob', queueName, + prefix, data: { foo: 'bar' }, children: [ { name: 'childJob', queueName, + prefix, data: { baz: 'qux' }, }, ], @@ -387,6 +392,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -422,6 +428,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -474,6 +481,7 @@ describe('Telemetry', () => { const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); @@ -532,6 +540,7 @@ describe('Telemetry', () => { worker = new Worker(queueName, async () => resolve(), { connection, telemetry: telemetryClient, + prefix, }); }); @@ -553,6 +562,7 @@ describe('Telemetry', () => { worker = new Worker(queueName, async () => resolve(), { connection, telemetry: telemetryClient, + prefix, }); }); @@ -578,6 +588,7 @@ describe('Telemetry', () => { worker = new Worker(queueName, async () => resolve(), { connection, telemetry: telemetryClient, + prefix, }); }); @@ -606,12 +617,14 @@ describe('Telemetry', () => { worker = new Worker(queueName, async () => resolve(), { connection, telemetry: telemetryClient, + prefix, }); }); const flowProducer = new FlowProducer({ connection, telemetry: telemetryClient, + prefix, }); const testFlow = {