Skip to content

Commit

Permalink
stream: readable track errorEmitted
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 7, 2019

Unverified

This user has not yet uploaded their public signing key.
1 parent f018384 commit bb2a821
Showing 4 changed files with 35 additions and 14 deletions.
3 changes: 3 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
@@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true;

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

2 changes: 2 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
@@ -428,11 +428,13 @@ function onwriteError(stream, state, sync, er, cb) {
// This can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er);
// This can emit finish, but finish must
// always follow error
3 changes: 2 additions & 1 deletion lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
@@ -132,7 +132,8 @@ const createReadableStreamAsyncIterator = (stream) => {
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream._readableState.endEmitted,
value: stream._readableState.endEmitted ||
stream._readableState.errorEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
41 changes: 28 additions & 13 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
@@ -2,21 +2,32 @@

// Undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
const readableDestroyed = this._readableState &&
this._readableState.destroyed;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;
const rState = this._readableState;
const wState = this._writableState;

const readableDestroyed = rState && rState.destroyed;
const writableDestroyed = wState && wState.destroyed;

if (readableDestroyed || writableDestroyed) {
if (cb) {
cb(err);
} else if (err) {
if (!this._writableState) {
process.nextTick(emitErrorNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorNT, this, err);
const errorEmitted = (rState && rState.errorEmitted) ||
(wState && wState.errorEmitted);

if (errorEmitted) {
return this;
}

if (rState) {
rState.errorEmitted = true;
}

if (wState) {
wState.errorEmitted = true;
}

process.nextTick(emitErrorNT, this, err);
}

return this;
@@ -25,13 +36,13 @@ function destroy(err, cb) {
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

if (this._readableState) {
this._readableState.destroyed = true;
if (rState) {
rState.destroyed = true;
}

// If this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
if (wState) {
wState.destroyed = true;
}

this._destroy(err || null, (err) => {
@@ -74,6 +85,7 @@ function undestroy() {
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
this._readableState.errorEmitted = false;
}

if (this._writableState) {
@@ -107,6 +119,9 @@ function errorOrDestroy(stream, err) {
if (wState) {
wState.errorEmitted = true;
}
if (rState) {
rState.errorEmitted = true;
}
stream.emit('error', err);
}
}

0 comments on commit bb2a821

Please sign in to comment.