From a20e854e573af83e667cedc23d278861202bdc05 Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Thu, 30 May 2024 17:21:15 +0930 Subject: [PATCH 1/5] stream: make sure _destroy is called --- lib/internal/streams/compose.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 22f010b9834385..cdb7d3eb4d51a3 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -238,13 +238,13 @@ module.exports = function compose(...streams) { ondrain = null; onfinish = null; + if (isNodeStream(tail)) + destroyer(tail, err); + if (onclose === null) { callback(err); } else { onclose = callback; - if (isNodeStream(tail)) { - destroyer(tail, err); - } } }; From c731f78dfcaf289e198597a87454c0894d5d1471 Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Thu, 30 May 2024 22:54:57 +0930 Subject: [PATCH 2/5] fixup! add test --- test/parallel/test-stream-compose.js | 49 ++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index cba7f9519d87eb..2de9c021523f16 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -4,6 +4,7 @@ const common = require('../common'); const { + Duplex, Readable, Transform, Writable, @@ -494,3 +495,51 @@ const assert = require('assert'); assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]); })().then(common.mustCall()); } + +{ + class SlowProcessor extends Duplex { + constructor(options) { + super({ ...options, objectMode: true }); + this.stuff = []; + } + + _write(message, _, callback) { + this.stuff.push(message); + callback(); + } + + _destroy(err, cb) { + cb(err); + } + + _read() { + // Emulate some slow processing + setTimeout(() => { + if (this.stuff.length) { + this.push(this.stuff.shift()); + } else if (this.writableEnded) { + this.push(null); + } else { + setTimeout(() => this._read(), 100); + } + }, 100); + } + } + + const pass = new PassThrough({ objectMode: true }); + const slow = new SlowProcessor(); + + const composed = compose( + pass, + slow + ).on('error', () => {}); + + composed.write('hello'); + composed.write('world'); + composed.end(); + + setTimeout(() => { + composed.destroy(new Error('an unexpected error')); + assert.strictEqual(slow.destroyed, true); + }, 100); +} From 5d93185910195eb1376da1936a252650a94df473 Mon Sep 17 00:00:00 2001 From: jakecastelli <38635403+jakecastelli@users.noreply.github.com> Date: Fri, 31 May 2024 08:55:59 +0930 Subject: [PATCH 3/5] Update lib/internal/streams/compose.js Co-authored-by: Robert Nagy --- lib/internal/streams/compose.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index cdb7d3eb4d51a3..1b04a0f70a1e62 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -238,8 +238,9 @@ module.exports = function compose(...streams) { ondrain = null; onfinish = null; - if (isNodeStream(tail)) + if (isNodeStream(tail)) { destroyer(tail, err); + } if (onclose === null) { callback(err); From cdea1dd3aabc734a617fc458aa6e06bd9f417471 Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Tue, 4 Jun 2024 20:49:49 +0930 Subject: [PATCH 4/5] fixup! use common.platformTimeout --- test/parallel/test-stream-compose.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index 2de9c021523f16..200c4283c54706 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -520,7 +520,7 @@ const assert = require('assert'); } else if (this.writableEnded) { this.push(null); } else { - setTimeout(() => this._read(), 100); + setTimeout(() => this._read(), common.platformTimeout(100)); } }, 100); } @@ -541,5 +541,5 @@ const assert = require('assert'); setTimeout(() => { composed.destroy(new Error('an unexpected error')); assert.strictEqual(slow.destroyed, true); - }, 100); + }, common.platformTimeout(100)); } From d8a6156f7cb8a289a3b9e630c727964e133c6d5a Mon Sep 17 00:00:00 2001 From: jakecastelli <959672929@qq.com> Date: Tue, 4 Jun 2024 23:44:27 +0930 Subject: [PATCH 5/5] fixup! remove slow process as its not needed --- test/parallel/test-stream-compose.js | 30 ++++++++++++---------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index 200c4283c54706..1ff8c39b7a2234 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -497,7 +497,7 @@ const assert = require('assert'); } { - class SlowProcessor extends Duplex { + class DuplexProcess extends Duplex { constructor(options) { super({ ...options, objectMode: true }); this.stuff = []; @@ -513,33 +513,29 @@ const assert = require('assert'); } _read() { - // Emulate some slow processing - setTimeout(() => { - if (this.stuff.length) { - this.push(this.stuff.shift()); - } else if (this.writableEnded) { - this.push(null); - } else { - setTimeout(() => this._read(), common.platformTimeout(100)); - } - }, 100); + if (this.stuff.length) { + this.push(this.stuff.shift()); + } else if (this.writableEnded) { + this.push(null); + } else { + this._read(); + } } } const pass = new PassThrough({ objectMode: true }); - const slow = new SlowProcessor(); + const duplex = new DuplexProcess(); const composed = compose( pass, - slow + duplex ).on('error', () => {}); composed.write('hello'); composed.write('world'); composed.end(); - setTimeout(() => { - composed.destroy(new Error('an unexpected error')); - assert.strictEqual(slow.destroyed, true); - }, common.platformTimeout(100)); + composed.destroy(new Error('an unexpected error')); + assert.strictEqual(duplex.destroyed, true); + }