Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: flow for 'data' listeners upon removal of last 'readable' listener #21696

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,12 @@ changes:
- version: v10.0.0
pr-url: https://github.com/nodejs/node/pull/18994
description: Using `'readable'` requires calling `.read()`.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/21696
description: >
When using both `'readable'` and [`'data'`][], the stream will try to
flow once the last `'readable'` listener has been removed while
[`'data'`][] listeners are still attached.
-->

The `'readable'` event is emitted when there is data available to be read from
Expand Down Expand Up @@ -825,7 +831,33 @@ result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. However, once the last
`'readable'` listener has been removed, the stream will
[`stream.resume()`][stream-resume] automatically to satisfy any remaining
`'data'` listeners.

```js
const fs = require('fs');
const rr = fs.createReadStream('bar.txt', { encoding: 'utf8' });
rr.once('readable', () => {
console.log('readable');
});
rr.on('data', (chunk) => {
console.log(chunk);
});
rr.on('end', () => {
console.log('end');
});
```

The output of running this script is:

```txt
$ node test.js
readable
abc

end
```

##### readable.destroy([error])
<!-- YAML
Expand Down
19 changes: 13 additions & 6 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -801,8 +801,8 @@ Readable.prototype.on = function(ev, fn) {
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;

// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
// Try start flowing on next tick for data if stream isn't explicitly paused
if (!state.readableListening && state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
Expand Down Expand Up @@ -853,7 +853,16 @@ Readable.prototype.removeAllListeners = function(ev) {
};

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

// try to start flowing for 'data' listeners
// (if pipesCount is not 0 then we have already 'flowed')
if (self.listenerCount('data') > 0 &&
!self.isPaused() &&
state.pipesCount === 0 &&
!state.readableListening)
self.resume();
}

function nReadingNextTick(self) {
Expand All @@ -867,9 +876,7 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
state.flowing = true;
resume(this, state);
}
return this;
Expand Down
26 changes: 26 additions & 0 deletions test/parallel/test-stream-readable-and-data-pause.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
'use strict';

const common = require('../common');

// Verifies that a Readable carrying both 'readable' and 'data'
// listeners stays dormant after the last 'readable' listener is gone
// when the stream was explicitly paused: neither 'data' nor 'end'
// may be emitted.

const { Readable } = require('stream');

const readable = new Readable({
  read: () => {},
});

const chunks = ['foo', 'bar', 'baz'];

readable.pause();

readable.on('data', common.mustNotCall());
readable.on('end', common.mustNotCall());
readable.once('readable', common.mustCall());

chunks.forEach((chunk) => readable.push(chunk));
readable.push(null);
28 changes: 28 additions & 0 deletions test/parallel/test-stream-readable-and-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict';

const common = require('../common');

// Verifies that a Readable carrying both 'readable' and 'data'
// listeners starts flowing once every 'readable' listener is gone,
// so that the remaining 'data' listeners are satisfied and all
// pushed chunks are delivered.

const assert = require('assert');
const { Readable } = require('stream');

const readable = new Readable({
  read: () => {},
});

const chunks = ['foo', 'bar', 'baz'];

let collected = '';
readable.once('readable', common.mustCall());
readable.on('data', (chunk) => collected += chunk);
readable.once('end', common.mustCall(() => {
  assert.strictEqual(collected, chunks.join(''));
}));

chunks.forEach((chunk) => readable.push(chunk));
readable.push(null);
34 changes: 34 additions & 0 deletions test/parallel/test-stream-readable-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const common = require('../common');

// Verifies that having a 'readable' listener on a Readable instance
// does not disrupt an active pipe: every pushed chunk must still
// reach the Writable destination.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let collected = '';
const destination = new Writable({
  write: (chunk, encoding, callback) => {
    collected += chunk;
    callback();
  },
});

const chunks = ['foo', 'bar', 'baz'];
const source = new Readable({
  read: () => {},
});

source.on('readable', common.mustCall());

source.pipe(destination);
for (const chunk of chunks)
  source.push(chunk);
source.push(null);

destination.on('finish', common.mustCall(() => {
  assert.strictEqual(collected, chunks.join(''));
}));
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ const Readable = require('stream').Readable;
readable.push('pushed');

// we are still not flowing, we will be resuming in the next tick
assert.strictEqual(state.flowing, false);
assert.strictEqual(state.flowing, null);

// wait for nextTick, so the readableListener flag resets
process.nextTick(function() {
Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-stream-readable-remove-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const common = require('../common');

// Verifies that removing the last 'readable' listener from a piped
// Readable does not disrupt the pipe: data pushed both before and
// after the removal must still reach the Writable destination.

const assert = require('assert');
const { Readable, Writable } = require('stream');

let collected = '';
const destination = new Writable({
  write: (chunk, encoding, callback) => {
    collected += chunk;
    callback();
  },
});

const chunks = ['foo', 'bar', 'baz'];
const source = new Readable({
  read: () => {},
});

const onReadable = common.mustNotCall();
source.on('readable', onReadable);

source.pipe(destination);
source.push(chunks[0]);

source.removeListener('readable', onReadable);

process.nextTick(() => {
  source.push(chunks[1]);
  source.push(chunks[2]);
  source.push(null);
});

destination.on('finish', common.mustCall(() => {
  assert.strictEqual(collected, chunks.join(''));
}));