diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ed5556e5d0a600..92a91c30171af1 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -17,7 +17,7 @@ const { } = require('internal/errors').codes; function isRequest(stream) { - return stream.setHeader && typeof stream.abort === 'function'; + return stream && stream.setHeader && typeof stream.abort === 'function'; } function destroyer(stream, reading, writing, callback) { @@ -43,22 +43,13 @@ function destroyer(stream, reading, writing, callback) { // request.destroy just do .end - .abort is what we want if (isRequest(stream)) return stream.abort(); - if (typeof stream.destroy === 'function') { - if (stream.req && stream._writableState === undefined) { - // This is a ClientRequest - // TODO(mcollina): backward compatible fix to avoid crashing. - // Possibly remove in a later semver-major change. - stream.req.on('error', noop); - } - return stream.destroy(err); - } + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; } -function noop() {} - function pipe(from, to) { return from.pipe(to); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 4a41f053bd0a85..76a6171bb1bd5a 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1,7 +1,14 @@ 'use strict'; const common = require('../common'); -const { Stream, Writable, Readable, Transform, pipeline } = require('stream'); +const { + Stream, + Writable, + Readable, + Transform, + pipeline, + PassThrough +} = require('stream'); const assert = require('assert'); const http = require('http'); const { promisify } = require('util'); @@ -483,3 +490,30 @@ const { promisify } = require('util'); { code: 'ERR_INVALID_CALLBACK' } ); } + +{ + const server = http.Server(function(req, res) { + res.write('asd'); + }); + server.listen(0, function() { + http.request({ + port: this.address().port, + path: '/', + method: 'GET' + }, (res) => { + const stream = new PassThrough(); + + stream.on('error', common.mustCall()); + + pipeline( + res, + stream, + common.mustCall((err) => { + server.close(); + }) + ); + + stream.destroy(new Error('oh no')); + }).end().on('error', common.mustNotCall()); + }); +}