Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: fix nits in stream.md #28591

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 57 additions & 55 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,20 @@ that implements an HTTP server:
const http = require('http');

const server = http.createServer((req, res) => {
// `req` is an http.IncomingMessage, which is a Readable Stream
// `res` is an http.ServerResponse, which is a Writable Stream
// `req` is an http.IncomingMessage, which is a Readable Stream.
// `res` is an http.ServerResponse, which is a Writable Stream.

let body = '';
// Get the data as utf8 strings.
// If an encoding is not set, Buffer objects will be received.
req.setEncoding('utf8');

// Readable streams emit 'data' events once a listener is added
// Readable streams emit 'data' events once a listener is added.
req.on('data', (chunk) => {
body += chunk;
});

// The 'end' event indicates that the entire body has been received
// The 'end' event indicates that the entire body has been received.
req.on('end', () => {
try {
const data = JSON.parse(body);
Expand Down Expand Up @@ -254,7 +254,7 @@ function writeOneMillionTimes(writer, data, encoding, callback) {
do {
i--;
if (i === 0) {
// last time!
// Last time!
writer.write(data, encoding, callback);
} else {
// See if we should continue, or wait.
Expand All @@ -263,8 +263,8 @@ function writeOneMillionTimes(writer, data, encoding, callback) {
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
// Had to stop early!
// Write some more once it drains.
writer.once('drain', write);
}
}
Expand Down Expand Up @@ -414,7 +414,7 @@ Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error.

```js
// Write 'hello, ' and then end with 'world!'
// Write 'hello, ' and then end with 'world!'.
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
Expand Down Expand Up @@ -484,6 +484,15 @@ added: v11.4.0

Is `true` if it is safe to call [`writable.write()`][stream-write].

##### writable.writableFinished
<!-- YAML
added: v12.6.0
-->

* {boolean}

Is `true` if after the [`'finish'`][] event has been emitted.

##### writable.writableHighWaterMark
<!-- YAML
added: v9.3.0
Expand All @@ -503,16 +512,6 @@ This property contains the number of bytes (or objects) in the queue
ready to be written. The value provides introspection data regarding
the status of the `highWaterMark`.

##### writable.writableFinished
<!-- YAML
added: v12.6.0
-->

* {boolean}

Is `true` if all data has been flushed to the underlying system. After
the [`'finish'`][] event has been emitted.

##### writable.writableObjectMode
<!-- YAML
added: v12.3.0
Expand Down Expand Up @@ -698,11 +697,11 @@ const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // Will not emit 'data'
pass.resume(); // Must be called to make stream emit 'data'
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
```

While `readable.readableFlowing` is `false`, data may be accumulating
Expand Down Expand Up @@ -845,7 +844,7 @@ cause some amount of data to be read into an internal buffer.
```javascript
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now
// There is some data to read now.
let data;

while (data = this.read()) {
Expand Down Expand Up @@ -990,7 +989,7 @@ named `file.txt`:
const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
```
It is possible to attach multiple `Writable` streams to a single `Readable`
Expand Down Expand Up @@ -1065,7 +1064,7 @@ readable.on('readable', () => {

The `while` loop is necessary when processing data with
`readable.read()`. Only after `readable.read()` returns `null`,
[`'readable'`]() will be emitted.
[`'readable'`][] will be emitted.

A `Readable` stream in object mode will always return a single item from
a call to [`readable.read(size)`][stream-read], regardless of the value of the
Expand Down Expand Up @@ -1196,7 +1195,7 @@ const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
console.log('Stop writing to file.txt.');
Expand Down Expand Up @@ -1235,9 +1234,9 @@ use of a [`Transform`][] stream instead. See the [API for Stream Implementers][]
section for more information.

```js
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
Expand All @@ -1249,13 +1248,13 @@ function parseHeader(stream, callback) {
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// Found the header boundary
// Found the header boundary.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Remove the 'readable' listener before unshifting
// Remove the 'readable' listener before unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
Expand Down Expand Up @@ -1327,13 +1326,13 @@ const fs = require('fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const k of readable) {
data += k;
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);
print(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break` or a `throw`, the stream will be
Expand Down Expand Up @@ -1429,7 +1428,7 @@ finished(rs, (err) => {
}
});

rs.resume(); // drain the stream
rs.resume(); // Drain the stream.
```

Especially useful in error handling scenarios where a stream is destroyed
Expand All @@ -1449,7 +1448,7 @@ async function run() {
}

run().catch(console.error);
rs.resume(); // drain the stream
rs.resume(); // Drain the stream.
```

### stream.pipeline(...streams, callback)
Expand Down Expand Up @@ -1512,6 +1511,7 @@ run().catch(console.error);
* `options` {Object} Options provided to `new stream.Readable([options])`.
By default, `Readable.from()` will set `options.objectMode` to `true`, unless
this is explicitly opted out by setting `options.objectMode` to `false`.
* Returns: {stream.Readable}

A utility method for creating Readable Streams out of iterators.

Expand Down Expand Up @@ -1559,10 +1559,10 @@ on the type of stream being created, as detailed in the chart below:

| Use-case | Class | Method(s) to implement |
| -------- | ----- | ---------------------- |
| Reading only | [`Readable`] | <code>[_read][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>, <code>[_final][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>, <code>[_final][stream-_final]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>, <code>[_final][stream-_final]</code> |
| Reading only | [`Readable`] | <code>[_read()][stream-_read]</code> |
| Writing only | [`Writable`] | <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Reading and writing | [`Duplex`] | <code>[_read()][stream-_read]</code>, <code>[_write()][stream-_write]</code>, <code>[_writev()][stream-_writev]</code>, <code>[_final()][stream-_final]</code> |
| Operate on written data, then read the result | [`Transform`] | <code>[_transform()][stream-_transform]</code>, <code>[_flush()][stream-_flush]</code>, <code>[_final()][stream-_final]</code> |

The implementation code for a stream should *never* call the "public" methods
of a stream that are intended for use by consumers (as described in the
Expand Down Expand Up @@ -1647,7 +1647,7 @@ const { Writable } = require('stream');

class MyWritable extends Writable {
constructor(options) {
// Calls the stream.Writable() constructor
// Calls the stream.Writable() constructor.
super(options);
// ...
}
Expand Down Expand Up @@ -1890,6 +1890,8 @@ changes:
* `objectMode` {boolean} Whether this stream should behave
as a stream of objects. Meaning that [`stream.read(n)`][stream-read] returns
a single value instead of a `Buffer` of size `n`. **Default:** `false`.
* `emitClose` {boolean} Whether or not the stream should emit `'close'`
after it has been destroyed. **Default:** `true`.
* `read` {Function} Implementation for the [`stream._read()`][stream-_read]
method.
* `destroy` {Function} Implementation for the
Expand All @@ -1903,7 +1905,7 @@ const { Readable } = require('stream');

class MyReadable extends Readable {
constructor(options) {
// Calls the stream.Readable(options) constructor
// Calls the stream.Readable(options) constructor.
super(options);
// ...
}
Expand Down Expand Up @@ -2030,18 +2032,18 @@ class SourceWrapper extends Readable {

// Every time there's data, push it into the internal buffer.
this._source.ondata = (chunk) => {
// If push() returns false, then stop reading from source
// If push() returns false, then stop reading from source.
if (!this.push(chunk))
this._source.readStop();
};

// When the source ends, push the EOF-signaling `null` chunk
// When the source ends, push the EOF-signaling `null` chunk.
this._source.onend = () => {
this.push(null);
};
}
// _read will be called when the stream wants to pull more data in
// the advisory size argument is ignored in this case.
// _read() will be called when the stream wants to pull more data in.
// The advisory size argument is ignored in this case.
_read(size) {
this._source.readStart();
}
Expand Down Expand Up @@ -2074,7 +2076,7 @@ const myReadable = new Readable({
process.nextTick(() => this.emit('error', err));
return;
}
// do some work
// Do some work.
}
});
```
Expand Down Expand Up @@ -2212,7 +2214,7 @@ class MyDuplex extends Duplex {
}

_write(chunk, encoding, callback) {
// The underlying source only deals with strings
// The underlying source only deals with strings.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
Expand Down Expand Up @@ -2245,12 +2247,12 @@ the `Readable` side.
```js
const { Transform } = require('stream');

// All Transform streams are also Duplex Streams
// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
writableObjectMode: true,

transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary
// Coerce the chunk to a number if necessary.
chunk |= 0;

// Transform the chunk into something else.
Expand Down Expand Up @@ -2389,7 +2391,7 @@ user programs.
[`stream.write()`][stream-write].
* `encoding` {string} If the chunk is a string, then this is the
encoding type. If chunk is a buffer, then this is the special
value - 'buffer', ignore it in this case.
value - `'buffer'`, ignore it in this case.
* `callback` {Function} A callback function (optionally with an error
argument and data) to be called after the supplied `chunk` has been
processed.
Expand Down Expand Up @@ -2497,12 +2499,12 @@ const writeable = fs.createWriteStream('./file');

(async function() {
for await (const chunk of iterator) {
// Handle backpressure on write
// Handle backpressure on write().
if (!writeable.write(chunk))
await once(writeable, 'drain');
}
writeable.end();
// Ensure completion without errors
// Ensure completion without errors.
await once(writeable, 'finish');
})();
```
Expand All @@ -2521,7 +2523,7 @@ const writeable = fs.createWriteStream('./file');
(async function() {
const readable = Readable.from(iterator);
readable.pipe(writeable);
// Ensure completion without errors
// Ensure completion without errors.
await once(writeable, 'finish');
})();
```
Expand Down Expand Up @@ -2564,7 +2566,7 @@ For example, consider the following code:
// WARNING! BROKEN!
net.createServer((socket) => {

// We add an 'end' listener, but never consume the data
// We add an 'end' listener, but never consume the data.
socket.on('end', () => {
// It will never get here.
socket.end('The message was received but was not processed.\n');
Expand All @@ -2580,7 +2582,7 @@ The workaround in this situation is to call the
[`stream.resume()`][stream-resume] method to begin the flow of data:

```js
// Workaround
// Workaround.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n');
Expand Down