stream: flow for 'data' listeners upon 'readable' remove
When there is at least one 'data' listener, try to flow when the
last 'readable' listener is removed and the stream is not piped.

Currently, if both 'readable' and 'data' listeners are set, only the
'readable' listener gets called and the stream stalls without emitting
'data' and 'end' events if the 'readable' listener neither read()s nor
resume()s the stream.

Fixes: #21398
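
To make the scenario concrete, here is a minimal standalone sketch (not part of
this commit): the `'readable'` handler neither `read()`s nor `resume()`s, so
before this change the script printed only `readable` and then stalled.

```js
const { Readable } = require('stream');

const r = new Readable({ read() {} });

// The 'readable' listener neither read()s nor resume()s the stream.
r.once('readable', () => console.log('readable'));
r.on('data', (chunk) => console.log('data:', chunk.toString()));
r.on('end', () => console.log('end'));

r.push('hello');
r.push(null);

// Before this commit: nothing after 'readable' was logged.
// With this change: removing the last 'readable' listener (here via
// once()) resumes the stream, so 'data: hello' and 'end' follow.
```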
lundibundi committed Aug 9, 2018
1 parent 566d11a commit 6fa9528
Showing 3 changed files with 71 additions and 2 deletions.
34 changes: 33 additions & 1 deletion doc/api/stream.md
@@ -773,6 +773,12 @@ changes:
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18994
    description: Using `'readable'` requires calling `.read()`.
  - version: REPLACEME
    pr-url: https://github.com/nodejs/node/pull/21696
    description: >
      When using both `'readable'` and [`'data'`][], the stream will try to
      flow once the last `'readable'` listener is removed, if there are
      still [`'data'`][] listeners attached.
-->

The `'readable'` event is emitted when there is data available to be read from
@@ -825,7 +831,33 @@ result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. However, once the last
`'readable'` listener is removed, the stream will call
[`stream.resume()`][stream-resume] so that any `'data'` listeners still
attached receive the data.

```js
const fs = require('fs');
const rr = fs.createReadStream('bar.txt', { encoding: 'utf8' });
rr.once('readable', () => {
  console.log('readable');
});
rr.on('data', (chunk) => {
  console.log(chunk);
});
rr.on('end', () => {
  console.log('end');
});
```

The output of running this script is:

```txt
$ node test.js
readable
abc
end
```

##### readable.destroy([error])
<!-- YAML
10 changes: 9 additions & 1 deletion lib/_stream_readable.js
@@ -853,7 +853,15 @@ Readable.prototype.removeAllListeners = function(ev) {
};

function updateReadableListening(self) {
  self._readableState.readableListening = self.listenerCount('readable') > 0;
  const state = self._readableState;
  state.readableListening = self.listenerCount('readable') > 0;
  // Try to start flowing for the 'data' listeners
  // (if pipesCount is not 0 the stream is piped and already flowing).
  if (!state.readableListening &&
      self.listenerCount('data') > 0 &&
      state.pipesCount === 0) {
    self.resume();
  }
}

function nReadingNextTick(self) {
29 changes: 29 additions & 0 deletions test/parallel/test-stream-readable-and-data.js
@@ -0,0 +1,29 @@
'use strict';

const common = require('../common');

// This test ensures that if we have both 'readable' and 'data'
// listeners on a Readable instance, then once the 'readable' listeners
// are gone and there are still 'data' listeners, the stream will try
// to flow to satisfy the 'data' listeners.

const assert = require('assert');
const { Readable } = require('stream');

const r = new Readable({
  read: () => {},
});

const data = ['foo', 'bar', 'baz'];

let receivedData = '';
r.once('readable', common.mustCall());
r.on('data', (chunk) => receivedData += chunk);
r.once('end', common.mustCall(() => {
  assert.strictEqual(receivedData, data.join(''));
}));

r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);
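
As a usage note, the same behaviour applies when the `'readable'` listener is
removed explicitly rather than via `once()`; a hypothetical sketch (not part of
this commit), using only documented stream and EventEmitter APIs:

```js
const { Readable } = require('stream');

const r = new Readable({ read() {} });

function onReadable() {
  // Deliberately neither read()s nor resume()s.
}

r.on('readable', onReadable);
r.on('data', (chunk) => console.log('data:', chunk.toString()));
r.on('end', () => console.log('end'));

r.push('foo');
r.push(null);

setImmediate(() => {
  // Removing the last 'readable' listener triggers the new resume()
  // path, so 'data: foo' and 'end' are logged afterwards.
  r.removeListener('readable', onReadable);
});
```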
