Skip to content

Commit

Permalink
stream: use finished for async iteration
Browse files Browse the repository at this point in the history
PR-URL: #39282
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag committed Jul 8, 2021
1 parent b2fa795 commit ce00381
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 49 deletions.
2 changes: 1 addition & 1 deletion lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ const maybeOverridePrepareStackTrace = (globalThis, error, trace) => {
};

const aggregateTwoErrors = hideStackFrames((innerError, outerError) => {
if (innerError && outerError) {
if (innerError && outerError && innerError !== outerError) {
if (ArrayIsArray(outerError.errors)) {
// If `outerError` is already an `AggregateError`.
ArrayPrototypePush(outerError.errors, innerError);
Expand Down
75 changes: 28 additions & 47 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const { Buffer } = require('buffer');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const eos = require('internal/streams/end-of-stream');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
Expand All @@ -57,12 +58,14 @@ const {
} = require('internal/streams/state');

const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} = require('internal/errors').codes;
aggregateTwoErrors,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
}
} = require('internal/errors');
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');
Expand Down Expand Up @@ -1090,12 +1093,6 @@ function streamToAsyncIterator(stream, options) {
async function* createAsyncIterator(stream, options) {
let callback = nop;

const opts = {
destroyOnReturn: true,
destroyOnError: true,
...options,
};

function next(resolve) {
if (this === stream) {
callback();
Expand All @@ -1105,54 +1102,38 @@ async function* createAsyncIterator(stream, options) {
}
}

const state = stream._readableState;
stream.on('readable', next);

let error;
eos(stream, { writable: false }, (err) => {
error = err ? aggregateTwoErrors(error, err) : null;
callback();
callback = nop;
});

let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;

stream
.on('readable', next)
.on('error', function(err) {
error = err;
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});

let errorThrown = false;
try {
while (true) {
const chunk = stream.destroyed ? null : stream.read();
if (chunk !== null) {
yield chunk;
} else if (errorEmitted) {
} else if (error) {
throw error;
} else if (endEmitted) {
break;
} else if (closeEmitted) {
throw new ERR_STREAM_PREMATURE_CLOSE();
} else if (error === null) {
return;
} else {
await new Promise(next);
}
}
} catch (err) {
if (opts.destroyOnError) {
destroyImpl.destroyer(stream, err);
}
errorThrown = true;
throw err;
error = aggregateTwoErrors(error, err);
throw error;
} finally {
if (!errorThrown && opts.destroyOnReturn) {
if (state.autoDestroy || !endEmitted) {
if (error) {
if (options?.destroyOnError !== false) {
destroyImpl.destroyer(stream, error);
}
} else if (options?.destroyOnReturn !== false) {
if (error === undefined || stream._readableState.autoDestroy) {
destroyImpl.destroyer(stream, null);
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async function tests() {
const iterator = readable[Symbol.asyncIterator]();

const err = new Error('kaboom');
readable.destroy(new Error('kaboom'));
readable.destroy(err);
await assert.rejects(iterator.next.bind(iterator), err);
}

Expand Down

0 comments on commit ce00381

Please sign in to comment.