From bb275ef2a4105c3a66920f64d32c5a024a14921f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 7 Jul 2021 11:33:55 +0200 Subject: [PATCH] stream: unify stream utils Unify stream helps into utils. PR-URL: https://github.com/nodejs/node/pull/39294 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell --- lib/_http_client.js | 2 - lib/_http_incoming.js | 6 - lib/internal/streams/add-abort-signal.js | 4 +- lib/internal/streams/destroy.js | 29 ++-- lib/internal/streams/end-of-stream.js | 90 ++++------- lib/internal/streams/pipeline.js | 14 +- lib/internal/streams/utils.js | 192 +++++++++++++++++++++-- lib/stream/promises.js | 4 +- test/parallel/test-stream-finished.js | 3 +- test/parallel/test-stream-pipeline.js | 2 +- 10 files changed, 235 insertions(+), 111 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 598b585bcfa383..280c6ebab76073 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -53,7 +53,6 @@ const { prepareError, } = require('_http_common'); const { OutgoingMessage } = require('_http_outgoing'); -const { kDestroy } = require('internal/streams/destroy'); const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); @@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) { DTRACE_HTTP_CLIENT_RESPONSE(socket, req); req.res = res; res.req = req; - res[kDestroy] = null; // Add our listener first, so that we guarantee socket cleanup res.on('end', responseOnEnd); diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js index a92687ce37bfbc..31b7db6f6c7c99 100644 --- a/lib/_http_incoming.js +++ b/lib/_http_incoming.js @@ -31,7 +31,6 @@ const { } = primordials; const { Readable, finished } = require('stream'); -const { kDestroy } = require('internal/streams/destroy'); const kHeaders = Symbol('kHeaders'); const kHeadersCount = Symbol('kHeadersCount'); @@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) { } }; -IncomingMessage.prototype[kDestroy] = function(err) { - this.socket = null; - this.destroy(err); -}; - IncomingMessage.prototype._addHeaderLines = _addHeaderLines; function _addHeaderLines(headers, n) { if (headers && headers.length) { diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index ba0da5e8bc4ac2..80814f0936782d 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => { } }; -function isStream(obj) { +function isNodeStream(obj) { return !!(obj && typeof obj.pipe === 'function'); } module.exports.addAbortSignal = function addAbortSignal(signal, stream) { validateAbortSignal(signal, 'signal'); - if (!isStream(stream)) { + if (!isNodeStream(stream)) { throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream); } return module.exports.addAbortSignalNoValidate(signal, stream); diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index df2d9f7f71987a..dd81fdacd5d93d 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -10,6 +10,12 @@ const { const { Symbol, } = primordials; +const { + kDestroyed, + isDestroyed, + isFinished, + isServerRequest +} = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -364,8 +370,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -const kDestroyed = Symbol('kDestroyed'); - function emitCloseLegacy(stream) { stream.emit('close'); } @@ -375,31 +379,20 @@ function emitErrorCloseLegacy(stream, err) { process.nextTick(emitCloseLegacy, stream); } -function isDestroyed(stream) { - return stream.destroyed || stream[kDestroyed]; -} - -function isReadable(stream) { - return stream.readable && !stream.readableEnded && !isDestroyed(stream); -} - -function isWritable(stream) { - return stream.writable && !stream.writableEnded && !isDestroyed(stream); -} - // Normalize destroy for legacy. function destroyer(stream, err) { if (isDestroyed(stream)) { return; } - if (!err && (isReadable(stream) || isWritable(stream))) { + if (!err && !isFinished(stream)) { err = new AbortError(); } // TODO: Remove isRequest branches. - if (typeof stream[kDestroy] === 'function') { - stream[kDestroy](err); + if (isServerRequest(stream)) { + stream.socket = null; + stream.destroy(err); } else if (isRequest(stream)) { stream.abort(); } else if (isRequest(stream.req)) { @@ -421,8 +414,6 @@ function destroyer(stream, err) { } module.exports = { - kDestroy, - isDestroyed, construct, destroyer, destroy, diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index efc2441c51ee39..274c2796edd443 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -17,48 +17,23 @@ const { validateObject, } = require('internal/validators'); +const { + isClosed, + isReadable, + isReadableNodeStream, + isReadableFinished, + isWritable, + isWritableNodeStream, + isWritableFinished, + willEmitClose: _willEmitClose, +} = require('internal/streams/utils'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } -function isServerResponse(stream) { - return ( - typeof stream._sent100 === 'boolean' && - typeof stream._removedConnection === 'boolean' && - typeof stream._removedContLen === 'boolean' && - typeof stream._removedTE === 'boolean' && - typeof stream._closed === 'boolean' - ); -} - -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - -function isWritableFinished(stream) { - if (stream.writableFinished) return true; - const wState = stream._writableState; - if (!wState || wState.errored) return false; - return wState.finished || (wState.ended && wState.length === 0); -} - const nop = () => {}; -function isReadableEnded(stream) { - if (stream.readableEnded) return true; - const rState = stream._readableState; - if (!rState || rState.errored) return false; - return rState.endEmitted || (rState.ended && rState.length === 0); -} - function eos(stream, options, callback) { if (arguments.length === 2) { callback = options; @@ -74,13 +49,12 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadable(stream)); + (options.readable !== false && isReadableNodeStream(stream)); const writable = options.writable || - (options.writable !== false && isWritable(stream)); + (options.writable !== false && isWritableNodeStream(stream)); const wState = stream._writableState; const rState = stream._readableState; - const state = wState || rState; const onlegacyfinish = () => { if (!stream.writable) onfinish(); @@ -89,16 +63,13 @@ function eos(stream, options, callback) { // TODO (ronag): Improve soft detection to include core modules and // common ecosystem modules that do properly emit 'close' but fail // this generic check. - let willEmitClose = isServerResponse(stream) || ( - state && - state.autoDestroy && - state.emitClose && - state.closed === false && - isReadable(stream) === readable && - isWritable(stream) === writable + let willEmitClose = ( + _willEmitClose(stream) && + isReadableNodeStream(stream) === readable && + isWritableNodeStream(stream) === writable ); - let writableFinished = stream.writableFinished || wState?.finished; + let writableFinished = isWritableFinished(stream, false); const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -107,12 +78,12 @@ function eos(stream, options, callback) { if (stream.destroyed) willEmitClose = false; if (willEmitClose && (!stream.readable || readable)) return; - if (!readable || readableEnded) callback.call(stream); + if (!readable || readableFinished) callback.call(stream); }; - let readableEnded = stream.readableEnded || rState?.endEmitted; + let readableFinished = isReadableFinished(stream, false); const onend = () => { - readableEnded = true; + readableFinished = true; // Stream should not be destroyed here. If it is that // means that user space is doing something differently and // we cannot trust willEmitClose. @@ -126,7 +97,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = wState?.closed || rState?.closed; + let closed = isClosed(stream); const onclose = () => { closed = true; @@ -137,13 +108,13 @@ function eos(stream, options, callback) { return callback.call(stream, errored); } - if (readable && !readableEnded) { - if (!isReadableEnded(stream)) + if (readable && !readableFinished) { + if (!isReadableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } if (writable && !writableFinished) { - if (!isWritableFinished(stream)) + if (!isWritableFinished(stream, false)) return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } @@ -185,19 +156,16 @@ function eos(stream, options, callback) { } } else if ( !readable && - (!willEmitClose || stream.readable) && - writableFinished + (!willEmitClose || isReadable(stream)) && + (writableFinished || !isWritable(stream)) ) { process.nextTick(onclose); } else if ( !writable && - (!willEmitClose || stream.writable) && - readableEnded + (!willEmitClose || isWritable(stream)) && + (readableFinished || !isReadable(stream)) ) { process.nextTick(onclose); - } else if (!wState && !rState && stream._closed === true) { - // _closed is for OutgoingMessage which is not a proper Writable. - process.nextTick(onclose); } else if ((rState && stream.req && stream.aborted)) { process.nextTick(onclose); } diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 5759dbd4a580a3..c98b3b3d21b633 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators'); const { isIterable, - isReadable, - isStream, + isReadableNodeStream, + isNodeStream, } = require('internal/streams/utils'); let PassThrough; @@ -87,7 +87,7 @@ function popCallback(streams) { function makeAsyncIterable(val) { if (isIterable(val)) { return val; - } else if (isReadable(val)) { + } else if (isReadableNodeStream(val)) { // Legacy streams are not Iterable. return fromReadable(val); } @@ -204,7 +204,7 @@ function pipeline(...streams) { const reading = i < streams.length - 1; const writing = i > 0; - if (isStream(stream)) { + if (isNodeStream(stream)) { finishCount++; destroys.push(destroyer(stream, reading, writing, finish)); } @@ -216,7 +216,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadable(stream)) { + } else if (isIterable(stream) || isReadableNodeStream(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( @@ -271,8 +271,8 @@ function pipeline(...streams) { finishCount++; destroys.push(destroyer(ret, false, true, finish)); } - } else if (isStream(stream)) { - if (isReadable(ret)) { + } else if (isNodeStream(stream)) { + if (isReadableNodeStream(ret)) { ret.pipe(stream); // Compat. Before node v10.12.0 stdio used to throw an error so diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 62eea022685e18..0cd9c8eb36062f 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -1,22 +1,34 @@ 'use strict'; const { + Symbol, SymbolAsyncIterator, SymbolIterator, } = primordials; -function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function' && - typeof obj.on === 'function'); +const kDestroyed = Symbol('kDestroyed'); + +function isReadableNodeStream(obj) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + (!obj._writableState || obj._readableState?.readable !== false) && // Duplex + (!obj._writableState || obj._readableState) // Writable has .pipe. + ); } -function isWritable(obj) { - return !!(obj && typeof obj.write === 'function' && - typeof obj.on === 'function'); +function isWritableNodeStream(obj) { + return !!( + obj && + typeof obj.write === 'function' && + typeof obj.on === 'function' && + (!obj._readableState || obj._writableState?.writable !== false) // Duplex + ); } -function isStream(obj) { - return isReadable(obj) || isWritable(obj); +function isNodeStream(obj) { + return isReadableNodeStream(obj) || isWritableNodeStream(obj); } function isIterable(obj, isAsync) { @@ -27,8 +39,170 @@ function isIterable(obj, isAsync) { typeof obj[SymbolIterator] === 'function'; } +function isDestroyed(stream) { + if (!isNodeStream(stream)) return null; + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed); +} + +// Have been end():d. +function isWritableEnded(stream) { + if (!isWritableNodeStream(stream)) return null; + if (stream.writableEnded === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.ended !== 'boolean') return null; + return wState.ended; +} + +// Have emitted 'finish'. +function isWritableFinished(stream, strict) { + if (!isWritableNodeStream(stream)) return null; + if (stream.writableFinished === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.finished !== 'boolean') return null; + return !!( + wState.finished || + (strict === false && wState.ended === true && wState.length === 0) + ); +} + +// Have been push(null):d. +function isReadableEnded(stream) { + if (!isReadableNodeStream(stream)) return null; + if (stream.readableEnded === true) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + if (typeof rState?.ended !== 'boolean') return null; + return rState.ended; +} + +// Have emitted 'end'. +function isReadableFinished(stream, strict) { + if (!isReadableNodeStream(stream)) return null; + const rState = stream._readableState; + if (rState?.errored) return false; + if (typeof rState?.endEmitted !== 'boolean') return null; + return !!( + rState.endEmitted || + (strict === false && rState.ended === true && rState.length === 0) + ); +} + +function isReadable(stream) { + const r = isReadableNodeStream(stream); + if (r === null || typeof stream.readable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.readable && !isReadableFinished(stream); +} + +function isWritable(stream) { + const r = isWritableNodeStream(stream); + if (r === null || typeof stream.writable !== 'boolean') return null; + if (isDestroyed(stream)) return false; + return r && stream.writable && !isWritableEnded(stream); +} + +function isFinished(stream, opts) { + if (!isNodeStream(stream)) { + return null; + } + + if (isDestroyed(stream)) { + return true; + } + + if (opts?.readable !== false && isReadable(stream)) { + return false; + } + + if (opts?.writable !== false && isWritable(stream)) { + return false; + } + + return true; +} + +function isClosed(stream) { + if (!isNodeStream(stream)) { + return null; + } + + const wState = stream._writableState; + const rState = stream._readableState; + + if ( + typeof wState?.closed === 'boolean' || + typeof rState?.closed === 'boolean' + ) { + return wState?.closed || rState?.closed; + } + + if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) { + return stream._closed; + } + + return null; +} + +function isOutgoingMessage(stream) { + return ( + typeof stream._closed === 'boolean' && + typeof stream._defaultKeepAlive === 'boolean' && + typeof stream._removedConnection === 'boolean' && + typeof stream._removedContLen === 'boolean' + ); +} + +function isServerResponse(stream) { + return ( + typeof stream._sent100 === 'boolean' && + isOutgoingMessage(stream) + ); +} + +function isServerRequest(stream) { + return ( + typeof stream._consuming === 'boolean' && + typeof stream._dumped === 'boolean' && + stream.req?.upgradeOrConnect === undefined + ); +} + +function willEmitClose(stream) { + if (!isNodeStream(stream)) return null; + + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + + return (!state && isServerResponse(stream)) || !!( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); +} + module.exports = { + kDestroyed, + isClosed, + isDestroyed, + isFinished, isIterable, isReadable, - isStream, + isReadableNodeStream, + isReadableEnded, + isReadableFinished, + isNodeStream, + isWritable, + isWritableNodeStream, + isWritableEnded, + isWritableFinished, + isServerRequest, + isServerResponse, + willEmitClose, }; diff --git a/lib/stream/promises.js b/lib/stream/promises.js index f5d873197323e8..8a8e66417c6057 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -15,7 +15,7 @@ const { const { isIterable, - isStream, + isNodeStream, } = require('internal/streams/utils'); const pl = require('internal/streams/pipeline'); @@ -26,7 +26,7 @@ function pipeline(...streams) { let signal; const lastArg = streams[streams.length - 1]; if (lastArg && typeof lastArg === 'object' && - !isStream(lastArg) && !isIterable(lastArg)) { + !isNodeStream(lastArg) && !isIterable(lastArg)) { const options = ArrayPrototypePop(streams); signal = options.signal; validateAbortSignal(signal, 'options.signal'); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 8e371911698336..8ada0c4c348cb7 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); d._writableState = {}; d._writableState.finished = true; finished(d, { readable: false, writable: true }, common.mustCall((err) => { - assert.strictEqual(err, undefined); + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); })); d._writableState.errored = true; d.emit('close'); @@ -586,7 +586,6 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); }); } - { const w = new Writable({ write(chunk, encoding, callback) { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..1f4474e6b5fce6 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1035,7 +1035,7 @@ const net = require('net'); const dst = new PassThrough(); dst.readable = false; pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, false); + assert.strictEqual(dst.destroyed, true); })); src.end(); }