From 46ed078607569490421aa21d94a99d6575c57995 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 6 Feb 2022 11:05:51 +0100 Subject: [PATCH] stream: resume stream on drain Previously we would just resume "flowing" the stream without reseting the "paused" state. Fixes this by properly using pause/resume methods for .pipe. Fixes: https://github.com/nodejs/node/issues/41785 PR-URL: https://github.com/nodejs/node/pull/41848 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: James M Snell --- lib/internal/streams/readable.js | 3 +-- .../test-stream-readable-pause-and-resume.js | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index fa900fb0756618..146f712c4a2f7e 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -830,8 +830,7 @@ function pipeOnDrain(src, dest) { if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && EE.listenerCount(src, 'data')) { - state.flowing = true; - flow(src); + src.resume(); } }; } diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 294ef2c35d4608..53229ec3339e5c 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -56,3 +56,19 @@ function readAndPause() { assert(readable.isPaused()); }); } + +{ + const { PassThrough } = require('stream'); + + const source3 = new PassThrough(); + const target3 = new PassThrough(); + + const chunk = Buffer.allocUnsafe(1000); + while (target3.write(chunk)); + + source3.pipe(target3); + target3.on('drain', common.mustCall(() => { + assert(!source3.isPaused()); + })); + target3.on('data', () => {}); +}