From ce003817512d163d2b93a16b6191ccaeef0115b2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 6 Jul 2021 00:00:56 +0200 Subject: [PATCH] stream: use finished for async iteration PR-URL: https://github.com/nodejs/node/pull/39282 Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell --- lib/internal/errors.js | 2 +- lib/internal/streams/readable.js | 75 +++++++------------ .../test-stream-readable-async-iterators.js | 2 +- 3 files changed, 30 insertions(+), 49 deletions(-) diff --git a/lib/internal/errors.js b/lib/internal/errors.js index fb01b8f6731ff1..30a2a8c0399ba8 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -150,7 +150,7 @@ const maybeOverridePrepareStackTrace = (globalThis, error, trace) => { }; const aggregateTwoErrors = hideStackFrames((innerError, outerError) => { - if (innerError && outerError) { + if (innerError && outerError && innerError !== outerError) { if (ArrayIsArray(outerError.errors)) { // If `outerError` is already an `AggregateError`. ArrayPrototypePush(outerError.errors, innerError); diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 6eb07cdd4a9b14..22a644b3f3383c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -45,6 +45,7 @@ const { Buffer } = require('buffer'); const { addAbortSignalNoValidate, } = require('internal/streams/add-abort-signal'); +const eos = require('internal/streams/end-of-stream'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; @@ -57,12 +58,14 @@ const { } = require('internal/streams/state'); const { - ERR_INVALID_ARG_TYPE, - ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_PREMATURE_CLOSE, - ERR_STREAM_PUSH_AFTER_EOF, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT, -} = require('internal/errors').codes; + aggregateTwoErrors, + codes: { + ERR_INVALID_ARG_TYPE, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, + } +} = require('internal/errors'); const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); @@ -1090,12 +1093,6 @@ function streamToAsyncIterator(stream, options) { async function* createAsyncIterator(stream, options) { let callback = nop; - const opts = { - destroyOnReturn: true, - destroyOnError: true, - ...options, - }; - function next(resolve) { if (this === stream) { callback(); @@ -1105,54 +1102,38 @@ async function* createAsyncIterator(stream, options) { } } - const state = stream._readableState; + stream.on('readable', next); + + let error; + eos(stream, { writable: false }, (err) => { + error = err ? aggregateTwoErrors(error, err) : null; + callback(); + callback = nop; + }); - let error = state.errored; - let errorEmitted = state.errorEmitted; - let endEmitted = state.endEmitted; - let closeEmitted = state.closeEmitted; - - stream - .on('readable', next) - .on('error', function(err) { - error = err; - errorEmitted = true; - next.call(this); - }) - .on('end', function() { - endEmitted = true; - next.call(this); - }) - .on('close', function() { - closeEmitted = true; - next.call(this); - }); - - let errorThrown = false; try { while (true) { const chunk = stream.destroyed ? null : stream.read(); if (chunk !== null) { yield chunk; - } else if (errorEmitted) { + } else if (error) { throw error; - } else if (endEmitted) { - break; - } else if (closeEmitted) { - throw new ERR_STREAM_PREMATURE_CLOSE(); + } else if (error === null) { + return; } else { await new Promise(next); } } } catch (err) { - if (opts.destroyOnError) { - destroyImpl.destroyer(stream, err); - } - errorThrown = true; - throw err; + error = aggregateTwoErrors(error, err); + throw error; } finally { - if (!errorThrown && opts.destroyOnReturn) { - if (state.autoDestroy || !endEmitted) { + if (error) { + if (options?.destroyOnError !== false) { + destroyImpl.destroyer(stream, error); + } + } else if (options?.destroyOnReturn !== false) { + if (error === undefined || stream._readableState.autoDestroy) { destroyImpl.destroyer(stream, null); } } diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index ec1cb464845e70..d546505062eeef 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -205,7 +205,7 @@ async function tests() { const iterator = readable[Symbol.asyncIterator](); const err = new Error('kaboom'); - readable.destroy(new Error('kaboom')); + readable.destroy(err); await assert.rejects(iterator.next.bind(iterator), err); }