Skip to content

Commit

Permalink
stream: error state cleanup
Browse files Browse the repository at this point in the history
Clean up end simplify errored state.

- errorEmitted should be set in the same tick as 'error' is emitted.
- errored should be set as soon as an error occurs.
- errored should exist on Readable as well.
- refactor destroy logic and make it easier to follow.

PR-URL: #30851
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag authored and BridgeAR committed Dec 15, 2019
1 parent 2d13896 commit 67ed526
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 50 deletions.
3 changes: 3 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;

// Indicates whether the stream has errored.
this.errored = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand Down
96 changes: 55 additions & 41 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,25 @@
'use strict';

function needError(stream, err) {
if (!err) {
return false;
}

const r = stream._readableState;
const w = stream._writableState;

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return false;
}

if (w) {
w.errorEmitted = true;
}
if (r) {
r.errorEmitted = true;
}

return true;
}

// Undocumented cb() API, needed for core, not for public API.
// The cb() will be invoked synchronously if _destroy is synchronous.
// If cb is passed no 'error' event will be emitted.
function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if (w && err) {
w.errored = true;
if (err) {
if (w) {
w.errored = true;
}
if (r) {
r.errored = true;
}
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (needError(this, err)) {
} else if (err) {
process.nextTick(emitErrorNT, this, err);
}

Expand All @@ -53,17 +37,24 @@ function destroy(err, cb) {
}

this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (err) {
if (w) {
w.errored = true;
}
if (r) {
r.errored = true;
}
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
if (emitClose) {
process.nextTick(emitCloseNT, this);
}
} else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) {
// Don't emit 'error' if passed a callback.
process.nextTick(emitCloseNT, this);
} else if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
});
Expand All @@ -72,15 +63,34 @@ function destroy(err, cb) {
}

function emitErrorCloseNT(self, err) {
self.emit('error', err);
self.emit('close');
emitErrorNT(self, err);
emitCloseNT(self);
}

function emitCloseNT(self) {
self.emit('close');
const r = self._readableState;
const w = self._writableState;

if ((w && w.emitClose) || (r && r.emitClose)) {
self.emit('close');
}
}

function emitErrorNT(self, err) {
const r = self._readableState;
const w = self._writableState;

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return;
}

if (w) {
w.errorEmitted = true;
}
if (r) {
r.errorEmitted = true;
}

self.emit('error', err);
}

Expand All @@ -90,6 +100,7 @@ function undestroy() {

if (r) {
r.destroyed = false;
r.errored = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
Expand Down Expand Up @@ -118,14 +129,17 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState;
const w = stream._writableState;

if (w & err) {
w.errored = true;
}

if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (needError(stream, err))
stream.emit('error', err);
else if (err) {
if (w) {
w.errored = true;
}
if (r) {
r.errored = true;
}
emitErrorNT(stream, err);
}
}


Expand Down
57 changes: 56 additions & 1 deletion test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,20 @@ const assert = require('assert');
cb(expected);
});

let ticked = false;
read.on('end', common.mustNotCall('no end event'));
read.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(err, expected);
}));

read.destroy();
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read.destroyed, true);
ticked = true;
}

{
Expand Down Expand Up @@ -174,10 +181,58 @@ const assert = require('assert');

const expected = new Error('kaboom');

read.on('close', common.mustCall());
let ticked = false;
read.on('close', common.mustCall(() => {
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(ticked, true);
}));
// 'error' should not be emitted since a callback is passed to
// destroy(err, callback);
read.on('error', common.mustNotCall());

assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errorEmitted, false);

read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(err, expected);
}));
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
ticked = true;
}

{
const readable = new Readable({
destroy: common.mustCall(function(err, cb) {
process.nextTick(cb, new Error('kaboom 1'));
}),
read() {}
});

let ticked = false;
readable.on('close', common.mustCall(() => {
assert.strictEqual(ticked, true);
assert.strictEqual(readable._readableState.errorEmitted, true);
}));
readable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(readable._readableState.errorEmitted, true);
}));

readable.destroy();
assert.strictEqual(readable.destroyed, true);
assert.strictEqual(readable._readableState.errored, false);
assert.strictEqual(readable._readableState.errorEmitted, false);

// Test case where `readable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, true);

ticked = true;
}

{
Expand Down
34 changes: 26 additions & 8 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,22 @@ const assert = require('assert');
write(chunk, enc, cb) { cb(); }
});

write.on('close', common.mustCall());
write.on('error', common.mustCall());
let ticked = false;
write.on('close', common.mustCall(() => {
assert.strictEqual(ticked, true);
}));
write.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 1');
assert.strictEqual(write._writableState.errorEmitted, true);
}));

write.destroy(new Error('kaboom 1'));
write.destroy(new Error('kaboom 2'));
assert.strictEqual(write._writableState.errorEmitted, true);
assert.strictEqual(write._writableState.errored, true);
assert.strictEqual(write._writableState.errorEmitted, false);
assert.strictEqual(write.destroyed, true);
ticked = true;
}

{
Expand All @@ -176,20 +185,29 @@ const assert = require('assert');
}
});

writable.on('close', common.mustCall());
writable.on('error', common.expectsError({
type: Error,
message: 'kaboom 2'
let ticked = false;
writable.on('close', common.mustCall(() => {
assert.strictEqual(ticked, true);
assert.strictEqual(writable._writableState.errorEmitted, true);
}));
writable.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(err.message, 'kaboom 2');
assert.strictEqual(writable._writableState.errorEmitted, true);
}));

writable.destroy();
assert.strictEqual(writable.destroyed, true);
assert.strictEqual(writable._writableState.errored, false);
assert.strictEqual(writable._writableState.errorEmitted, false);

// Test case where `writable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
assert.strictEqual(writable._writableState.errorEmitted, true);
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, true);

ticked = true;
}

{
Expand Down

0 comments on commit 67ed526

Please sign in to comment.