From 6548fcb854e52bb3ccca03ccd90d4e9e3ea57cd0 Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Fri, 25 Sep 2020 14:17:55 -0700 Subject: [PATCH] lib: make diagnostics_channel async iterable --- lib/diagnostics_channel.js | 74 ++++++++++++++++++- ...test-diagnostics-channel-async-iterable.js | 31 ++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-diagnostics-channel-async-iterable.js diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 0a3552dc975040..ab2aa18df4a76a 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -3,10 +3,14 @@ const { ArrayPrototypeIndexOf, ArrayPrototypePush, + ArrayPrototypeShift, ArrayPrototypeSplice, ObjectCreate, ObjectGetPrototypeOf, ObjectSetPrototypeOf, + Promise, + PromiseResolve, + SymbolAsyncIterator, SymbolHasInstance, WeakRefPrototypeGet } = primordials; @@ -21,8 +25,72 @@ const { triggerUncaughtException } = internalBinding('errors'); const { WeakReference } = internalBinding('util'); +class AsyncIterableChannel { + constructor(channel) { + this.channel = channel; + this.events = []; + this.waiting = []; + + this.subscriber = (message) => { + const resolve = ArrayPrototypeShift(this.waiting); + if (resolve) { + return resolve({ + value: message, + done: false + }); + } + + ArrayPrototypePush(this.events, message); + }; + + channel.subscribe(this.subscriber); + } + + [SymbolAsyncIterator]() { + return this; + } + + return() { + const data = { done: true }; + this.done = true; + + this.channel.unsubscribe(this.subscriber); + + for (let i = 0; i < this.waiting.length; i++) { + const resolve = this.waiting[i]; + resolve(data); + } + + return PromiseResolve(data); + } + + next() { + const event = ArrayPrototypeShift(this.events); + if (event) { + return PromiseResolve({ + value: event, + done: false + }); + } + + if (this.done) { + return PromiseResolve({ + done: true + }); + } + + return new Promise((resolve) => { + ArrayPrototypePush(this.waiting, resolve); + }); + } +} + // TODO(qard): should there be a C++ channel interface? class ActiveChannel { + [SymbolAsyncIterator]() { + return new AsyncIterableChannel(this); + } + subscribe(subscription) { if (typeof subscription !== 'function') { throw new ERR_INVALID_ARG_TYPE('subscription', ['function'], @@ -71,7 +139,11 @@ class Channel { static [SymbolHasInstance](instance) { const prototype = ObjectGetPrototypeOf(instance); return prototype === Channel.prototype || - prototype === ActiveChannel.prototype; + prototype === ActiveChannel.prototype; + } + + [SymbolAsyncIterator]() { + return new AsyncIterableChannel(this); } subscribe(subscription) { diff --git a/test/parallel/test-diagnostics-channel-async-iterable.js b/test/parallel/test-diagnostics-channel-async-iterable.js new file mode 100644 index 00000000000000..594b4ff6b52cf8 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-async-iterable.js @@ -0,0 +1,31 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const input = { + foo: 'bar' +}; + +const channel = dc.channel('test'); + +const done = common.mustCall(); + +async function main() { + for await (const message of channel) { + assert.strictEqual(channel.hasSubscribers, true); + assert.strictEqual(message, input); + break; + } + + // Make sure the subscription is cleaned up when breaking the loop! + assert.strictEqual(channel.hasSubscribers, false); + done(); +} + +main(); + +setTimeout(common.mustCall(() => { + channel.publish(input); +}), 1);