Skip to content

Commit

Permalink
stream: convert string to Buffer when calling unshift(<string>)
Browse files Browse the repository at this point in the history
`readable.unshift` can take a string as an argument, but that
string wasn't being converted to a Buffer, which caused a
<TypeError: Argument must be a buffer> in some cases. Also if a
string was passed, that string was coerced to utf8 encoding.

A second optional argument `encoding` was added to `unshift` to
fix the encoding issue.

Fixes: #27192
PR-URL: #27194
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
marcosc90 authored and targos committed Jun 3, 2019
1 parent 887dd60 commit a47ee80
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 16 deletions.
2 changes: 1 addition & 1 deletion doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2349,7 +2349,7 @@ such as `process.stdout.on('data')`.
[`sign.sign()`]: crypto.html#crypto_sign_sign_privatekey_outputencoding
[`stream.pipe()`]: stream.html#stream_readable_pipe_destination_options
[`stream.push()`]: stream.html#stream_readable_push_chunk_encoding
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk_encoding
[`stream.write()`]: stream.html#stream_writable_write_chunk_encoding_callback
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`subprocess.send()`]: child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
Expand Down
4 changes: 3 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ setTimeout(() => {
}, 1000);
```

##### readable.unshift(chunk)
##### readable.unshift(chunk[, encoding])
<!-- YAML
added: v0.9.11
changes:
Expand All @@ -1208,6 +1208,8 @@ changes:
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
* `encoding` {string} Encoding of string chunks. Must be a valid
`Buffer` encoding, such as `'utf8'` or `'ascii'`.

The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
Expand Down
32 changes: 18 additions & 14 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,28 @@ Readable.prototype._destroy = function(err, cb) {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
const state = this._readableState;
var skipChunkCheck;
return readableAddChunk(this, chunk, encoding, false);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk, encoding) {
return readableAddChunk(this, chunk, encoding, true);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
debug('readableAddChunk', chunk);
const state = stream._readableState;

let skipChunkCheck;

if (!state.objectMode) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
if (addToFront && state.encoding && state.encoding !== encoding) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
} else if (encoding !== state.encoding) {
chunk = Buffer.from(chunk, encoding);
encoding = '';
}
Expand All @@ -223,17 +238,6 @@ Readable.prototype.push = function(chunk, encoding) {
skipChunkCheck = true;
}

return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

// Unshift should *always* be something directly out of read()
Readable.prototype.unshift = function(chunk) {
return readableAddChunk(this, chunk, null, true, false);
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
const state = stream._readableState;
if (chunk === null) {
state.reading = false;
onEofChunk(stream, state);
Expand Down
187 changes: 187 additions & 0 deletions test/parallel/test-stream-readable-unshift.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

{
// Check that strings are saved as Buffer
const readable = new Readable({ read() {} });

const string = 'abc';

readable.on('data', common.mustCall((chunk) => {
assert(Buffer.isBuffer(chunk));
assert.strictEqual(chunk.toString('utf8'), string);
}, 1));

readable.unshift(string);

}

{
// Check that data goes at the beginning
const readable = new Readable({ read() {} });
const unshift = 'front';
const push = 'back';

const expected = [unshift, push];
readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk.toString('utf8'), expected.shift());
}, 2));


readable.push(push);
readable.unshift(unshift);
}

{
// Check that buffer is saved with correct encoding
const readable = new Readable({ read() {} });

const encoding = 'base64';
const string = Buffer.from('abc').toString(encoding);

readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk.toString(encoding), string);
}, 1));

readable.unshift(string, encoding);

}

{

const streamEncoding = 'base64';

function checkEncoding(readable) {

// chunk encodings
const encodings = ['utf8', 'binary', 'hex', 'base64'];
const expected = [];

readable.on('data', common.mustCall((chunk) => {
const { encoding, string } = expected.pop();
assert.strictEqual(chunk.toString(encoding), string);
}, encodings.length));

for (const encoding of encodings) {
const string = 'abc';

// If encoding is the same as the state.encoding the string is
// saved as is
const expect = encoding !== streamEncoding ?
Buffer.from(string, encoding).toString(streamEncoding) : string;

expected.push({ encoding, string: expect });

readable.unshift(string, encoding);
}
}

const r1 = new Readable({ read() {} });
r1.setEncoding(streamEncoding);
checkEncoding(r1);

const r2 = new Readable({ read() {}, encoding: streamEncoding });
checkEncoding(r2);

}

{
// Both .push & .unshift should have the same behaviour
// When setting an encoding, each chunk should be emitted with that encoding
const encoding = 'base64';

function checkEncoding(readable) {
const string = 'abc';
readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, Buffer.from(string).toString(encoding));
}, 2));

readable.push(string);
readable.unshift(string);
}

const r1 = new Readable({ read() {} });
r1.setEncoding(encoding);
checkEncoding(r1);

const r2 = new Readable({ read() {}, encoding });
checkEncoding(r2);

}

{
// Check that error is thrown for invalid chunks

const readable = new Readable({ read() {} });
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

checkError(() => readable.unshift([]));
checkError(() => readable.unshift({}));
checkError(() => readable.unshift(0));

}

{
// Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} });

const chunks = ['a', 1, {}, []];

readable.on('data', common.mustCall((chunk) => {
assert.strictEqual(chunk, chunks.pop());
}, chunks.length));

for (const chunk of chunks) {
readable.unshift(chunk);
}
}

{

// Should not throw: https://github.com/nodejs/node/issues/27192
const highWaterMark = 50;
class ArrayReader extends Readable {
constructor(opt) {
super({ highWaterMark });
// The error happened only when pushing above hwm
this.buffer = new Array(highWaterMark * 2).fill(0).map(String);
}
_read(size) {
while (this.buffer.length) {
const chunk = this.buffer.shift();
if (!this.buffer.length) {
this.push(chunk);
this.push(null);
return true;
}
if (!this.push(chunk))
return;
}
}
}

function onRead() {
while (null !== (stream.read())) {
// Remove the 'readable' listener before unshifting
stream.removeListener('readable', onRead);
stream.unshift('a');
stream.on('data', (chunk) => {
console.log(chunk.length);
});
break;
}
}

const stream = new ArrayReader();
stream.once('readable', common.mustCall(onRead));
stream.on('end', common.mustCall(() => {}));

}

0 comments on commit a47ee80

Please sign in to comment.