Skip to content

Commit

Permalink
stream: fix legacy pipe error handling
Browse files Browse the repository at this point in the history
Fixes: nodejs#35237

PR-URL: nodejs#35257
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and Trott committed Sep 23, 2020
1 parent 81379d1 commit 6be80e1
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 27 deletions.
27 changes: 23 additions & 4 deletions lib/internal/streams/legacy.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const {
ArrayIsArray,
ObjectSetPrototypeOf,
} = primordials;

Expand Down Expand Up @@ -58,12 +59,12 @@ Stream.prototype.pipe = function(dest, options) {
function onerror(er) {
cleanup();
if (EE.listenerCount(this, 'error') === 0) {
throw er; // Unhandled stream error in pipe.
this.emit('error', er);
}
}

source.on('error', onerror);
dest.on('error', onerror);
prependListener(source, 'error', onerror);
prependListener(dest, 'error', onerror);

// Remove all the event listeners that were added.
function cleanup() {
Expand Down Expand Up @@ -92,4 +93,22 @@ Stream.prototype.pipe = function(dest, options) {
return dest;
};

module.exports = Stream;
function prependListener(emitter, event, fn) {
// Sadly this is not cacheable as some libraries bundle their own
// event emitter implementation with them.
if (typeof emitter.prependListener === 'function')
return emitter.prependListener(event, fn);

// This is a hack to make sure that our error handler is attached before any
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
if (!emitter._events || !emitter._events[event])
emitter.on(event, fn);
else if (ArrayIsArray(emitter._events[event]))
emitter._events[event].unshift(fn);
else
emitter._events[event] = [fn, emitter._events[event]];
}

module.exports = { Stream, prependListener };
21 changes: 1 addition & 20 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
'use strict';

const {
ArrayIsArray,
NumberIsInteger,
NumberIsNaN,
ObjectDefineProperties,
Expand All @@ -38,7 +37,7 @@ module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('internal/streams/legacy');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
Expand Down Expand Up @@ -69,24 +68,6 @@ function nop() {}

const { errorOrDestroy } = destroyImpl;

function prependListener(emitter, event, fn) {
// Sadly this is not cacheable as some libraries bundle their own
// event emitter implementation with them.
if (typeof emitter.prependListener === 'function')
return emitter.prependListener(event, fn);

// This is a hack to make sure that our error handler is attached before any
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
if (!emitter._events || !emitter._events[event])
emitter.on(event, fn);
else if (ArrayIsArray(emitter._events[event]))
emitter._events[event].unshift(fn);
else
emitter._events[event] = [fn, emitter._events[event]];
}

function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module.exports = Writable;
Writable.WritableState = WritableState;

const EE = require('events');
const Stream = require('internal/streams/legacy');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const {
Expand Down
2 changes: 1 addition & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const internalBuffer = require('internal/buffer');
// Lazy loaded
let promises = null;

const Stream = module.exports = require('internal/streams/legacy');
const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.Readable = require('internal/streams/readable');
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Expand Down
16 changes: 15 additions & 1 deletion test/parallel/test-stream-pipe-error-handling.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const Stream = require('stream').Stream;
const { Stream, PassThrough } = require('stream');

{
const source = new Stream();
Expand Down Expand Up @@ -108,3 +108,17 @@ const Stream = require('stream').Stream;
w.removeListener('error', () => {});
removed = true;
}

{
const _err = new Error('this should be handled');
const destination = new PassThrough();
destination.once('error', common.mustCall((err) => {
assert.strictEqual(err, _err);
}));

const stream = new Stream();
stream
.pipe(destination);

destination.destroy(_err);
}

0 comments on commit 6be80e1

Please sign in to comment.