Skip to content

Commit

Permalink
stream: postpone setting flowing for on('readable')
Browse files Browse the repository at this point in the history
Now state.flowing will be set only after all of the 'readable'
listeners are gone and if we have at least one 'data' listener.

* on('data') will not flow (flowing === null and not false) if there
  are 'readable' listeners
* pipe() will work regardless of 'readable' listeners
* isPause reports only user .pause call (before setting 'data' listener
  when there is already 'readable' listener also set flowing to false)
* resume always sets stream to flowing state
  • Loading branch information
lundibundi committed Aug 9, 2018
1 parent 6fa9528 commit c07abd7
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 13 deletions.
17 changes: 8 additions & 9 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,8 @@ Readable.prototype.on = function(ev, fn) {
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;

// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
// Try start flowing on next tick for data if stream isn't explicitly paused
if (!state.readableListening && state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
Expand Down Expand Up @@ -855,13 +855,14 @@ Readable.prototype.removeAllListeners = function(ev) {
function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

// try to start flowing for 'data' listeners
// (if pipesCount is not 0 then we have already 'flowed')
if (!state.readableListening &&
self.listenerCount('data') > 0 &&
state.pipesCount === 0) {
if (self.listenerCount('data') > 0 &&
!self.isPaused() &&
state.pipesCount === 0 &&
!state.readableListening)
self.resume();
}
}

function nReadingNextTick(self) {
Expand All @@ -875,9 +876,7 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
state.flowing = true;
resume(this, state);
}
return this;
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-readable-and-data-pause.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');

// This test ensures that if we have both 'readable' and 'data'
// listeners on Readable instance once all of the 'readable' listeners
// are gone and there are still 'data' listeners stream will *not*
// try to flow if it was explicitly paused.

const { Readable } = require('stream');

const r = new Readable({
read: () => {},
});

const data = ['foo', 'bar', 'baz'];

r.pause();

r.on('data', common.mustNotCall());
r.on('end', common.mustNotCall());
r.once('readable', common.mustCall());

for (const d of data)
r.push(d);
r.push(null);
7 changes: 3 additions & 4 deletions test/parallel/test-stream-readable-and-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const common = require('../common');

// This test ensures that if we have both 'readable' and 'data'
// listeners on Readable instance once the 'readable' listeners
// listeners on Readable instance once all of the 'readable' listeners
// are gone and there are still 'data' listeners stream will try
// to flow to satisfy the 'data' listeners.

Expand All @@ -23,7 +23,6 @@ r.once('end', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));

r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
for (const d of data)
r.push(d);
r.push(null);
34 changes: 34 additions & 0 deletions test/parallel/test-stream-readable-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const common = require('../common');

// This test ensures that if have 'readable' listener
// on Readable instance it will not disrupt the pipe.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.on('readable', common.mustCall());

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
40 changes: 40 additions & 0 deletions test/parallel/test-stream-readable-remove-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const common = require('../common');

// This test ensures that if we remove last 'readable' listener
// on Readable instance that is piped it will not disrupt the pipe.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

const listener = common.mustNotCall();
r.on('readable', listener);

r.pipe(w);
r.push(data[0]);

r.removeListener('readable', listener);

process.nextTick(() => {
r.push(data[1]);
r.push(data[2]);
r.push(null);
});

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));

0 comments on commit c07abd7

Please sign in to comment.