From 5ed154ba240dbe9eb5c22e27ad02e851c0f3cf69 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Tue, 5 Nov 2024 14:58:22 +0100 Subject: [PATCH] feat(flows): add telemetry support (#2879) --- src/classes/flow-producer.ts | 247 +++++++++++++++++++----------- src/classes/queue-base.ts | 73 ++------- src/enums/telemetry-attributes.ts | 1 + src/utils.ts | 92 ++++++++++- tests/test_telemetry_interface.ts | 172 ++++++++++++++++++++- 5 files changed, 428 insertions(+), 157 deletions(-) diff --git a/src/classes/flow-producer.ts b/src/classes/flow-producer.ts index c7d70fa6dc..f732fd4e11 100644 --- a/src/classes/flow-producer.ts +++ b/src/classes/flow-producer.ts @@ -8,11 +8,14 @@ import { IoredisListener, QueueBaseOptions, RedisClient, + Tracer, + ContextManager, } from '../interfaces'; -import { getParentKey, isRedisInstance } from '../utils'; +import { getParentKey, isRedisInstance, trace } from '../utils'; import { Job } from './job'; import { KeysMap, QueueKeys } from './queue-keys'; import { RedisConnection } from './redis-connection'; +import { SpanKind, TelemetryAttributes } from '../enums'; export interface AddNodeOpts { multi: ChainableCommander; @@ -95,6 +98,10 @@ export class FlowProducer extends EventEmitter { queueKeys: QueueKeys; protected connection: RedisConnection; + protected telemetry: { + tracer: Tracer | undefined; + contextManager: ContextManager | undefined; + }; constructor( public opts: QueueBaseOptions = { connection: {} }, @@ -122,6 +129,10 @@ export class FlowProducer extends EventEmitter { }); this.queueKeys = new QueueKeys(opts.prefix); + + if (opts?.telemetry) { + this.telemetry = opts.telemetry; + } } emit( @@ -196,19 +207,32 @@ export class FlowProducer extends EventEmitter { ? `${parentKey}:dependencies` : undefined; - const jobsTree = this.addNode({ - multi, - node: flow, - queuesOpts: opts?.queuesOptions, - parent: { - parentOpts, - parentDependenciesKey, - }, - }); + return trace>( + this.telemetry, + SpanKind.PRODUCER, + flow.queueName, + 'addFlow', + flow.queueName, + async span => { + span?.setAttributes({ + [TelemetryAttributes.FlowName]: flow.name, + }); + + const jobsTree = await this.addNode({ + multi, + node: flow, + queuesOpts: opts?.queuesOptions, + parent: { + parentOpts, + parentDependenciesKey, + }, + }); - await multi.exec(); + await multi.exec(); - return jobsTree; + return jobsTree; + }, + ); } /** @@ -255,11 +279,27 @@ export class FlowProducer extends EventEmitter { const client = await this.connection.client; const multi = client.multi(); - const jobsTrees = this.addNodes(multi, flows); - - await multi.exec(); - - return jobsTrees; + return trace>( + this.telemetry, + SpanKind.PRODUCER, + '', + 'addBulkFlows', + '', + async span => { + span?.setAttributes({ + [TelemetryAttributes.BulkCount]: flows.length, + [TelemetryAttributes.BulkNames]: flows + .map(flow => flow.name) + .join(','), + }); + + const jobsTrees = await this.addNodes(multi, flows); + + await multi.exec(); + + return jobsTrees; + }, + ); } /** @@ -273,7 +313,12 @@ export class FlowProducer extends EventEmitter { * @param parent - parent data sent to children to create the "links" to their parent * @returns */ - protected addNode({ multi, node, parent, queuesOpts }: AddNodeOpts): JobNode { + protected async addNode({ + multi, + node, + parent, + queuesOpts, + }: AddNodeOpts): Promise { const prefix = node.prefix || this.opts.prefix; const queue = this.queueFromNode(node, new QueueKeys(prefix), prefix); const queueOpts = queuesOpts && queuesOpts[node.queueName]; @@ -281,62 +326,79 @@ export class FlowProducer extends EventEmitter { const jobsOpts = queueOpts?.defaultJobOptions ?? {}; const jobId = node.opts?.jobId || v4(); - const job = new this.Job( - queue, + return trace>( + this.telemetry, + SpanKind.PRODUCER, node.name, - node.data, - { - ...jobsOpts, - ...node.opts, - parent: parent?.parentOpts, - }, - jobId, - ); - - const parentKey = getParentKey(parent?.parentOpts); - - if (node.children && node.children.length > 0) { - // Create parent job, will be a job in status "waiting-children". - const parentId = jobId; - const queueKeysParent = new QueueKeys(node.prefix || this.opts.prefix); - const waitChildrenKey = queueKeysParent.toKey( - node.queueName, - 'waiting-children', - ); - - job.addJob((multi as unknown), { - parentDependenciesKey: parent?.parentDependenciesKey, - waitChildrenKey, - parentKey, - }); - - const parentDependenciesKey = `${queueKeysParent.toKey( - node.queueName, - parentId, - )}:dependencies`; - - const children = this.addChildren({ - multi, - nodes: node.children, - parent: { - parentOpts: { - id: parentId, - queue: queueKeysParent.getQueueQualifiedName(node.queueName), + 'addNode', + node.queueName, + async (span, dstPropagationMetadata) => { + span?.setAttributes({ + [TelemetryAttributes.JobName]: node.name, + [TelemetryAttributes.JobId]: jobId, + }); + + const job = new this.Job( + queue, + node.name, + node.data, + { + ...jobsOpts, + ...node.opts, + parent: parent?.parentOpts, + telemetryMetadata: dstPropagationMetadata, }, - parentDependenciesKey, - }, - queuesOpts, - }); - - return { job, children }; - } else { - job.addJob((multi as unknown), { - parentDependenciesKey: parent?.parentDependenciesKey, - parentKey, - }); + jobId, + ); - return { job }; - } + const parentKey = getParentKey(parent?.parentOpts); + + if (node.children && node.children.length > 0) { + // Create the parent job, it will be a job in status "waiting-children". + const parentId = jobId; + const queueKeysParent = new QueueKeys( + node.prefix || this.opts.prefix, + ); + const waitChildrenKey = queueKeysParent.toKey( + node.queueName, + 'waiting-children', + ); + + await job.addJob((multi as unknown), { + parentDependenciesKey: parent?.parentDependenciesKey, + waitChildrenKey, + parentKey, + }); + + const parentDependenciesKey = `${queueKeysParent.toKey( + node.queueName, + parentId, + )}:dependencies`; + + const children = await this.addChildren({ + multi, + nodes: node.children, + parent: { + parentOpts: { + id: parentId, + queue: queueKeysParent.getQueueQualifiedName(node.queueName), + }, + parentDependenciesKey, + }, + queuesOpts, + }); + + return { job, children }; + } else { + await job.addJob((multi as unknown), { + parentDependenciesKey: parent?.parentDependenciesKey, + parentKey, + }); + + return { job }; + } + }, + ); } /** @@ -349,23 +411,28 @@ export class FlowProducer extends EventEmitter { * @param nodes - the nodes representing jobs to be added to some queue * @returns */ - protected addNodes(multi: ChainableCommander, nodes: FlowJob[]): JobNode[] { - return nodes.map(node => { - const parentOpts = node?.opts?.parent; - const parentKey = getParentKey(parentOpts); - const parentDependenciesKey = parentKey - ? `${parentKey}:dependencies` - : undefined; - - return this.addNode({ - multi, - node, - parent: { - parentOpts, - parentDependenciesKey, - }, - }); - }); + protected addNodes( + multi: ChainableCommander, + nodes: FlowJob[], + ): Promise { + return Promise.all( + nodes.map(node => { + const parentOpts = node?.opts?.parent; + const parentKey = getParentKey(parentOpts); + const parentDependenciesKey = parentKey + ? `${parentKey}:dependencies` + : undefined; + + return this.addNode({ + multi, + node, + parent: { + parentOpts, + parentDependenciesKey, + }, + }); + }), + ); } private async getNode(client: RedisClient, node: NodeOpts): Promise { @@ -406,7 +473,9 @@ export class FlowProducer extends EventEmitter { } private addChildren({ multi, nodes, parent, queuesOpts }: AddChildrenOpts) { - return nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })); + return Promise.all( + nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })), + ); } private getChildren( diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 6b5ef9d385..42f7bf1d31 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -6,6 +6,7 @@ import { DELAY_TIME_5, isNotConnectionError, isRedisInstance, + trace, } from '../utils'; import { RedisConnection } from './redis-connection'; import { Job } from './job'; @@ -31,13 +32,6 @@ export class QueueBase extends EventEmitter implements MinimalQueue { protected connection: RedisConnection; public readonly qualifiedName: string; - /** - * Instance of a telemetry client - * To use it wrap the code with trace helper - * It will check if tracer is provided and if not it will continue as is - */ - private tracer: Tracer | undefined; - /** * * @param name - The name of the queue. @@ -84,10 +78,6 @@ export class QueueBase extends EventEmitter implements MinimalQueue { this.keys = queueKeys.getKeys(name); this.toKey = (type: string) => queueKeys.toKey(name, type); this.setScripts(); - - if (opts?.telemetry) { - this.tracer = opts.telemetry.tracer; - } } /** @@ -198,64 +188,21 @@ export class QueueBase extends EventEmitter implements MinimalQueue { * @param srcPropagationMedatada - * @returns */ - async trace( + trace( spanKind: SpanKind, operation: string, destination: string, callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, srcPropagationMetadata?: string, ) { - if (!this.tracer) { - return callback(); - } - - const currentContext = this.opts.telemetry.contextManager.active(); - - let parentContext; - if (srcPropagationMetadata) { - parentContext = this.opts.telemetry.contextManager.fromMetadata( - currentContext, - srcPropagationMetadata, - ); - } - - const spanName = `${operation} ${destination}`; - const span = this.tracer.startSpan( - spanName, - { - kind: spanKind, - }, - parentContext, + return trace | T>( + this.opts.telemetry, + spanKind, + this.name, + operation, + destination, + callback, + srcPropagationMetadata, ); - - try { - span.setAttributes({ - [TelemetryAttributes.QueueName]: this.name, - [TelemetryAttributes.QueueOperation]: operation, - }); - - let messageContext; - let dstPropagationMetadata: undefined | string; - - if (spanKind === SpanKind.CONSUMER) { - messageContext = span.setSpanOnContext(parentContext); - } else { - messageContext = span.setSpanOnContext(currentContext); - } - - if (callback.length == 2) { - dstPropagationMetadata = - this.opts.telemetry.contextManager.getMetadata(messageContext); - } - - return await this.opts.telemetry.contextManager.with(messageContext, () => - callback(span, dstPropagationMetadata), - ); - } catch (err) { - span.recordException(err as Error); - throw err; - } finally { - span.end(); - } } } diff --git a/src/enums/telemetry-attributes.ts b/src/enums/telemetry-attributes.ts index 3806242e1b..112a8a3c9c 100644 --- a/src/enums/telemetry-attributes.ts +++ b/src/enums/telemetry-attributes.ts @@ -29,6 +29,7 @@ export enum TelemetryAttributes { JobProcessedTimestamp = 'bullmq.job.processed.timestamp', JobResult = 'bullmq.job.result', JobFailedReason = 'bullmq.job.failed.reason', + FlowName = 'bullmq.flow.name', } export enum SpanKind { diff --git a/src/utils.ts b/src/utils.ts index af04b88413..a8b0ff071b 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -6,12 +6,17 @@ import { AbortController } from 'node-abort-controller'; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils'; -import { ChildMessage, RedisClient } from './interfaces'; +import { + ChildMessage, + ContextManager, + RedisClient, + Span, + Tracer, +} from './interfaces'; import { EventEmitter } from 'events'; import * as semver from 'semver'; -import { join } from 'path'; -import { readFileSync } from 'fs'; +import { SpanKind, TelemetryAttributes } from './enums'; export const errorObject: { [index: string]: any } = { value: null }; @@ -264,3 +269,84 @@ export const toString = (value: any): string => { }; export const QUEUE_EVENT_SUFFIX = ':qe'; + +/** + * Wraps the code with telemetry and provides a span for configuration. + * + * @param telemetry - telemetry configuration. If undefined, the callback will be executed without telemetry. + * @param spanKind - kind of the span: Producer, Consumer, Internal + * @param queueName - queue name + * @param operation - operation name (such as add, process, etc) + * @param destination - destination name (normally the queue name) + * @param callback - code to wrap with telemetry + * @param srcPropagationMedatada - + * @returns + */ +export async function trace( + telemetry: + | { + tracer: Tracer; + contextManager: ContextManager; + } + | undefined, + spanKind: SpanKind, + queueName: string, + operation: string, + destination: string, + callback: (span?: Span, dstPropagationMetadata?: string) => Promise | T, + srcPropagationMetadata?: string, +) { + if (!telemetry) { + return callback(); + } else { + const { tracer, contextManager } = telemetry; + + const currentContext = contextManager.active(); + + let parentContext; + if (srcPropagationMetadata) { + parentContext = contextManager.fromMetadata( + currentContext, + srcPropagationMetadata, + ); + } + + const spanName = destination ? `${operation} ${destination}` : operation; + const span = tracer.startSpan( + spanName, + { + kind: spanKind, + }, + parentContext, + ); + + try { + span.setAttributes({ + [TelemetryAttributes.QueueName]: queueName, + [TelemetryAttributes.QueueOperation]: operation, + }); + + let messageContext; + let dstPropagationMetadata: undefined | string; + + if (spanKind === SpanKind.CONSUMER) { + messageContext = span.setSpanOnContext(parentContext); + } else { + messageContext = span.setSpanOnContext(currentContext); + } + + if (callback.length == 2) { + dstPropagationMetadata = contextManager.getMetadata(messageContext); + } + + return await contextManager.with(messageContext, () => + callback(span, dstPropagationMetadata), + ); + } catch (err) { + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + } +} diff --git a/tests/test_telemetry_interface.ts b/tests/test_telemetry_interface.ts index 125db85216..13e48cc77d 100644 --- a/tests/test_telemetry_interface.ts +++ b/tests/test_telemetry_interface.ts @@ -2,7 +2,7 @@ import { expect, assert } from 'chai'; import { default as IORedis } from 'ioredis'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { Queue, Worker } from '../src/classes'; +import { FlowProducer, Queue, Worker } from '../src/classes'; import { removeAllQueueData } from '../src/utils'; import { Telemetry, @@ -247,7 +247,7 @@ describe('Telemetry', () => { await worker.waitUntilReady(); const moveToCompletedStub = sinon.stub(job, 'moveToCompleted').resolves(); - const startSpanSpy = sinon.spy(worker['tracer'], 'startSpan'); + const startSpanSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); const token = 'some-token'; @@ -289,4 +289,172 @@ describe('Telemetry', () => { await worker.close(); }); }); + + describe('Flows', () => { + it('should correctly interact with telemetry when adding a flow', async () => { + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + }); + + const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); + const testFlow = { + name: 'parentJob', + queueName, + data: { foo: 'bar' }, + children: [ + { + name: 'childJob', + queueName, + data: { baz: 'qux' }, + }, + ], + }; + + const jobNode = await flowProducer.add(testFlow); + const parentJob = jobNode.job; + + const span = traceSpy.returnValues[0] as MockSpan; + + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal(`addFlow ${queueName}`); + expect(span.options?.kind).to.equal(SpanKind.PRODUCER); + expect(span.attributes[TelemetryAttributes.FlowName]).to.equal( + testFlow.name, + ); + + traceSpy.restore(); + await flowProducer.close(); + }); + + it('should correctly handle errors and record them in telemetry for flows', async () => { + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + }); + + const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); + const recordExceptionSpy = sinon.spy( + MockSpan.prototype, + 'recordException', + ); + + try { + await flowProducer.add({ + name: 'errorJob', + queueName, + data: { foo: 'bar' }, + opts: { parent: { id: 'invalidParentId', queue: 'invalidQueue' } }, + }); + } catch (e) { + assert(recordExceptionSpy.calledOnce); + const recordedError = recordExceptionSpy.firstCall.args[0]; + assert.equal( + recordedError.message, + 'Failed to add flow due to invalid parent configuration', + ); + } finally { + traceSpy.restore(); + recordExceptionSpy.restore(); + await flowProducer.close(); + } + }); + }); + + describe('Flows - addBulk', () => { + it('should correctly interact with telemetry when adding multiple flows', async () => { + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + }); + + const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); + const testFlows = [ + { + name: 'parentJob1', + queueName, + data: { foo: 'bar1' }, + children: [ + { + name: 'childJob1', + queueName, + data: { baz: 'qux1' }, + }, + ], + }, + { + name: 'parentJob2', + queueName, + data: { foo: 'bar2' }, + children: [ + { + name: 'childJob2', + queueName, + data: { baz: 'qux2' }, + }, + ], + }, + ]; + + const jobNodes = await flowProducer.addBulk(testFlows); + + const span = traceSpy.returnValues[0] as MockSpan; + + expect(span).to.be.an.instanceOf(MockSpan); + expect(span.name).to.equal('addBulkFlows'); + expect(span.options?.kind).to.equal(SpanKind.PRODUCER); + expect(span.attributes[TelemetryAttributes.BulkNames]).to.equal( + testFlows.map(flow => flow.name).join(','), + ); + expect(span.attributes[TelemetryAttributes.BulkCount]).to.equal( + testFlows.length, + ); + + traceSpy.restore(); + await flowProducer.close(); + }); + + it('should correctly handle errors and record them in telemetry for addBulk', async () => { + const flowProducer = new FlowProducer({ + connection, + telemetry: telemetryClient, + }); + + const traceSpy = sinon.spy(telemetryClient.tracer, 'startSpan'); + const recordExceptionSpy = sinon.spy( + MockSpan.prototype, + 'recordException', + ); + + const invalidFlows = [ + { + name: 'errorJob1', + queueName, + data: { foo: 'bar1' }, + opts: { parent: { id: 'invalidParentId', queue: 'invalidQueue' } }, + }, + { + name: 'errorJob2', + queueName, + data: { foo: 'bar2' }, + opts: { parent: { id: 'invalidParentId', queue: 'invalidQueue' } }, + }, + ]; + + try { + await flowProducer.addBulk(invalidFlows); + } catch (e) { + assert(recordExceptionSpy.calledOnce); + const recordedError = recordExceptionSpy.firstCall.args[0]; + assert.equal( + recordedError.message, + 'Failed to add bulk flows due to invalid parent configuration', + ); + } finally { + traceSpy.restore(); + recordExceptionSpy.restore(); + await flowProducer.close(); + } + }); + }); });