Skip to content

Commit

Permalink
feat(telemetry): add option to omit context propagation on jobs (#2946)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgozdz authored Dec 10, 2024
1 parent 0e3c2e5 commit 6514c33
Show file tree
Hide file tree
Showing 13 changed files with 492 additions and 234 deletions.
14 changes: 9 additions & 5 deletions docs/gitbook/README (1).md
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ import IORedis from 'ioredis';

const connection = new IORedis({ maxRetriesPerRequest: null });

const worker = new Worker('foo', async job => {
// Will print { foo: 'bar'} for the first job
// and { qux: 'baz' } for the second.
console.log(job.data);
}, { connection });
const worker = new Worker(
'foo',
async job => {
// Will print { foo: 'bar'} for the first job
// and { qux: 'baz' } for the second.
console.log(job.data);
},
{ connection },
);
```

{% hint style="info" %}
Expand Down
238 changes: 119 additions & 119 deletions docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
@@ -1,136 +1,136 @@
# Table of contents

* [What is BullMQ](README.md)
* [Quick Start](<README (1).md>)
* [API Reference](https://api.docs.bullmq.io)
* [Changelogs](changelog.md)
* [v4](changelogs/changelog-v4.md)
* [v3](changelogs/changelog-v3.md)
* [v2](changelogs/changelog-v2.md)
* [v1](changelogs/changelog-v1.md)
- [What is BullMQ](README.md)
- [Quick Start](<README (1).md>)
- [API Reference](https://api.docs.bullmq.io)
- [Changelogs](changelog.md)
- [v4](changelogs/changelog-v4.md)
- [v3](changelogs/changelog-v3.md)
- [v2](changelogs/changelog-v2.md)
- [v1](changelogs/changelog-v1.md)

## Guide

* [Introduction](guide/introduction.md)
* [Connections](guide/connections.md)
* [Queues](guide/queues/README.md)
* [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md)
* [Adding jobs in bulk](guide/queues/adding-bulks.md)
* [Global Concurrency](guide/queues/global-concurrency.md)
* [Removing Jobs](guide/queues/removing-jobs.md)
* [Workers](guide/workers/README.md)
* [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md)
* [Concurrency](guide/workers/concurrency.md)
* [Graceful shutdown](guide/workers/graceful-shutdown.md)
* [Stalled Jobs](guide/workers/stalled-jobs.md)
* [Sandboxed processors](guide/workers/sandboxed-processors.md)
* [Pausing queues](guide/workers/pausing-queues.md)
* [Jobs](guide/jobs/README.md)
* [FIFO](guide/jobs/fifo.md)
* [LIFO](guide/jobs/lifo.md)
* [Job Ids](guide/jobs/job-ids.md)
* [Job Data](guide/jobs/job-data.md)
* [Deduplication](guide/jobs/deduplication.md)
* [Delayed](guide/jobs/delayed.md)
* [Repeatable](guide/jobs/repeatable.md)
* [Prioritized](guide/jobs/prioritized.md)
* [Removing jobs](guide/jobs/removing-job.md)
* [Stalled](guide/jobs/stalled.md)
* [Getters](guide/jobs/getters.md)
* [Job Schedulers](guide/job-schedulers/README.md)
* [Repeat Strategies](guide/job-schedulers/repeat-strategies.md)
* [Repeat options](guide/job-schedulers/repeat-options.md)
* [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md)
* [Flows](guide/flows/README.md)
* [Adding flows in bulk](guide/flows/adding-bulks.md)
* [Get Flow Tree](guide/flows/get-flow-tree.md)
* [Fail Parent](guide/flows/fail-parent.md)
* [Remove Dependency](guide/flows/remove-dependency.md)
* [Ignore Dependency](guide/flows/ignore-dependency.md)
* [Remove Child Dependency](guide/flows/remove-child-dependency.md)
* [Metrics](guide/metrics/metrics.md)
* [Rate limiting](guide/rate-limiting.md)
* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
* [Returning job data](guide/returning-job-data.md)
* [Events](guide/events/README.md)
* [Create Custom Events](guide/events/create-custom-events.md)
* [Telemetry](guide/telemetry/README.md)
* [Getting started](guide/telemetry/getting-started.md)
* [Running Jaeger](guide/telemetry/running-jaeger.md)
* [Running a simple example](guide/telemetry/running-a-simple-example.md)
* [QueueScheduler](guide/queuescheduler.md)
* [Redis™ Compatibility](guide/redis-tm-compatibility/README.md)
* [Dragonfly](guide/redis-tm-compatibility/dragonfly.md)
* [Redis™ hosting](guide/redis-tm-hosting/README.md)
* [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md)
* [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md)
* [Architecture](guide/architecture.md)
* [NestJs](guide/nestjs/README.md)
* [Producers](guide/nestjs/producers.md)
* [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
* [Going to production](guide/going-to-production.md)
* [Migration to newer versions](guide/migration-to-newer-versions.md)
* [Troubleshooting](guide/troubleshooting.md)
- [Introduction](guide/introduction.md)
- [Connections](guide/connections.md)
- [Queues](guide/queues/README.md)
- [Auto-removal of jobs](guide/queues/auto-removal-of-jobs.md)
- [Adding jobs in bulk](guide/queues/adding-bulks.md)
- [Global Concurrency](guide/queues/global-concurrency.md)
- [Removing Jobs](guide/queues/removing-jobs.md)
- [Workers](guide/workers/README.md)
- [Auto-removal of jobs](guide/workers/auto-removal-of-jobs.md)
- [Concurrency](guide/workers/concurrency.md)
- [Graceful shutdown](guide/workers/graceful-shutdown.md)
- [Stalled Jobs](guide/workers/stalled-jobs.md)
- [Sandboxed processors](guide/workers/sandboxed-processors.md)
- [Pausing queues](guide/workers/pausing-queues.md)
- [Jobs](guide/jobs/README.md)
- [FIFO](guide/jobs/fifo.md)
- [LIFO](guide/jobs/lifo.md)
- [Job Ids](guide/jobs/job-ids.md)
- [Job Data](guide/jobs/job-data.md)
- [Deduplication](guide/jobs/deduplication.md)
- [Delayed](guide/jobs/delayed.md)
- [Repeatable](guide/jobs/repeatable.md)
- [Prioritized](guide/jobs/prioritized.md)
- [Removing jobs](guide/jobs/removing-job.md)
- [Stalled](guide/jobs/stalled.md)
- [Getters](guide/jobs/getters.md)
- [Job Schedulers](guide/job-schedulers/README.md)
- [Repeat Strategies](guide/job-schedulers/repeat-strategies.md)
- [Repeat options](guide/job-schedulers/repeat-options.md)
- [Manage Job Schedulers](guide/job-schedulers/manage-job-schedulers.md)
- [Flows](guide/flows/README.md)
- [Adding flows in bulk](guide/flows/adding-bulks.md)
- [Get Flow Tree](guide/flows/get-flow-tree.md)
- [Fail Parent](guide/flows/fail-parent.md)
- [Remove Dependency](guide/flows/remove-dependency.md)
- [Ignore Dependency](guide/flows/ignore-dependency.md)
- [Remove Child Dependency](guide/flows/remove-child-dependency.md)
- [Metrics](guide/metrics/metrics.md)
- [Rate limiting](guide/rate-limiting.md)
- [Parallelism and Concurrency](guide/parallelism-and-concurrency.md)
- [Retrying failing jobs](guide/retrying-failing-jobs.md)
- [Returning job data](guide/returning-job-data.md)
- [Events](guide/events/README.md)
- [Create Custom Events](guide/events/create-custom-events.md)
- [Telemetry](guide/telemetry/README.md)
- [Getting started](guide/telemetry/getting-started.md)
- [Running Jaeger](guide/telemetry/running-jaeger.md)
- [Running a simple example](guide/telemetry/running-a-simple-example.md)
- [QueueScheduler](guide/queuescheduler.md)
- [Redis™ Compatibility](guide/redis-tm-compatibility/README.md)
- [Dragonfly](guide/redis-tm-compatibility/dragonfly.md)
- [Redis™ hosting](guide/redis-tm-hosting/README.md)
- [AWS MemoryDB](guide/redis-tm-hosting/aws-memorydb.md)
- [AWS Elasticache](guide/redis-tm-hosting/aws-elasticache.md)
- [Architecture](guide/architecture.md)
- [NestJs](guide/nestjs/README.md)
- [Producers](guide/nestjs/producers.md)
- [Queue Events Listeners](guide/nestjs/queue-events-listeners.md)
- [Going to production](guide/going-to-production.md)
- [Migration to newer versions](guide/migration-to-newer-versions.md)
- [Troubleshooting](guide/troubleshooting.md)

## Patterns

* [Adding jobs in bulk across different queues](patterns/adding-bulks.md)
* [Manually processing jobs](patterns/manually-fetching-jobs.md)
* [Named Processor](patterns/named-processor.md)
* [Flows](patterns/flows.md)
* [Idempotent jobs](patterns/idempotent-jobs.md)
* [Throttle jobs](patterns/throttle-jobs.md)
* [Process Step Jobs](patterns/process-step-jobs.md)
* [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md)
* [Stop retrying jobs](patterns/stop-retrying-jobs.md)
* [Timeout jobs](patterns/timeout-jobs.md)
* [Redis Cluster](patterns/redis-cluster.md)
- [Adding jobs in bulk across different queues](patterns/adding-bulks.md)
- [Manually processing jobs](patterns/manually-fetching-jobs.md)
- [Named Processor](patterns/named-processor.md)
- [Flows](patterns/flows.md)
- [Idempotent jobs](patterns/idempotent-jobs.md)
- [Throttle jobs](patterns/throttle-jobs.md)
- [Process Step Jobs](patterns/process-step-jobs.md)
- [Failing fast when Redis is down](patterns/failing-fast-when-redis-is-down.md)
- [Stop retrying jobs](patterns/stop-retrying-jobs.md)
- [Timeout jobs](patterns/timeout-jobs.md)
- [Redis Cluster](patterns/redis-cluster.md)

## BullMQ Pro

* [Introduction](bullmq-pro/introduction.md)
* [Install](bullmq-pro/install.md)
* [Observables](bullmq-pro/observables/README.md)
* [Cancelation](bullmq-pro/observables/cancelation.md)
* [Groups](bullmq-pro/groups/README.md)
* [Getters](bullmq-pro/groups/getters.md)
* [Rate limiting](bullmq-pro/groups/rate-limiting.md)
* [Concurrency](bullmq-pro/groups/concurrency.md)
* [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md)
* [Max group size](bullmq-pro/groups/max-group-size.md)
* [Pausing groups](bullmq-pro/groups/pausing-groups.md)
* [Prioritized intra-groups](bullmq-pro/groups/prioritized.md)
* [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md)
* [Telemetry](bullmq-pro/telemetry.md)
* [Batches](bullmq-pro/batches.md)
* [NestJs](bullmq-pro/nestjs/README.md)
* [Producers](bullmq-pro/nestjs/producers.md)
* [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md)
* [API Reference](https://nestjs.bullmq.pro/)
* [Changelog](bullmq-pro/nestjs/changelog.md)
* [API Reference](https://api.bullmq.pro)
* [Changelog](bullmq-pro/changelog.md)
* [Support](bullmq-pro/support.md)
- [Introduction](bullmq-pro/introduction.md)
- [Install](bullmq-pro/install.md)
- [Observables](bullmq-pro/observables/README.md)
- [Cancelation](bullmq-pro/observables/cancelation.md)
- [Groups](bullmq-pro/groups/README.md)
- [Getters](bullmq-pro/groups/getters.md)
- [Rate limiting](bullmq-pro/groups/rate-limiting.md)
- [Concurrency](bullmq-pro/groups/concurrency.md)
- [Local group concurrency](bullmq-pro/groups/local-group-concurrency.md)
- [Max group size](bullmq-pro/groups/max-group-size.md)
- [Pausing groups](bullmq-pro/groups/pausing-groups.md)
- [Prioritized intra-groups](bullmq-pro/groups/prioritized.md)
- [Sandboxes for groups](bullmq-pro/groups/sandboxes-for-groups.md)
- [Telemetry](bullmq-pro/telemetry.md)
- [Batches](bullmq-pro/batches.md)
- [NestJs](bullmq-pro/nestjs/README.md)
- [Producers](bullmq-pro/nestjs/producers.md)
- [Queue Events Listeners](bullmq-pro/nestjs/queue-events-listeners.md)
- [API Reference](https://nestjs.bullmq.pro/)
- [Changelog](bullmq-pro/nestjs/changelog.md)
- [API Reference](https://api.bullmq.pro)
- [Changelog](bullmq-pro/changelog.md)
- [Support](bullmq-pro/support.md)

## Bull

* [Introduction](bull/introduction.md)
* [Install](bull/install.md)
* [Quick Guide](bull/quick-guide.md)
* [Important Notes](bull/important-notes.md)
* [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
* [Patterns](bull/patterns/README.md)
* [Persistent connections](bull/patterns/persistent-connections.md)
* [Message queue](bull/patterns/message-queue.md)
* [Returning Job Completions](bull/patterns/returning-job-completions.md)
* [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md)
* [Redis cluster](bull/patterns/redis-cluster.md)
* [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md)
* [Debugging](bull/patterns/debugging.md)
* [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md)
- [Introduction](bull/introduction.md)
- [Install](bull/install.md)
- [Quick Guide](bull/quick-guide.md)
- [Important Notes](bull/important-notes.md)
- [Reference](https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md)
- [Patterns](bull/patterns/README.md)
- [Persistent connections](bull/patterns/persistent-connections.md)
- [Message queue](bull/patterns/message-queue.md)
- [Returning Job Completions](bull/patterns/returning-job-completions.md)
- [Reusing Redis Connections](bull/patterns/reusing-redis-connections.md)
- [Redis cluster](bull/patterns/redis-cluster.md)
- [Custom backoff strategy](bull/patterns/custom-backoff-strategy.md)
- [Debugging](bull/patterns/debugging.md)
- [Manually fetching jobs](bull/patterns/manually-fetching-jobs.md)

## Python

* [Introduction](python/introduction.md)
* [Changelog](python/changelog.md)
- [Introduction](python/introduction.md)
- [Changelog](python/changelog.md)
32 changes: 16 additions & 16 deletions docs/gitbook/bullmq-pro/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,46 @@
In the same fashion we support telemetry in BullMQ open source edition, we also support telemetry for BullMQ Pro. It works basically the same, in fact you can just the same integrations available for BullMQ in the Pro version. So in order to enable it you would do something like this:

```typescript
import { QueuePro } from "@taskforcesh/bullmq-pro";
import { BullMQOtel } from "bullmq-otel";
import { QueuePro } from '@taskforcesh/bullmq-pro';
import { BullMQOtel } from 'bullmq-otel';

// Initialize a Pro queue using BullMQ-Otel
const queue = new QueuePro("myProQueue", {
const queue = new QueuePro('myProQueue', {
connection,
telemetry: new BullMQOtel("guide"),
telemetry: new BullMQOtel('guide'),
});

await queue.add(
"myJob",
{ data: "myData" },
'myJob',
{ data: 'myData' },
{
attempts: 2,
backoff: 1000,
group: {
id: "myGroupId",
id: 'myGroupId',
},
}
},
);
```

For the Worker we will do it in a similar way:

```typescript
import { WorkerPro } from "@taskforcesh/bullmq-pro";
import { BullMQOtel } from "bullmq-otel";
import { WorkerPro } from '@taskforcesh/bullmq-pro';
import { BullMQOtel } from 'bullmq-otel';

const worker = new WorkerPro(
"myProQueue",
async (job) => {
console.log("processing job", job.id);
'myProQueue',
async job => {
console.log('processing job', job.id);
},
{
name: "myWorker",
name: 'myWorker',
connection,
telemetry: new BullMQOtel("guide"),
telemetry: new BullMQOtel('guide'),
concurrency: 10,
batch: { size: 10 },
}
},
);
```

Expand Down
22 changes: 19 additions & 3 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,37 @@ export class FlowProducer extends EventEmitter {
node.name,
'addNode',
node.queueName,
async (span, dstPropagationMetadata) => {
async (span, srcPropagationMedatada) => {
span?.setAttributes({
[TelemetryAttributes.JobName]: node.name,
[TelemetryAttributes.JobId]: jobId,
});
const opts = node.opts;
let telemetry = opts?.telemetry;

if (srcPropagationMedatada && opts) {
const omitContext = opts.telemetry?.omitContext;
const telemetryMetadata =
opts.telemetry?.metadata ||
(!omitContext && srcPropagationMedatada);

if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
}

const job = new this.Job(
queue,
node.name,
node.data,
{
...jobsOpts,
...node.opts,
...opts,
parent: parent?.parentOpts,
telemetryMetadata: dstPropagationMetadata,
telemetry,
},
jobId,
);
Expand Down
Loading

0 comments on commit 6514c33

Please sign in to comment.