Skip to content

Commit

Permalink
stream: destroy readable on read error
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 10, 2021
1 parent ff975fe commit df16291
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 82 deletions.
3 changes: 0 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1525,9 +1525,6 @@ added: v16.3.0
* `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
async iterator, or exiting a `for await...of` iteration using a `break`,
`return`, or `throw` will not destroy the stream. **Default:** `true`.
* `destroyOnError` {boolean} When set to `false`, if the stream emits an
error while it's being iterated, the iterator will not destroy the stream.
**Default:** `true`.
* Returns: {AsyncIterator} to consume the stream.

The iterator created by this method gives users the option to cancel the
Expand Down
24 changes: 18 additions & 6 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,23 @@ Readable.prototype.read = function(n) {
if (state.length === 0)
state.needReadable = true;
// Call internal read method
this._read(state.highWaterMark);
try {
const result = this._read(state.highWaterMark);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
nop,
function(err) {
errorOrDestroy(this, err);
});
}
}
} catch (err) {
errorOrDestroy(this, err);
}

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
Expand Down Expand Up @@ -1128,11 +1144,7 @@ async function* createAsyncIterator(stream, options) {
error = aggregateTwoErrors(error, err);
throw error;
} finally {
if (error) {
if (options?.destroyOnError !== false) {
destroyImpl.destroyer(stream, error);
}
} else if (options?.destroyOnReturn !== false) {
if (error || options?.destroyOnReturn !== false) {
if (error === undefined || stream._readableState.autoDestroy) {
destroyImpl.destroyer(stream, null);
}
Expand Down
60 changes: 0 additions & 60 deletions test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
Expand Up @@ -750,22 +750,6 @@ async function tests() {
})());
}

function createErrorReadable() {
const opts = { read() { throw new Error('inner'); } };
return new Readable(opts);
}

// Check default destroys on return
(async function() {
const readable = createReadable();
for await (const chunk of readable.iterator()) {
assert.strictEqual(chunk, 5);
break;
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroying on return
(async function() {
const readable = createReadable();
Expand All @@ -777,50 +761,6 @@ async function tests() {
assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check default destroys on error
(async function() {
const readable = createErrorReadable();
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit destroys on error
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: true, destroyOnReturn: false };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createErrorReadable();
const opts = { destroyOnError: false, destroyOnReturn: true };
try {
// eslint-disable-next-line no-unused-vars
for await (const chunk of readable.iterator(opts)) { }
assert.fail('should have thrown');
} catch (err) {
assert.strictEqual(err.message, 'inner');
}

assert.ok(!readable.destroyed);
})().then(common.mustCall());

// Check explicit non-destroy with return true
(async function() {
const readable = createReadable();
Expand Down
21 changes: 8 additions & 13 deletions test/parallel/test-stream-readable-with-unimplemented-_read.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
'use strict';
require('../common');

const assert = require('assert');
const common = require('../common');
const { Readable } = require('stream');

const readable = new Readable();

assert.throws(
() => {
readable.read();
},
{
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The _read() method is not implemented'
}
);
readable.read();
readable.on('error', common.expectsError({
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The _read() method is not implemented'
}));
readable.on('close', common.mustCall());

0 comments on commit df16291

Please sign in to comment.