Skip to content

Commit

Permalink
feat(queue-events): add QueueEventsProducer for publishing custom eve…
Browse files Browse the repository at this point in the history
…nts (#2844)
  • Loading branch information
roggervalf authored Nov 6, 2024
1 parent 43d80b3 commit 5eb03cd
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 23 deletions.
3 changes: 2 additions & 1 deletion docs/gitbook/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
* [Parallelism and Concurrency](guide/parallelism-and-concurrency.md)
* [Retrying failing jobs](guide/retrying-failing-jobs.md)
* [Returning job data](guide/returning-job-data.md)
* [Events](guide/events.md)
* [Events](guide/events/README.md)
* [Create Custom Events](guide/events/create-custom-events.md)
* [Telemetry](guide/telemetry/README.md)
* [Getting started](guide/telemetry/getting-started.md)
* [Running Jaeger](guide/telemetry/running-jaeger.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ myWorker.on('failed', (job: Job) => {
});
```

The events above are local for the workers that actually completed the jobs. However, in many situations you want to listen to all the events emitted by all the workers in one single place. For this you can use the [`QueueEvents`](../api/bullmq.queueevents.md) class:
The events above are local for the workers that actually completed the jobs. However, in many situations you want to listen to all the events emitted by all the workers in one single place. For this you can use the [`QueueEvents`](https://api.docs.bullmq.io/classes/v5.QueueEvents.html) class:

```typescript
import { QueueEvents } from 'bullmq';
Expand Down
44 changes: 44 additions & 0 deletions docs/gitbook/guide/events/create-custom-events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Create Custom Events

In BullMQ, creating a generic distributed realtime event emitter is possible by using our **QueueEventsProducer** class.

Consumers must use **QueueEvents** class to subscribe to those events that they are interested in.

```typescript
const queueName = 'customQueue';
const queueEventsProducer = new QueueEventsProducer(queueName, {
connection,
});
const queueEvents = new QueueEvents(queueName, {
connection,
});

interface CustomListener extends QueueEventsListener {
example: (args: { custom: string }, id: string) => void;
}
queueEvents.on<CustomListener>('example', async ({ custom }) => {
// custom logic
});

interface CustomEventPayload {
eventName: string;
custom: string;
}

await queueEventsProducer.publishEvent<CustomEventPayload>({
eventName: 'example',
custom: 'value',
});
```

Only eventName attribute is required.

{% hint style="warning" %}
Some event names are reserved from [Queue Listener API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueListener.html).
{% endhint %}

## Read more:

- 💡 [Queue Events API Reference](https://api.docs.bullmq.io/classes/v5.QueueEvents.html)
- 💡 [Queue Events Listener API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueEventsListener.html)
- 💡 [Queue Events Producer API Reference](https://api.docs.bullmq.io/interfaces/v5.QueueEventsProducer.html)
1 change: 1 addition & 0 deletions src/classes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './job';
// export * from './main-worker'; this file must not be exported
export * from './queue-base';
export * from './queue-events';
export * from './queue-events-producer';
export * from './queue-getters';
export * from './queue-keys';
export * from './queue';
Expand Down
58 changes: 58 additions & 0 deletions src/classes/queue-events-producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { QueueBaseOptions } from '../interfaces';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';

/**
* The QueueEventsProducer class is used for publishing custom events.
*/
export class QueueEventsProducer extends QueueBase {
constructor(
name: string,
opts: QueueBaseOptions = {
connection: {},
},
Connection?: typeof RedisConnection,
) {
super(
name,
{
blockingConnection: false,
...opts,
},
Connection,
);

this.opts = opts;
}

/**
* Publish custom event to be processed in QueueEvents.
* @param argsObj - Event payload
* @param maxEvents - Max quantity of events to be saved
*/
async publishEvent<T extends { eventName: string }>(
argsObj: T,
maxEvents = 1000,
): Promise<void> {
const client = await this.client;
const key = this.keys.events;
const { eventName, ...restArgs } = argsObj;
const args: any[] = ['MAXLEN', '~', maxEvents, '*', 'event', eventName];

for (const [key, value] of Object.entries(restArgs)) {
args.push(key, value);
}

await client.xadd(key, ...args);
}

/**
* Closes the connection and returns a promise that resolves when the connection is closed.
*/
async close(): Promise<void> {
if (!this.closing) {
this.closing = this.connection.close();
}
await this.closing;
}
}
48 changes: 28 additions & 20 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ export interface QueueEventsListener extends IoredisListener {
'waiting-children': (args: { jobId: string }, id: string) => void;
}

type CustomParameters<T> = T extends (...args: infer Args) => void
? Args
: never;

type KeyOf<T extends object> = Extract<keyof T, string>;

/**
* The QueueEvents class is used for listening to the global events
* emitted by a given queue.
Expand Down Expand Up @@ -213,34 +219,34 @@ export class QueueEvents extends QueueBase {
}
}

emit<U extends keyof QueueEventsListener>(
event: U,
...args: Parameters<QueueEventsListener[U]>
): boolean {
emit<
QEL extends QueueEventsListener = QueueEventsListener,
U extends KeyOf<QEL> = KeyOf<QEL>,
>(event: U, ...args: CustomParameters<QEL[U]>): boolean {
return super.emit(event, ...args);
}

off<U extends keyof QueueEventsListener>(
eventName: U,
listener: QueueEventsListener[U],
): this {
super.off(eventName, listener);
off<
QEL extends QueueEventsListener = QueueEventsListener,
U extends KeyOf<QEL> = KeyOf<QEL>,
>(eventName: U, listener: QEL[U]): this {
super.off(eventName, listener as (...args: any[]) => void);
return this;
}

on<U extends keyof QueueEventsListener>(
event: U,
listener: QueueEventsListener[U],
): this {
super.on(event, listener);
on<
QEL extends QueueEventsListener = QueueEventsListener,
U extends KeyOf<QEL> = KeyOf<QEL>,
>(event: U, listener: QEL[U]): this {
super.on(event, listener as (...args: any[]) => void);
return this;
}

once<U extends keyof QueueEventsListener>(
event: U,
listener: QueueEventsListener[U],
): this {
super.once(event, listener);
once<
QEL extends QueueEventsListener = QueueEventsListener,
U extends KeyOf<QEL> = KeyOf<QEL>,
>(event: U, listener: QEL[U]): this {
super.once(event, listener as (...args: any[]) => void);
return this;
}

Expand Down Expand Up @@ -310,7 +316,9 @@ export class QueueEvents extends QueueBase {
this.emit(event, id);
} else {
this.emit(event as any, restArgs, id);
this.emit(`${event}:${restArgs.jobId}` as any, restArgs, id);
if (restArgs.jobId) {
this.emit(`${event}:${restArgs.jobId}` as any, restArgs, id);
}
}
}
}
Expand Down
54 changes: 53 additions & 1 deletion tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import { v4 } from 'uuid';
import { expect } from 'chai';
import { after } from 'lodash';
import { beforeEach, describe, it, before, after as afterAll } from 'mocha';
import { FlowProducer, Queue, QueueEvents, Worker } from '../src/classes';
import {
FlowProducer,
Queue,
QueueEvents,
QueueEventsListener,
QueueEventsProducer,
Worker,
} from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';

describe('events', function () {
Expand Down Expand Up @@ -1253,4 +1260,49 @@ describe('events', function () {
await trimmedQueue.close();
await removeAllQueueData(new IORedis(redisHost), queueName);
});

describe('when publishing custom events', function () {
it('emits waiting when a job has been added', async () => {
const queueName2 = `test-${v4()}`;
const queueEventsProducer = new QueueEventsProducer(queueName2, {
connection,
prefix,
});
const queueEvents2 = new QueueEvents(queueName2, {
autorun: false,
connection,
prefix,
lastEventId: '0-0',
});
await queueEvents2.waitUntilReady();

interface CustomListener extends QueueEventsListener {
example: (args: { custom: string }, id: string) => void;
}
const customEvent = new Promise<void>(resolve => {
queueEvents2.on<CustomListener>('example', async ({ custom }) => {
await delay(250);
await expect(custom).to.be.equal('value');
resolve();
});
});

interface CustomEventPayload {
eventName: string;
custom: string;
}

await queueEventsProducer.publishEvent<CustomEventPayload>({
eventName: 'example',
custom: 'value',
});

queueEvents2.run();
await customEvent;

await queueEventsProducer.close();
await queueEvents2.close();
await removeAllQueueData(new IORedis(redisHost), queueName2);
});
});
});

0 comments on commit 5eb03cd

Please sign in to comment.