Skip to content

Commit

Permalink
stream: close iterator in Readable.from
Browse files Browse the repository at this point in the history
Use for-of loop to traverse iterator and properly close it
if not all of its values are consumed.

Fixes: #32842
  • Loading branch information
vadzim committed Apr 14, 2020
1 parent cbe955c commit 125ba87
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 28 deletions.
88 changes: 60 additions & 28 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

const {
SymbolAsyncIterator,
SymbolIterator
SymbolIterator,
Promise
} = primordials;
const { Buffer } = require('buffer');

Expand All @@ -11,7 +12,6 @@ const {
} = require('internal/errors').codes;

function from(Readable, iterable, opts) {
let iterator;
if (typeof iterable === 'string' || iterable instanceof Buffer) {
return new Readable({
objectMode: true,
Expand All @@ -23,41 +23,73 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
iterator = iterable[SymbolIterator]();
else
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
let onDataNeeded;

const readable = new Readable({
objectMode: true,
...opts
...opts,
read() {
onDataNeeded && onDataNeeded();
},
async destroy(error, cb) {
onDataNeeded && onDataNeeded();
try {
await pumping;
} catch (e) {
// Do not hide present error
if (!error) error = e;
}
cb(error);
},
});
// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;
readable._read = function() {
if (!reading) {
reading = true;
next();
}
};
async function next() {

if (!iterable[SymbolAsyncIterator] && !iterable[SymbolIterator])
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);

const pumping = pump();

return readable;

async function pump() {
/*
We're iterating over sync or async iterator with the appropriate sync
or async version of the `for-of` loop.
`for-await-of` loop has an edge case when looping over synchronous
iterator.
It does not close synchronous iterator with .return() if that iterator
yields rejected Promise, so finally blocks within such an iterator are
never executed.
In the application code developers can choose between async and sync
forms of the loop depending on their needs, but in the library code we
have to handle such edge cases properly and close iterators anyway.
*/
try {
const { value, done } = await iterator.next();
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
next();
if (iterable[SymbolAsyncIterator]) {
for await (const data of iterable) {
if (readable.destroyed) return;
if (!readable.push(data)) {
await new Promise((resolve) => { onDataNeeded = resolve; });
if (readable.destroyed) return;
}
}
} else {
reading = false;
for (const data of iterable) {
const value = await data;
if (readable.destroyed) return;
if (!readable.push(value)) {
await new Promise((resolve) => { onDataNeeded = resolve; });
if (readable.destroyed) return;
}
}
}
} catch (err) {
readable.destroy(err);
if (!readable.destroyed) readable.push(null);
} catch (error) {
if (!readable.destroyed) readable.destroy(error);
}
}
return readable;
}

module.exports = from;
117 changes: 117 additions & 0 deletions test/parallel/test-readable-from-iterator-closing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
const finallyMustCall = mustCall();
async function* generate() {
try {
yield 'a';
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

asyncSupport().then(mustCall());

async function syncSupport() {
const finallyMustCall = mustCall();
function* generate() {
try {
yield 'a';
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

syncSupport().then(mustCall());

async function syncPromiseSupport() {
const finallyMustCall = mustCall();
function* generate() {
try {
yield Promise.resolve('a');
mustNotCall('only first item is read');
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

for await (const chunk of stream) {
strictEqual(chunk, 'a');
break;
}
}

syncPromiseSupport().then(mustCall());

async function syncRejectedSupport() {
const finallyMustCall = mustCall();
const noBodyCall = mustNotCall();
const catchMustCall = mustCall();

function* generate() {
try {
yield Promise.reject('a');
mustNotCall();
} finally {
finallyMustCall();
}
}

const stream = Readable.from(generate());

try {
for await (const chunk of stream) {
noBodyCall(chunk);
}
} catch {
catchMustCall();
}
}

syncRejectedSupport().then(mustCall());

async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const noBodyCall = mustNotCall();
const catchMustCall = mustCall();

const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() { throw new Error('a'); },
async return() { returnMustNotCall(); return { done: true }; },
});

try {
for await (const chunk of stream) {
noBodyCall(chunk);
}
} catch {
catchMustCall();
}
}

noReturnAfterThrow().then(mustCall());

0 comments on commit 125ba87

Please sign in to comment.