From c07abd7fe802bf2b8b68bcd79feddd99994de5ab Mon Sep 17 00:00:00 2001 From: Denys Otrishko Date: Thu, 9 Aug 2018 12:00:22 +0300 Subject: [PATCH] stream: postpone setting flowing for on('readable') Now state.flowing will be set only after all of the 'readable' listeners are gone and if we have at least one 'data' listener. * on('data') will not flow (flowing === null and not false) if there are 'readable' listeners * pipe() will work regardless of 'readable' listeners * isPause reports only user .pause call (before setting 'data' listener when there is already 'readable' listener also set flowing to false) * resume always sets stream to flowing state --- lib/_stream_readable.js | 17 ++++---- .../test-stream-readable-and-data-pause.js | 26 ++++++++++++ .../parallel/test-stream-readable-and-data.js | 7 ++-- test/parallel/test-stream-readable-pipe.js | 34 ++++++++++++++++ .../test-stream-readable-remove-pipe.js | 40 +++++++++++++++++++ 5 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 test/parallel/test-stream-readable-and-data-pause.js create mode 100644 test/parallel/test-stream-readable-pipe.js create mode 100644 test/parallel/test-stream-readable-remove-pipe.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1834214e01b67c..86e0d8d222ad1a 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -801,8 +801,8 @@ Readable.prototype.on = function(ev, fn) { // a few lines down. This is needed to support once('readable'). state.readableListening = this.listenerCount('readable') > 0; - // Try start flowing on next tick if stream isn't explicitly paused - if (state.flowing !== false) + // Try start flowing on next tick for data if stream isn't explicitly paused + if (!state.readableListening && state.flowing !== false) this.resume(); } else if (ev === 'readable') { if (!state.endEmitted && !state.readableListening) { @@ -855,13 +855,14 @@ Readable.prototype.removeAllListeners = function(ev) { function updateReadableListening(self) { const state = self._readableState; state.readableListening = self.listenerCount('readable') > 0; + // try to start flowing for 'data' listeners // (if pipesCount is not 0 then we have already 'flowed') - if (!state.readableListening && - self.listenerCount('data') > 0 && - state.pipesCount === 0) { + if (self.listenerCount('data') > 0 && + !self.isPaused() && + state.pipesCount === 0 && + !state.readableListening) self.resume(); - } } function nReadingNextTick(self) { @@ -875,9 +876,7 @@ Readable.prototype.resume = function() { var state = this._readableState; if (!state.flowing) { debug('resume'); - // we flow only if there is no one listening - // for readable - state.flowing = !state.readableListening; + state.flowing = true; resume(this, state); } return this; diff --git a/test/parallel/test-stream-readable-and-data-pause.js b/test/parallel/test-stream-readable-and-data-pause.js new file mode 100644 index 00000000000000..330fca6abe302d --- /dev/null +++ b/test/parallel/test-stream-readable-and-data-pause.js @@ -0,0 +1,26 @@ +'use strict'; + +const common = require('../common'); + +// This test ensures that if we have both 'readable' and 'data' +// listeners on Readable instance once all of the 'readable' listeners +// are gone and there are still 'data' listeners stream will *not* +// try to flow if it was explicitly paused. + +const { Readable } = require('stream'); + +const r = new Readable({ + read: () => {}, +}); + +const data = ['foo', 'bar', 'baz']; + +r.pause(); + +r.on('data', common.mustNotCall()); +r.on('end', common.mustNotCall()); +r.once('readable', common.mustCall()); + +for (const d of data) + r.push(d); +r.push(null); diff --git a/test/parallel/test-stream-readable-and-data.js b/test/parallel/test-stream-readable-and-data.js index 286094667ae642..d0cc46399155b5 100644 --- a/test/parallel/test-stream-readable-and-data.js +++ b/test/parallel/test-stream-readable-and-data.js @@ -3,7 +3,7 @@ const common = require('../common'); // This test ensures that if we have both 'readable' and 'data' -// listeners on Readable instance once the 'readable' listeners +// listeners on Readable instance once all of the 'readable' listeners // are gone and there are still 'data' listeners stream will try // to flow to satisfy the 'data' listeners. @@ -23,7 +23,6 @@ r.once('end', common.mustCall(() => { assert.strictEqual(receivedData, data.join('')); })); -r.push(data[0]); -r.push(data[1]); -r.push(data[2]); +for (const d of data) + r.push(d); r.push(null); diff --git a/test/parallel/test-stream-readable-pipe.js b/test/parallel/test-stream-readable-pipe.js new file mode 100644 index 00000000000000..2e238fbc34507f --- /dev/null +++ b/test/parallel/test-stream-readable-pipe.js @@ -0,0 +1,34 @@ +'use strict'; + +const common = require('../common'); + +// This test ensures that if have 'readable' listener +// on Readable instance it will not disrupt the pipe. + +const assert = require('assert'); +const { Readable, Writable } = require('stream'); + +let receivedData = ''; +const w = new Writable({ + write: (chunk, env, callback) => { + receivedData += chunk; + callback(); + }, +}); + +const data = ['foo', 'bar', 'baz']; +const r = new Readable({ + read: () => {}, +}); + +r.on('readable', common.mustCall()); + +r.pipe(w); +r.push(data[0]); +r.push(data[1]); +r.push(data[2]); +r.push(null); + +w.on('finish', common.mustCall(() => { + assert.strictEqual(receivedData, data.join('')); +})); diff --git a/test/parallel/test-stream-readable-remove-pipe.js b/test/parallel/test-stream-readable-remove-pipe.js new file mode 100644 index 00000000000000..7582aa8acfc01e --- /dev/null +++ b/test/parallel/test-stream-readable-remove-pipe.js @@ -0,0 +1,40 @@ +'use strict'; + +const common = require('../common'); + +// This test ensures that if we remove last 'readable' listener +// on Readable instance that is piped it will not disrupt the pipe. + +const assert = require('assert'); +const { Readable, Writable } = require('stream'); + +let receivedData = ''; +const w = new Writable({ + write: (chunk, env, callback) => { + receivedData += chunk; + callback(); + }, +}); + +const data = ['foo', 'bar', 'baz']; +const r = new Readable({ + read: () => {}, +}); + +const listener = common.mustNotCall(); +r.on('readable', listener); + +r.pipe(w); +r.push(data[0]); + +r.removeListener('readable', listener); + +process.nextTick(() => { + r.push(data[1]); + r.push(data[2]); + r.push(null); +}); + +w.on('finish', common.mustCall(() => { + assert.strictEqual(receivedData, data.join('')); +}));