Skip to content

Commit

Permalink
events: add EventEmitterAsyncResource to core
Browse files Browse the repository at this point in the history
Signd-off-by: James M Snell <jasnell@gmail.com>

PR-URL: #41246
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
  • Loading branch information
jasnell authored and targos committed Jan 14, 2022
1 parent c546cef commit fe21607
Show file tree
Hide file tree
Showing 3 changed files with 347 additions and 0 deletions.
85 changes: 85 additions & 0 deletions doc/api/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,89 @@ const emitter = new EventEmitter();
setMaxListeners(5, target, emitter);
```

## Class: `events.EventEmitterAsyncResource extends EventEmitter`

<!-- YAML
added: REPLACEME
-->

Integrates `EventEmitter` with {AsyncResource} for `EventEmitter`s that
require manual async tracking. Specifically, all events emitted by instances
of `events.EventEmitterAsyncResource` will run within its [async context][].

```js
const { EventEmitterAsyncResource } = require('events');
const { notStrictEqual, strictEqual } = require('assert');
const { executionAsyncId } = require('async_hooks');

// Async tracking tooling will identify this as 'Q'.
const ee1 = new EventEmitterAsyncResource({ name: 'Q' });

// 'foo' listeners will run in the EventEmitters async context.
ee1.on('foo', () => {
strictEqual(executionAsyncId(), ee1.asyncId);
strictEqual(triggerAsyncId(), ee1.triggerAsyncId);
});

const ee2 = new EventEmitter();

// 'foo' listeners on ordinary EventEmitters that do not track async
// context, however, run in the same async context as the emit().
ee2.on('foo', () => {
notStrictEqual(executionAsyncId(), ee2.asyncId);
notStrictEqual(triggerAsyncId(), ee2.triggerAsyncId);
});

Promise.resolve().then(() => {
ee1.emit('foo');
ee2.emit('foo');
});
```

The `EventEmitterAsyncResource` class has the same methods and takes the
same options as `EventEmitter` and `AsyncResource` themselves.

### `new events.EventEmitterAsyncResource(options)`

* `options` {Object}
* `captureRejections` {boolean} It enables
[automatic capturing of promise rejection][capturerejections].
**Default:** `false`.
* `name` {string} The type of async event. **Default::**
[`new.target.name`][].
* `triggerAsyncId` {number} The ID of the execution context that created this
async event. **Default:** `executionAsyncId()`.
* `requireManualDestroy` {boolean} If set to `true`, disables `emitDestroy`
when the object is garbage collected. This usually does not need to be set
(even if `emitDestroy` is called manually), unless the resource's `asyncId`
is retrieved and the sensitive API's `emitDestroy` is called with it.
When set to `false`, the `emitDestroy` call on garbage collection
will only take place if there is at least one active `destroy` hook.
**Default:** `false`.

### `eventemitterasyncresource.asyncId`

* Type: {number} The unique `asyncId` assigned to the resource.

### `eventemitterasyncresource.asyncResource`

* Type: The underlying {AsyncResource}.

The returned `AsyncResource` object has an additional `eventEmitter` property
that provides a reference to this `EventEmitterAsyncResource`.

### `eventemitterasyncresource.emitDestroy()`

Call all `destroy` hooks. This should only ever be called once. An error will
be thrown if it is called more than once. This **must** be manually called. If
the resource is left to be collected by the GC then the `destroy` hooks will
never be called.

### `eventemitterasyncresource.triggerAsyncId`

* Type: {number} The same `triggerAsyncId` that is passed to the
`AsyncResource` constructor.

<a id="event-target-and-event-api"></a>

## `EventTarget` and `Event` API
Expand Down Expand Up @@ -1706,7 +1789,9 @@ to the `EventTarget`.
[`events.defaultMaxListeners`]: #eventsdefaultmaxlisteners
[`fs.ReadStream`]: fs.md#class-fsreadstream
[`net.Server`]: net.md#class-netserver
[`new.target.name`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/new.target
[`process.on('warning')`]: process.md#event-warning
[async context]: async_context.md
[capturerejections]: #capture-rejections-of-promises
[error]: #error-events
[rejection]: #emittersymbolfornodejsrejectionerr-eventname-args
Expand Down
130 changes: 130 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
ArrayPrototypeShift,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayPrototypeUnshift,
Boolean,
Error,
ErrorCaptureStackTrace,
Expand All @@ -42,6 +43,7 @@ const {
Promise,
PromiseReject,
PromiseResolve,
ReflectApply,
ReflectOwnKeys,
String,
StringPrototypeSplit,
Expand All @@ -59,6 +61,7 @@ const {
kEnhanceStackBeforeInspector,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_THIS,
ERR_OUT_OF_RANGE,
ERR_UNHANDLED_ERROR
},
Expand All @@ -68,6 +71,7 @@ const {
validateAbortSignal,
validateBoolean,
validateFunction,
validateString,
} = require('internal/validators');

const kCapture = Symbol('kCapture');
Expand All @@ -76,6 +80,125 @@ const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
const kMaxEventTargetListenersWarned =
Symbol('events.maxEventTargetListenersWarned');

let EventEmitterAsyncResource;
// The EventEmitterAsyncResource has to be initialized lazily because event.js
// is loaded so early in the bootstrap process, before async_hooks is available.
//
// This implementation was adapted straight from addaleax's
// eventemitter-asyncresource MIT-licensed userland module.
// https://github.com/addaleax/eventemitter-asyncresource
function lazyEventEmitterAsyncResource() {
if (EventEmitterAsyncResource === undefined) {
const {
AsyncResource
} = require('async_hooks');

const kEventEmitter = Symbol('kEventEmitter');
const kAsyncResource = Symbol('kAsyncResource');
class EventEmitterReferencingAsyncResource extends AsyncResource {
/**
* @param {EventEmitter} ee
* @param {string} [type]
* @param {{
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(ee, type, options) {
super(type, options);
this[kEventEmitter] = ee;
}

/**
* @type {EventEmitter}
*/
get eventEmitter() {
if (this[kEventEmitter] === undefined)
throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource');
return this[kEventEmitter];
}
}

EventEmitterAsyncResource =
class EventEmitterAsyncResource extends EventEmitter {
/**
* @param {{
* name?: string,
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(options = undefined) {
let name;
if (typeof options === 'string') {
name = options;
options = undefined;
} else {
if (new.target === EventEmitterAsyncResource) {
validateString(options?.name, 'options.name');
}
name = options?.name || new.target.name;
}
super(options);

this[kAsyncResource] =
new EventEmitterReferencingAsyncResource(this, name, options);
}

/**
* @param {symbol,string} event
* @param {...any} args
* @returns {boolean}
*/
emit(event, ...args) {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
const { asyncResource } = this;
ArrayPrototypeUnshift(args, super.emit, this, event);
return ReflectApply(asyncResource.runInAsyncScope, asyncResource,
args);
}

/**
* @returns {void}
*/
emitDestroy() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
this.asyncResource.emitDestroy();
}

/**
* @type {number}
*/
get asyncId() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this.asyncResource.asyncId();
}

/**
* @type {number}
*/
get triggerAsyncId() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this.asyncResource.triggerAsyncId();
}

/**
* @type {EventEmitterReferencingAsyncResource}
*/
get asyncResource() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this[kAsyncResource];
}
};
}
return EventEmitterAsyncResource;
}

/**
* Creates a new `EventEmitter` instance.
* @param {{ captureRejections?: boolean; }} [opts]
Expand Down Expand Up @@ -106,6 +229,13 @@ ObjectDefineProperty(EventEmitter, 'captureRejections', {
enumerable: true
});

ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', {
enumerable: true,
get: lazyEventEmitterAsyncResource,
set: undefined,
configurable: true,
});

EventEmitter.errorMonitor = kErrorMonitor;

// The default for captureRejections is false
Expand Down
Loading

0 comments on commit fe21607

Please sign in to comment.