Skip to content

Commit

Permalink
stream: avoid drain for sync streams
Browse files Browse the repository at this point in the history
Previously a sync writable receiving chunks
larger than highwatermark would unecessarily
ping pong needDrain.

PR-URL: #32887
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and BethGriggs committed Apr 27, 2020
1 parent be4703e commit 941cf4a
Showing 8 changed files with 25 additions and 17 deletions.
7 changes: 4 additions & 3 deletions benchmark/streams/writable-manywrites.js
Original file line number Diff line number Diff line change
@@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, {
n: [2e6],
sync: ['yes', 'no'],
writev: ['yes', 'no'],
callback: ['yes', 'no']
callback: ['yes', 'no'],
len: [1024, 32 * 1024]
});

function main({ n, sync, writev, callback }) {
const b = Buffer.allocUnsafe(1024);
function main({ n, sync, writev, callback, len }) {
const b = Buffer.allocUnsafe(len);
const s = new Writable();
sync = sync === 'yes';

11 changes: 6 additions & 5 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
@@ -337,11 +337,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

state.length += len;

const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
@@ -359,6 +354,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.sync = false;
}

const ret = state.length < state.highWaterMark;

// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;

// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
6 changes: 5 additions & 1 deletion test/parallel/test-stream-big-packet.js
Original file line number Diff line number Diff line change
@@ -36,7 +36,11 @@ class TestStream extends stream.Transform {
}
}

const s1 = new stream.PassThrough();
const s1 = new stream.Transform({
transform(chunk, encoding, cb) {
process.nextTick(cb, null, chunk);
}
});
const s2 = new stream.PassThrough();
const s3 = new TestStream();
s1.pipe(s3);
2 changes: 1 addition & 1 deletion test/parallel/test-stream-catch-rejections.js
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ const assert = require('assert');
captureRejections: true,
highWaterMark: 1,
write(chunk, enc, cb) {
cb();
process.nextTick(cb);
}
});

Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ const writable = new stream.Writable({
});
}

cb();
process.nextTick(cb);
}, 3)
});

6 changes: 3 additions & 3 deletions test/parallel/test-stream-pipe-await-drain.js
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ reader._read = () => {};

writer1._write = common.mustCall(function(chunk, encoding, cb) {
this.emit('chunk-received');
cb();
process.nextTick(cb);
}, 1);

writer1.once('chunk-received', () => {
@@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
reader._readableState.awaitDrainWriters.size,
1,
'awaitDrain should be 1 after first push, actual is ' +
reader._readableState.awaitDrainWriters
reader._readableState.awaitDrainWriters.size
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
@@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => {
reader._readableState.awaitDrainWriters.size,
2,
'awaitDrain should be 2 after second push, actual is ' +
reader._readableState.awaitDrainWriters
reader._readableState.awaitDrainWriters.size
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
6 changes: 4 additions & 2 deletions test/parallel/test-stream-writable-needdrain-state.js
Original file line number Diff line number Diff line change
@@ -10,8 +10,10 @@ const transform = new stream.Transform({
});

function _transform(chunk, encoding, cb) {
assert.strictEqual(transform._writableState.needDrain, true);
cb();
process.nextTick(() => {
assert.strictEqual(transform._writableState.needDrain, true);
cb();
});
}

assert.strictEqual(transform._writableState.needDrain, false);
2 changes: 1 addition & 1 deletion test/parallel/test-stream2-finish-pipe.js
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ r._read = function(size) {

const w = new stream.Writable();
w._write = function(data, encoding, cb) {
cb(null);
process.nextTick(cb, null);
};

r.pipe(w);

0 comments on commit 941cf4a

Please sign in to comment.