diff --git a/node_modules/readable-stream/lib/internal/streams/add-abort-signal.js b/node_modules/readable-stream/lib/internal/streams/add-abort-signal.js index c6ba8b9c298f1..3a26a1d3e6d76 100644 --- a/node_modules/readable-stream/lib/internal/streams/add-abort-signal.js +++ b/node_modules/readable-stream/lib/internal/streams/add-abort-signal.js @@ -1,6 +1,7 @@ 'use strict' const { AbortError, codes } = require('../../ours/errors') +const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils') const eos = require('./end-of-stream') const { ERR_INVALID_ARG_TYPE } = codes @@ -12,13 +13,10 @@ const validateAbortSignal = (signal, name) => { throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal) } } -function isNodeStream(obj) { - return !!(obj && typeof obj.pipe === 'function') -} module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal') - if (!isNodeStream(stream)) { - throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream) + if (!isNodeStream(stream) && !isWebStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) } return module.exports.addAbortSignalNoValidate(signal, stream) } @@ -26,13 +24,21 @@ module.exports.addAbortSignalNoValidate = function (signal, stream) { if (typeof signal !== 'object' || !('aborted' in signal)) { return stream } - const onAbort = () => { - stream.destroy( - new AbortError(undefined, { - cause: signal.reason - }) - ) - } + const onAbort = isNodeStream(stream) + ? () => { + stream.destroy( + new AbortError(undefined, { + cause: signal.reason + }) + ) + } + : () => { + stream[kControllerErrorFunction]( + new AbortError(undefined, { + cause: signal.reason + }) + ) + } if (signal.aborted) { onAbort() } else { diff --git a/node_modules/readable-stream/lib/internal/streams/compose.js b/node_modules/readable-stream/lib/internal/streams/compose.js index 4a00aead883c2..f565c12ef3620 100644 --- a/node_modules/readable-stream/lib/internal/streams/compose.js +++ b/node_modules/readable-stream/lib/internal/streams/compose.js @@ -3,11 +3,20 @@ const { pipeline } = require('./pipeline') const Duplex = require('./duplex') const { destroyer } = require('./destroy') -const { isNodeStream, isReadable, isWritable } = require('./utils') +const { + isNodeStream, + isReadable, + isWritable, + isWebStream, + isTransformStream, + isWritableStream, + isReadableStream +} = require('./utils') const { AbortError, codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS } } = require('../../ours/errors') +const eos = require('./end-of-stream') module.exports = function compose(...streams) { if (streams.length === 0) { throw new ERR_MISSING_ARGS('streams') @@ -24,14 +33,17 @@ module.exports = function compose(...streams) { streams[idx] = Duplex.from(streams[idx]) } for (let n = 0; n < streams.length; ++n) { - if (!isNodeStream(streams[n])) { + if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { // TODO(ronag): Add checks for non streams. continue } - if (n < streams.length - 1 && !isReadable(streams[n])) { + if ( + n < streams.length - 1 && + !(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n])) + ) { throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable') } - if (n > 0 && !isWritable(streams[n])) { + if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) { throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable') } } @@ -53,8 +65,8 @@ module.exports = function compose(...streams) { } const head = streams[0] const tail = pipeline(streams, onfinished) - const writable = !!isWritable(head) - const readable = !!isReadable(tail) + const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head)) + const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail)) // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. @@ -67,25 +79,49 @@ module.exports = function compose(...streams) { readable }) if (writable) { - d._write = function (chunk, encoding, callback) { - if (head.write(chunk, encoding)) { - callback() - } else { - ondrain = callback + if (isNodeStream(head)) { + d._write = function (chunk, encoding, callback) { + if (head.write(chunk, encoding)) { + callback() + } else { + ondrain = callback + } } - } - d._final = function (callback) { - head.end() - onfinish = callback - } - head.on('drain', function () { - if (ondrain) { - const cb = ondrain - ondrain = null - cb() + d._final = function (callback) { + head.end() + onfinish = callback } - }) - tail.on('finish', function () { + head.on('drain', function () { + if (ondrain) { + const cb = ondrain + ondrain = null + cb() + } + }) + } else if (isWebStream(head)) { + const writable = isTransformStream(head) ? head.writable : head + const writer = writable.getWriter() + d._write = async function (chunk, encoding, callback) { + try { + await writer.ready + writer.write(chunk).catch(() => {}) + callback() + } catch (err) { + callback(err) + } + } + d._final = async function (callback) { + try { + await writer.ready + writer.close().catch(() => {}) + onfinish = callback + } catch (err) { + callback(err) + } + } + } + const toRead = isTransformStream(tail) ? tail.readable : tail + eos(toRead, () => { if (onfinish) { const cb = onfinish onfinish = null @@ -94,25 +130,46 @@ module.exports = function compose(...streams) { }) } if (readable) { - tail.on('readable', function () { - if (onreadable) { - const cb = onreadable - onreadable = null - cb() - } - }) - tail.on('end', function () { - d.push(null) - }) - d._read = function () { - while (true) { - const buf = tail.read() - if (buf === null) { - onreadable = d._read - return + if (isNodeStream(tail)) { + tail.on('readable', function () { + if (onreadable) { + const cb = onreadable + onreadable = null + cb() + } + }) + tail.on('end', function () { + d.push(null) + }) + d._read = function () { + while (true) { + const buf = tail.read() + if (buf === null) { + onreadable = d._read + return + } + if (!d.push(buf)) { + return + } } - if (!d.push(buf)) { - return + } + } else if (isWebStream(tail)) { + const readable = isTransformStream(tail) ? tail.readable : tail + const reader = readable.getReader() + d._read = async function () { + while (true) { + try { + const { value, done } = await reader.read() + if (!d.push(value)) { + return + } + if (done) { + d.push(null) + return + } + } catch { + return + } } } } @@ -128,7 +185,9 @@ module.exports = function compose(...streams) { callback(err) } else { onclose = callback - destroyer(tail, err) + if (isNodeStream(tail)) { + destroyer(tail, err) + } } } return d diff --git a/node_modules/readable-stream/lib/internal/streams/destroy.js b/node_modules/readable-stream/lib/internal/streams/destroy.js index 768f2d79d3a89..db76c29f94bab 100644 --- a/node_modules/readable-stream/lib/internal/streams/destroy.js +++ b/node_modules/readable-stream/lib/internal/streams/destroy.js @@ -36,7 +36,7 @@ function destroy(err, cb) { const w = this._writableState // With duplex streams we use the writable side for state. const s = w || r - if ((w && w.destroyed) || (r && r.destroyed)) { + if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { if (typeof cb === 'function') { cb() } @@ -107,14 +107,14 @@ function emitCloseNT(self) { if (r) { r.closeEmitted = true } - if ((w && w.emitClose) || (r && r.emitClose)) { + if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) { self.emit('close') } } function emitErrorNT(self, err) { const r = self._readableState const w = self._writableState - if ((w && w.errorEmitted) || (r && r.errorEmitted)) { + if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) { return } if (w) { @@ -162,10 +162,11 @@ function errorOrDestroy(stream, err, sync) { const r = stream._readableState const w = stream._writableState - if ((w && w.destroyed) || (r && r.destroyed)) { + if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) { return this } - if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err) + if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy)) + stream.destroy(err) else if (err) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 err.stack // eslint-disable-line no-unused-expressions @@ -228,16 +229,18 @@ function constructNT(stream) { } } try { - stream._construct(onConstruct) + stream._construct((err) => { + process.nextTick(onConstruct, err) + }) } catch (err) { - onConstruct(err) + process.nextTick(onConstruct, err) } } function emitConstructNT(stream) { stream.emit(kConstruct) } function isRequest(stream) { - return stream && stream.setHeader && typeof stream.abort === 'function' + return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function' } function emitCloseLegacy(stream) { stream.emit('close') diff --git a/node_modules/readable-stream/lib/internal/streams/duplexify.js b/node_modules/readable-stream/lib/internal/streams/duplexify.js index 43300ddc8a45b..599fb47ab53c2 100644 --- a/node_modules/readable-stream/lib/internal/streams/duplexify.js +++ b/node_modules/readable-stream/lib/internal/streams/duplexify.js @@ -282,8 +282,6 @@ function _duplexify(pair) { cb(err) } else if (err) { d.destroy(err) - } else if (!readable && !writable) { - d.destroy() } } diff --git a/node_modules/readable-stream/lib/internal/streams/end-of-stream.js b/node_modules/readable-stream/lib/internal/streams/end-of-stream.js index 57dbaa48a3ca5..043c9c4bdac51 100644 --- a/node_modules/readable-stream/lib/internal/streams/end-of-stream.js +++ b/node_modules/readable-stream/lib/internal/streams/end-of-stream.js @@ -10,20 +10,23 @@ const process = require('process/') const { AbortError, codes } = require('../../ours/errors') const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes const { kEmptyObject, once } = require('../../ours/util') -const { validateAbortSignal, validateFunction, validateObject } = require('../validators') -const { Promise } = require('../../ours/primordials') +const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators') +const { Promise, PromisePrototypeThen } = require('../../ours/primordials') const { isClosed, isReadable, isReadableNodeStream, + isReadableStream, isReadableFinished, isReadableErrored, isWritable, isWritableNodeStream, + isWritableStream, isWritableFinished, isWritableErrored, isNodeStream, - willEmitClose: _willEmitClose + willEmitClose: _willEmitClose, + kIsClosedPromise } = require('./utils') function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function' @@ -42,6 +45,12 @@ function eos(stream, options, callback) { validateFunction(callback, 'callback') validateAbortSignal(options.signal, 'options.signal') callback = once(callback) + if (isReadableStream(stream) || isWritableStream(stream)) { + return eosWeb(stream, options, callback) + } + if (!isNodeStream(stream)) { + throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream) + } const readable = (_options$readable = options.readable) !== null && _options$readable !== undefined ? _options$readable @@ -50,10 +59,6 @@ function eos(stream, options, callback) { (_options$writable = options.writable) !== null && _options$writable !== undefined ? _options$writable : isWritableNodeStream(stream) - if (!isNodeStream(stream)) { - // TODO: Webstreams. - throw new ERR_INVALID_ARG_TYPE('stream', 'Stream', stream) - } const wState = stream._writableState const rState = stream._readableState const onlegacyfinish = () => { @@ -117,6 +122,14 @@ function eos(stream, options, callback) { } callback.call(stream) } + const onclosed = () => { + closed = true + const errored = isWritableErrored(stream) || isReadableErrored(stream) + if (errored && typeof errored !== 'boolean') { + return callback.call(stream, errored) + } + callback.call(stream) + } const onrequest = () => { stream.req.on('finish', onfinish) } @@ -153,22 +166,22 @@ function eos(stream, options, callback) { (rState !== null && rState !== undefined && rState.errorEmitted) ) { if (!willEmitClose) { - process.nextTick(onclose) + process.nextTick(onclosed) } } else if ( !readable && (!willEmitClose || isReadable(stream)) && (writableFinished || isWritable(stream) === false) ) { - process.nextTick(onclose) + process.nextTick(onclosed) } else if ( !writable && (!willEmitClose || isWritable(stream)) && (readableFinished || isReadable(stream) === false) ) { - process.nextTick(onclose) + process.nextTick(onclosed) } else if (rState && stream.req && stream.aborted) { - process.nextTick(onclose) + process.nextTick(onclosed) } const cleanup = () => { callback = nop @@ -209,9 +222,53 @@ function eos(stream, options, callback) { } return cleanup } +function eosWeb(stream, options, callback) { + let isAborted = false + let abort = nop + if (options.signal) { + abort = () => { + isAborted = true + callback.call( + stream, + new AbortError(undefined, { + cause: options.signal.reason + }) + ) + } + if (options.signal.aborted) { + process.nextTick(abort) + } else { + const originalCallback = callback + callback = once((...args) => { + options.signal.removeEventListener('abort', abort) + originalCallback.apply(stream, args) + }) + options.signal.addEventListener('abort', abort) + } + } + const resolverFn = (...args) => { + if (!isAborted) { + process.nextTick(() => callback.apply(stream, args)) + } + } + PromisePrototypeThen(stream[kIsClosedPromise].promise, resolverFn, resolverFn) + return nop +} function finished(stream, opts) { + var _opts + let autoCleanup = false + if (opts === null) { + opts = kEmptyObject + } + if ((_opts = opts) !== null && _opts !== undefined && _opts.cleanup) { + validateBoolean(opts.cleanup, 'cleanup') + autoCleanup = opts.cleanup + } return new Promise((resolve, reject) => { - eos(stream, opts, (err) => { + const cleanup = eos(stream, opts, (err) => { + if (autoCleanup) { + cleanup() + } if (err) { reject(err) } else { diff --git a/node_modules/readable-stream/lib/internal/streams/operators.js b/node_modules/readable-stream/lib/internal/streams/operators.js index 323a74a17c32e..869cacb39faca 100644 --- a/node_modules/readable-stream/lib/internal/streams/operators.js +++ b/node_modules/readable-stream/lib/internal/streams/operators.js @@ -2,12 +2,15 @@ const AbortController = globalThis.AbortController || require('abort-controller').AbortController const { - codes: { ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, + codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE }, AbortError } = require('../../ours/errors') const { validateAbortSignal, validateInteger, validateObject } = require('../validators') const kWeakHandler = require('../../ours/primordials').Symbol('kWeak') const { finished } = require('./end-of-stream') +const staticCompose = require('./compose') +const { addAbortSignalNoValidate } = require('./add-abort-signal') +const { isWritable, isNodeStream } = require('./utils') const { ArrayPrototypePush, MathFloor, @@ -20,6 +23,23 @@ const { } = require('../../ours/primordials') const kEmpty = Symbol('kEmpty') const kEof = Symbol('kEof') +function compose(stream, options) { + if (options != null) { + validateObject(options, 'options') + } + if ((options === null || options === undefined ? undefined : options.signal) != null) { + validateAbortSignal(options.signal, 'options.signal') + } + if (isNodeStream(stream) && !isWritable(stream)) { + throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable') + } + const composedStream = staticCompose(this, stream) + if (options !== null && options !== undefined && options.signal) { + // Not validating as we already validated before + addAbortSignalNoValidate(options.signal, composedStream) + } + return composedStream +} function map(fn, options) { if (typeof fn !== 'function') { throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn) @@ -424,7 +444,8 @@ module.exports.streamReturningOperators = { filter, flatMap, map, - take + take, + compose } module.exports.promiseReturningOperators = { every, diff --git a/node_modules/readable-stream/lib/internal/streams/pipeline.js b/node_modules/readable-stream/lib/internal/streams/pipeline.js index 016e96ee6ff24..8393ba5146991 100644 --- a/node_modules/readable-stream/lib/internal/streams/pipeline.js +++ b/node_modules/readable-stream/lib/internal/streams/pipeline.js @@ -24,7 +24,16 @@ const { AbortError } = require('../../ours/errors') const { validateFunction, validateAbortSignal } = require('../validators') -const { isIterable, isReadable, isReadableNodeStream, isNodeStream } = require('./utils') +const { + isIterable, + isReadable, + isReadableNodeStream, + isNodeStream, + isTransformStream, + isWebStream, + isReadableStream, + isReadableEnded +} = require('./utils') const AbortController = globalThis.AbortController || require('abort-controller').AbortController let PassThrough let Readable @@ -74,7 +83,7 @@ async function* fromReadable(val) { } yield* Readable.prototype[SymbolAsyncIterator].call(val) } -async function pump(iterable, writable, finish, { end }) { +async function pumpToNode(iterable, writable, finish, { end }) { let error let onresolve = null const resume = (err) => { @@ -130,6 +139,31 @@ async function pump(iterable, writable, finish, { end }) { writable.off('drain', resume) } } +async function pumpToWeb(readable, writable, finish, { end }) { + if (isTransformStream(writable)) { + writable = writable.writable + } + // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure + const writer = writable.getWriter() + try { + for await (const chunk of readable) { + await writer.ready + writer.write(chunk).catch(() => {}) + } + await writer.ready + if (end) { + await writer.close() + } + finish() + } catch (err) { + try { + await writer.abort(err) + finish(err) + } catch (err) { + finish(err) + } + } +} function pipeline(...streams) { return pipelineImpl(streams, once(popCallback(streams))) } @@ -215,13 +249,18 @@ function pipelineImpl(streams, callback, opts) { if (!isIterable(ret)) { throw new ERR_INVALID_RETURN_VALUE('Iterable, AsyncIterable or Stream', 'source', ret) } - } else if (isIterable(stream) || isReadableNodeStream(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { ret = stream } else { ret = Duplex.from(stream) } } else if (typeof stream === 'function') { - ret = makeAsyncIterable(ret) + if (isTransformStream(ret)) { + var _ret + ret = makeAsyncIterable((_ret = ret) === null || _ret === undefined ? undefined : _ret.readable) + } else { + ret = makeAsyncIterable(ret) + } ret = stream(ret, { signal }) @@ -230,7 +269,7 @@ function pipelineImpl(streams, callback, opts) { throw new ERR_INVALID_RETURN_VALUE('AsyncIterable', `transform[${i - 1}]`, ret) } } else { - var _ret + var _ret2 if (!PassThrough) { PassThrough = require('./passthrough') } @@ -246,7 +285,7 @@ function pipelineImpl(streams, callback, opts) { // Handle Promises/A+ spec, `then` could be a getter that throws on // second use. - const then = (_ret = ret) === null || _ret === undefined ? undefined : _ret.then + const then = (_ret2 = ret) === null || _ret2 === undefined ? undefined : _ret2.then if (typeof then === 'function') { finishCount++ then.call( @@ -268,7 +307,13 @@ function pipelineImpl(streams, callback, opts) { ) } else if (isIterable(ret, true)) { finishCount++ - pump(ret, pt, finish, { + pumpToNode(ret, pt, finish, { + end + }) + } else if (isReadableStream(ret) || isTransformStream(ret)) { + const toRead = ret.readable || ret + finishCount++ + pumpToNode(toRead, pt, finish, { end }) } else { @@ -290,13 +335,47 @@ function pipelineImpl(streams, callback, opts) { if (isReadable(stream) && isLastStream) { lastStreamCleanup.push(cleanup) } + } else if (isTransformStream(ret) || isReadableStream(ret)) { + const toRead = ret.readable || ret + finishCount++ + pumpToNode(toRead, stream, finish, { + end + }) } else if (isIterable(ret)) { finishCount++ - pump(ret, stream, finish, { + pumpToNode(ret, stream, finish, { end }) } else { - throw new ERR_INVALID_ARG_TYPE('val', ['Readable', 'Iterable', 'AsyncIterable'], ret) + throw new ERR_INVALID_ARG_TYPE( + 'val', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + ret + ) + } + ret = stream + } else if (isWebStream(stream)) { + if (isReadableNodeStream(ret)) { + finishCount++ + pumpToWeb(makeAsyncIterable(ret), stream, finish, { + end + }) + } else if (isReadableStream(ret) || isIterable(ret)) { + finishCount++ + pumpToWeb(ret, stream, finish, { + end + }) + } else if (isTransformStream(ret)) { + finishCount++ + pumpToWeb(ret.readable, stream, finish, { + end + }) + } else { + throw new ERR_INVALID_ARG_TYPE( + 'val', + ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], + ret + ) } ret = stream } else { @@ -320,16 +399,24 @@ function pipe(src, dst, finish, { end }) { } }) src.pipe(dst, { - end - }) + end: false + }) // If end is true we already will have a listener to end dst. + if (end) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => { + + function endFn() { ended = true dst.end() - }) + } + if (isReadableEnded(src)) { + // End the destination if the source has already ended. + process.nextTick(endFn) + } else { + src.once('end', endFn) + } } else { finish() } diff --git a/node_modules/readable-stream/lib/internal/streams/utils.js b/node_modules/readable-stream/lib/internal/streams/utils.js index f87e9fe68e6a8..e589ad96c6924 100644 --- a/node_modules/readable-stream/lib/internal/streams/utils.js +++ b/node_modules/readable-stream/lib/internal/streams/utils.js @@ -1,10 +1,12 @@ 'use strict' -const { Symbol, SymbolAsyncIterator, SymbolIterator } = require('../../ours/primordials') +const { Symbol, SymbolAsyncIterator, SymbolIterator, SymbolFor } = require('../../ours/primordials') const kDestroyed = Symbol('kDestroyed') const kIsErrored = Symbol('kIsErrored') const kIsReadable = Symbol('kIsReadable') const kIsDisturbed = Symbol('kIsDisturbed') +const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise') +const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction') function isReadableNodeStream(obj, strict = false) { var _obj$_readableState return !!( @@ -56,6 +58,24 @@ function isNodeStream(obj) { (typeof obj.pipe === 'function' && typeof obj.on === 'function')) ) } +function isReadableStream(obj) { + return !!( + obj && + !isNodeStream(obj) && + typeof obj.pipeThrough === 'function' && + typeof obj.getReader === 'function' && + typeof obj.cancel === 'function' + ) +} +function isWritableStream(obj) { + return !!(obj && !isNodeStream(obj) && typeof obj.getWriter === 'function' && typeof obj.abort === 'function') +} +function isTransformStream(obj) { + return !!(obj && !isNodeStream(obj) && typeof obj.readable === 'object' && typeof obj.writable === 'object') +} +function isWebStream(obj) { + return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj) +} function isIterable(obj, isAsync) { if (obj == null) return false if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function' @@ -274,22 +294,28 @@ module.exports = { kIsErrored, isReadable, kIsReadable, + kIsClosedPromise, + kControllerErrorFunction, isClosed, isDestroyed, isDuplexNodeStream, isFinished, isIterable, isReadableNodeStream, + isReadableStream, isReadableEnded, isReadableFinished, isReadableErrored, isNodeStream, + isWebStream, isWritable, isWritableNodeStream, + isWritableStream, isWritableEnded, isWritableFinished, isWritableErrored, isServerRequest, isServerResponse, - willEmitClose + willEmitClose, + isTransformStream } diff --git a/node_modules/readable-stream/lib/internal/validators.js b/node_modules/readable-stream/lib/internal/validators.js index f9e6e555971a1..85b2e9cd593d9 100644 --- a/node_modules/readable-stream/lib/internal/validators.js +++ b/node_modules/readable-stream/lib/internal/validators.js @@ -1,3 +1,5 @@ +/* eslint jsdoc/require-jsdoc: "error" */ + 'use strict' const { @@ -199,6 +201,13 @@ const validateOneOf = hideStackFrames((value, name, oneOf) => { function validateBoolean(value, name) { if (typeof value !== 'boolean') throw new ERR_INVALID_ARG_TYPE(name, 'boolean', value) } + +/** + * @param {any} options + * @param {string} key + * @param {boolean} defaultValue + * @returns {boolean} + */ function getOwnPropertyValueOrDefault(options, key, defaultValue) { return options == null || !ObjectPrototypeHasOwnProperty(options, key) ? defaultValue : options[key] } @@ -228,6 +237,24 @@ const validateObject = hideStackFrames((value, name, options = null) => { } }) +/** + * @callback validateDictionary - We are using the Web IDL Standard definition + * of "dictionary" here, which means any value + * whose Type is either Undefined, Null, or + * Object (which includes functions). + * @param {*} value + * @param {string} name + * @see https://webidl.spec.whatwg.org/#es-dictionary + * @see https://tc39.es/ecma262/#table-typeof-operator-results + */ + +/** @type {validateDictionary} */ +const validateDictionary = hideStackFrames((value, name) => { + if (value != null && typeof value !== 'object' && typeof value !== 'function') { + throw new ERR_INVALID_ARG_TYPE(name, 'a dictionary', value) + } +}) + /** * @callback validateArray * @param {*} value @@ -247,7 +274,36 @@ const validateArray = hideStackFrames((value, name, minLength = 0) => { } }) -// eslint-disable-next-line jsdoc/require-returns-check +/** + * @callback validateStringArray + * @param {*} value + * @param {string} name + * @returns {asserts value is string[]} + */ + +/** @type {validateStringArray} */ +function validateStringArray(value, name) { + validateArray(value, name) + for (let i = 0; i < value.length; i++) { + validateString(value[i], `${name}[${i}]`) + } +} + +/** + * @callback validateBooleanArray + * @param {*} value + * @param {string} name + * @returns {asserts value is boolean[]} + */ + +/** @type {validateBooleanArray} */ +function validateBooleanArray(value, name) { + validateArray(value, name) + for (let i = 0; i < value.length; i++) { + validateBoolean(value[i], `${name}[${i}]`) + } +} + /** * @param {*} signal * @param {string} [name='signal'] @@ -370,13 +426,71 @@ function validateUnion(value, name, union) { throw new ERR_INVALID_ARG_TYPE(name, `('${ArrayPrototypeJoin(union, '|')}')`, value) } } + +/* + The rules for the Link header field are described here: + https://www.rfc-editor.org/rfc/rfc8288.html#section-3 + + This regex validates any string surrounded by angle brackets + (not necessarily a valid URI reference) followed by zero or more + link-params separated by semicolons. +*/ +const linkValueRegExp = /^(?:<[^>]*>)(?:\s*;\s*[^;"\s]+(?:=(")?[^;"\s]*\1)?)*$/ + +/** + * @param {any} value + * @param {string} name + */ +function validateLinkHeaderFormat(value, name) { + if (typeof value === 'undefined' || !RegExpPrototypeExec(linkValueRegExp, value)) { + throw new ERR_INVALID_ARG_VALUE( + name, + value, + 'must be an array or string of format "; rel=preload; as=style"' + ) + } +} + +/** + * @param {any} hints + * @return {string} + */ +function validateLinkHeaderValue(hints) { + if (typeof hints === 'string') { + validateLinkHeaderFormat(hints, 'hints') + return hints + } else if (ArrayIsArray(hints)) { + const hintsLength = hints.length + let result = '' + if (hintsLength === 0) { + return result + } + for (let i = 0; i < hintsLength; i++) { + const link = hints[i] + validateLinkHeaderFormat(link, 'hints') + result += link + if (i !== hintsLength - 1) { + result += ', ' + } + } + return result + } + throw new ERR_INVALID_ARG_VALUE( + 'hints', + hints, + 'must be an array or string of format "; rel=preload; as=style"' + ) +} module.exports = { isInt32, isUint32, parseFileMode, validateArray, + validateStringArray, + validateBooleanArray, validateBoolean, validateBuffer, + validateDictionary, validateEncoding, validateFunction, validateInt32, @@ -391,5 +505,6 @@ module.exports = { validateUint32, validateUndefined, validateUnion, - validateAbortSignal + validateAbortSignal, + validateLinkHeaderValue } diff --git a/node_modules/readable-stream/lib/ours/primordials.js b/node_modules/readable-stream/lib/ours/primordials.js index 6a98b01681caf..9464cc7fea6a1 100644 --- a/node_modules/readable-stream/lib/ours/primordials.js +++ b/node_modules/readable-stream/lib/ours/primordials.js @@ -90,6 +90,7 @@ module.exports = { return self.trim() }, Symbol, + SymbolFor: Symbol.for, SymbolAsyncIterator: Symbol.asyncIterator, SymbolHasInstance: Symbol.hasInstance, SymbolIterator: Symbol.iterator, diff --git a/node_modules/readable-stream/lib/stream/promises.js b/node_modules/readable-stream/lib/stream/promises.js index d44dd8ad0e0f3..b85c51f47f1ce 100644 --- a/node_modules/readable-stream/lib/stream/promises.js +++ b/node_modules/readable-stream/lib/stream/promises.js @@ -1,15 +1,22 @@ 'use strict' const { ArrayPrototypePop, Promise } = require('../ours/primordials') -const { isIterable, isNodeStream } = require('../internal/streams/utils') +const { isIterable, isNodeStream, isWebStream } = require('../internal/streams/utils') const { pipelineImpl: pl } = require('../internal/streams/pipeline') const { finished } = require('../internal/streams/end-of-stream') +require('stream') function pipeline(...streams) { return new Promise((resolve, reject) => { let signal let end const lastArg = streams[streams.length - 1] - if (lastArg && typeof lastArg === 'object' && !isNodeStream(lastArg) && !isIterable(lastArg)) { + if ( + lastArg && + typeof lastArg === 'object' && + !isNodeStream(lastArg) && + !isIterable(lastArg) && + !isWebStream(lastArg) + ) { const options = ArrayPrototypePop(streams) signal = options.signal end = options.end diff --git a/node_modules/readable-stream/package.json b/node_modules/readable-stream/package.json index 7df83d9eb990a..c4f6504cc7cc6 100644 --- a/node_modules/readable-stream/package.json +++ b/node_modules/readable-stream/package.json @@ -1,6 +1,6 @@ { "name": "readable-stream", - "version": "4.3.0", + "version": "4.4.0", "description": "Node.js Streams, a user-land copy of the stream library from Node.js", "homepage": "https://github.com/nodejs/readable-stream", "license": "MIT", diff --git a/package-lock.json b/package-lock.json index 6086216ff2f11..2fa7b733d2736 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10777,9 +10777,9 @@ } }, "node_modules/readable-stream": { - "version": "4.3.0", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.3.0.tgz", - "integrity": "sha512-MuEnA0lbSi7JS8XM+WNJlWZkHAAdm7gETHdFK//Q/mChGyj2akEFtdLZh32jSdkWGbRwCW9pn6g3LWDdDeZnBQ==", + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.4.0.tgz", + "integrity": "sha512-kDMOq0qLtxV9f/SQv522h8cxZBqNZXuXNyjyezmfAAuribMyVXziljpQ/uQhfE1XLg2/TLTW2DsnoE4VAi/krg==", "inBundle": true, "dependencies": { "abort-controller": "^3.0.0",