Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix handling generators in Readable.from #32844

Closed
33 changes: 32 additions & 1 deletion lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,52 @@ function from(Readable, iterable, opts) {
objectMode: true,
...opts
});

himself65 marked this conversation as resolved.
Show resolved Hide resolved
// Reading boolean to protect against _read
// being called before last iteration completion.
let reading = false;

// needToClose boolean if iterator needs to be explicitly closed
let needToClose = false;

readable._read = function() {
if (!reading) {
reading = true;
next();
}
};

readable._destroy = async function(error, cb) {
try {
await close();
} catch (e) {
error = error || e;
} finally {
process.nextTick(() => cb(error));
ronag marked this conversation as resolved.
Show resolved Hide resolved
}
};
vadzim marked this conversation as resolved.
Show resolved Hide resolved

async function close() {
if (needToClose) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
needToClose = false;
if (typeof iterator.return === 'function') {
const { value } = await iterator.return();
await value;
}
}
}

ronag marked this conversation as resolved.
Show resolved Hide resolved
async function next() {
try {
needToClose = false;
const { value, done } = await iterator.next();
vadzim marked this conversation as resolved.
Show resolved Hide resolved
needToClose = !done;
const resolved = await value;
vadzim marked this conversation as resolved.
Show resolved Hide resolved
if (done) {
readable.push(null);
} else if (readable.push(await value)) {
} else if (readable.destroyed) {
await close();
} else if (readable.push(resolved)) {
next();
} else {
reading = false;
Expand Down
198 changes: 198 additions & 0 deletions test/parallel/test-readable-from-iterator-closing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
'use strict';

const { mustCall, mustNotCall } = require('../common');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function asyncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

async function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncSupport() {
const finallyMustCall = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield 'a';
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncPromiseSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustCall = mustCall();

function* infiniteGenerate() {
try {
while (true) yield Promise.resolve('a');
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(infiniteGenerate());

for await (const chunk of stream) {
bodyMustCall();
strictEqual(chunk, 'a');
break;
}
}

async function syncRejectedSupport() {
const returnMustBeAwaited = mustCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const secondNextMustNotCall = mustNotCall();

function* generate() {
try {
yield Promise.reject('a');
secondNextMustNotCall();
} finally {
// eslint-disable-next-line no-unsafe-finally
return { then(cb) {
returnMustBeAwaited();
cb();
} };
}
}

const stream = Readable.from(generate());

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function noReturnAfterThrow() {
const returnMustNotCall = mustNotCall();
const bodyMustNotCall = mustNotCall();
const catchMustCall = mustCall();
const nextMustCall = mustCall();

const stream = Readable.from({
[Symbol.asyncIterator]() { return this; },
async next() {
nextMustCall();
throw new Error('a');
},
async return() {
returnMustNotCall();
return { done: true };
},
});

try {
for await (const chunk of stream) {
bodyMustNotCall(chunk);
}
} catch {
catchMustCall();
}
}

async function closeStreamWhileNextIsPending() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall();

let resolveDestroy;
const destroyed =
new Promise((resolve) => { resolveDestroy = mustCall(resolve); });
let resolveYielded;
const yielded =
new Promise((resolve) => { resolveYielded = mustCall(resolve); });

async function* infiniteGenerate() {
try {
while (true) {
yield 'a';
resolveYielded();
await destroyed;
}
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

stream.on('data', (data) => {
dataMustCall();
strictEqual(data, 'a');
});

yielded.then(() => {
stream.destroy();
resolveDestroy();
});
}

async function closeAfterNullYielded() {
const finallyMustCall = mustCall();
const dataMustCall = mustCall(3);

function* infiniteGenerate() {
try {
yield 'a';
yield 'a';
yield 'a';
while (true) yield null;
} finally {
finallyMustCall();
}
}

const stream = Readable.from(infiniteGenerate());

stream.on('data', (chunk) => {
dataMustCall();
strictEqual(chunk, 'a');
});
}

Promise.all([
asyncSupport(),
syncSupport(),
syncPromiseSupport(),
syncRejectedSupport(),
noReturnAfterThrow(),
closeStreamWhileNextIsPending(),
closeAfterNullYielded(),
]).then(mustCall());