diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5b73646db809c2..9e53ecfc1dccfa 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) { this.resumeScheduled = false; this.paused = true; + // True if the error was already emitted and should not be thrown again + this.errorEmitted = false; + // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index c6203a7d133fac..e85b4dead859bc 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -428,11 +428,13 @@ function onwriteError(stream, state, sync, er, cb) { // This can emit finish, and it will always happen // after error process.nextTick(finishMaybe, stream, state); + stream._writableState.errorEmitted = true; errorOrDestroy(stream, er); } else { // The caller expect this to happen before if // it is async cb(er); + stream._writableState.errorEmitted = true; errorOrDestroy(stream, er); // This can emit finish, but finish must // always follow error diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index defba235adbd56..382d8e8ce53f36 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -132,7 +132,8 @@ const createReadableStreamAsyncIterator = (stream) => { [kLastReject]: { value: null, writable: true }, [kError]: { value: null, writable: true }, [kEnded]: { - value: stream._readableState.endEmitted, + value: stream._readableState.endEmitted || + stream._readableState.errorEmitted, writable: true }, // The function passed to new Promise is cached so we avoid allocating a new diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 9c94ffe1734628..0ce3bb4fcf10ae 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -2,21 +2,32 @@ // Undocumented cb() API, needed for core, not for public API function destroy(err, cb) { - const readableDestroyed = this._readableState && - this._readableState.destroyed; - const writableDestroyed = this._writableState && - this._writableState.destroyed; + const rState = this._readableState; + const wState = this._writableState; + + const readableDestroyed = rState && rState.destroyed; + const writableDestroyed = wState && wState.destroyed; if (readableDestroyed || writableDestroyed) { if (cb) { cb(err); } else if (err) { - if (!this._writableState) { - process.nextTick(emitErrorNT, this, err); - } else if (!this._writableState.errorEmitted) { - this._writableState.errorEmitted = true; - process.nextTick(emitErrorNT, this, err); + const errorEmitted = (rState && rState.errorEmitted) || + (wState && wState.errorEmitted); + + if (errorEmitted) { + return this; + } + + if (rState) { + rState.errorEmitted = true; } + + if (wState) { + wState.errorEmitted = true; + } + + process.nextTick(emitErrorNT, this, err); } return this; @@ -25,13 +36,13 @@ function destroy(err, cb) { // We set destroyed to true before firing error callbacks in order // to make it re-entrance safe in case destroy() is called within callbacks - if (this._readableState) { - this._readableState.destroyed = true; + if (rState) { + rState.destroyed = true; } // If this is a duplex stream mark the writable part as destroyed as well - if (this._writableState) { - this._writableState.destroyed = true; + if (wState) { + wState.destroyed = true; } this._destroy(err || null, (err) => { @@ -74,6 +85,7 @@ function undestroy() { this._readableState.reading = false; this._readableState.ended = false; this._readableState.endEmitted = false; + this._readableState.errorEmitted = false; } if (this._writableState) { @@ -107,6 +119,9 @@ function errorOrDestroy(stream, err) { if (wState) { wState.errorEmitted = true; } + if (rState) { + rState.errorEmitted = true; + } stream.emit('error', err); } }