Skip to content

Commit

Permalink
stream: fix deadlock when pipeing to full sink
Browse files Browse the repository at this point in the history
When piping a paused Readable to a full Writable we didn't
register a drain listener which cause the src to never
resume.

Refs: #48666
  • Loading branch information
ronag committed Jul 7, 2023
1 parent ab1b4df commit 997d680
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
28 changes: 18 additions & 10 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -747,17 +747,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (!cleanedUp) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}
src.pause();
if (cleanedUp) {
return
}

if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
}

src.pause();
}

function registerDrain () {
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
Expand All @@ -775,6 +781,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('dest.write', ret);
if (ret === false) {
pause();
registerDrain();
}
}

Expand Down Expand Up @@ -825,6 +832,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
if (state.flowing) {
pause();
}
registerDrain();
} else if (!state.flowing) {
debug('pipe resume');
src.resume();
Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-stream-pipe-deadlock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"use strict";

const common = require('../common');
const { Readable, Writable } = require('stream');

// https://github.com/nodejs/node/issues/48666
(async () => {
// Prepare src that is internally ended, with buffered data pending
const src = new Readable({ read() {} });
src.push(Buffer.alloc(100));
src.push(null);
src.pause();

// Give it time to settle
await new Promise((resolve) => setImmediate(resolve));

const dst = new Writable({
highWaterMark: 1000,
write(buf, enc, cb) {
process.nextTick(cb);
}
});

dst.write(Buffer.alloc(1000)); // Fill write buffer
dst.on('finish', common.mustCall());
src.pipe(dst);
})();

0 comments on commit 997d680

Please sign in to comment.