From 41bf7288299c946902bdf1fb677b2746ae79fa47 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Thu, 8 Jun 2023 23:33:02 +0530 Subject: [PATCH] stream: implement ReadableStream.from Fixes: https://github.com/nodejs/node/issues/48389 --- lib/internal/webstreams/readablestream.js | 76 +++++++++++++++++++++++ lib/internal/webstreams/util.js | 53 ++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 1f96a709959301..4baf9474239ead 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -6,6 +6,7 @@ const { ArrayBufferPrototypeSlice, ArrayPrototypePush, ArrayPrototypeShift, + Boolean, DataView, FunctionPrototypeBind, FunctionPrototypeCall, @@ -110,6 +111,8 @@ const { nonOpCancel, nonOpPull, nonOpStart, + getIterator, + iteratorNext, kType, kState, } = require('internal/webstreams/util'); @@ -314,6 +317,10 @@ class ReadableStream { return isReadableStreamLocked(this); } + static from(iterable) { + return readableStreamFromIterable(iterable); + } + /** * @param {any} [reason] * @returns { Promise } @@ -1249,6 +1256,75 @@ const isReadableStreamBYOBReader = // ---- ReadableStream Implementation +function readableStreamFromIterable(iterable) { + let stream; + const iteratorRecord = getIterator(iterable, 'async'); + + const startAlgorithm = nonOpStart; + + function pullAlgorithm() { + let nextResult; + try { + nextResult = iteratorNext(iteratorRecord); + } catch (error) { + return PromiseReject(error); + } + const nextPromise = PromiseResolve(nextResult); + return PromisePrototypeThen(nextPromise, (iterResult) => { + if (typeof iterResult !== 'object' || iterResult === null) { + throw new ERR_INVALID_STATE.TypeError( + 'The promise returned by the iterator.next() method must fulfill with an object'); + } + const done = Boolean(iterResult.done); + if (done) { + readableStreamDefaultControllerClose(stream[kState].controller); + } else { + readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value); + } + }); + } + + function cancelAlgorithm(reason) { + const iterator = iteratorRecord.iterator; + let returnMethod; + try { + returnMethod = iterator.return; + } catch (error) { + return PromiseReject(error); + } + if (returnMethod === undefined) { + return PromiseResolve(); + } + let returnResult; + try { + returnResult = FunctionPrototypeCall(returnMethod, iterator, reason); + } catch (error) { + return PromiseReject(error); + } + const returnPromise = PromiseResolve(returnResult); + return PromisePrototypeThen(returnPromise, (iterResult) => { + if (typeof iterResult !== 'object' || iterResult === null) { + throw new ERR_INVALID_STATE.TypeError( + 'The promise returned by the iterator.return() method must fulfill with an object'); + } + return undefined; + }); + } + + stream = new ReadableStream({ + start: startAlgorithm, + pull: pullAlgorithm, + cancel: cancelAlgorithm, + }, { + size() { + return 1; + }, + highWaterMark: 0, + }); + + return stream; +} + function readableStreamPipeTo( source, dest, diff --git a/lib/internal/webstreams/util.js b/lib/internal/webstreams/util.js index abc064170926dd..1979c55667b167 100644 --- a/lib/internal/webstreams/util.js +++ b/lib/internal/webstreams/util.js @@ -13,6 +13,8 @@ const { PromiseReject, ReflectGet, Symbol, + SymbolAsyncIterator, + SymbolIterator, Uint8Array, } = primordials; @@ -20,6 +22,7 @@ const { codes: { ERR_INVALID_ARG_VALUE, ERR_OPERATION_FAILED, + ERR_INVALID_STATE, }, } = require('internal/errors'); @@ -217,6 +220,54 @@ function lazyTransfer() { return transfer; } +function createAsyncFromSyncIterator(syncIteratorRecord) { + const syncIterable = { + [SymbolIterator]: () => syncIteratorRecord.iterator, + }; + + const asyncIterator = (async function* () { + return yield* syncIterable; + }()); + + const nextMethod = asyncIterator.next; + return { iterator: asyncIterator, nextMethod, done: false }; +} + +function getIterator(obj, kind = 'sync', method) { + if (method === undefined) { + if (kind === 'async') { + method = obj[SymbolAsyncIterator]; + if (method === undefined) { + const syncMethod = obj[SymbolIterator]; + const syncIteratorRecord = getIterator(obj, 'sync', syncMethod); + return createAsyncFromSyncIterator(syncIteratorRecord); + } + } else { + method = obj[SymbolIterator]; + } + } + + const iterator = FunctionPrototypeCall(method, obj); + if (typeof iterator !== 'object' || iterator === null) { + throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object'); + } + const nextMethod = iterator.next; + return { iterator, nextMethod, done: false }; +} + +function iteratorNext(iteratorRecord, value) { + let result; + if (value === undefined) { + result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator); + } else { + result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]); + } + if (typeof result !== 'object' || result === null) { + throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object'); + } + return result; +} + module.exports = { ArrayBufferViewGetBuffer, ArrayBufferViewGetByteLength, @@ -243,6 +294,8 @@ module.exports = { nonOpPull, nonOpStart, nonOpWrite, + getIterator, + iteratorNext, kType, kState, };