Skip to content

Commit

Permalink
stream: support decoding buffers for Writables
Browse files Browse the repository at this point in the history
Support decoding the input of writable streams into strings of a specific
encoding before passing it to `_write()`.

By default, all data written to a writable stream is encoded
into Buffers. This change enables the reverse situation, i.e.
when it is desired by the stream implementer to process all
input as strings, whether it was passed to `write()` as a string
or not.

This makes sense for multi-byte character encodings where
the buffers that are written using `write()` may contain
partial characters, so calling `chunk.toString()` is not universally
applicable.

Fixes: #7315
  • Loading branch information
addaleax committed Jan 6, 2017
1 parent 6d3c5b7 commit ecc7b9e
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 11 deletions.
3 changes: 3 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,9 @@ constructor and implement the `writable._write()` method. The
* `decodeStrings` {Boolean} Whether or not to decode strings into
Buffers before passing them to [`stream._write()`][stream-_write].
Defaults to `true`
* `decodeBuffers` {Boolean} Whether or not to decode Buffers into strings
using the default encoding before passing them to
[`stream._write()`][stream-_write]. Defaults to `false`. Cannot be
combined with `decodeStrings: true`.
* `objectMode` {Boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string or
Expand Down
68 changes: 57 additions & 11 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const util = require('util');
const internalUtil = require('internal/util');
const Stream = require('stream');
const Buffer = require('buffer').Buffer;
var StringDecoder;

util.inherits(Writable, Stream);

Expand Down Expand Up @@ -46,6 +47,8 @@ function WritableState(options, stream) {
// drain event flag.
this.needDrain = false;
// at the start of calling end()
this.flushing = false;
// at the start of calling endWritable()
this.ending = false;
// when end() has been called, and returned
this.ended = false;
Expand All @@ -55,6 +58,8 @@ function WritableState(options, stream) {
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
// handling at a lower level.
// This is confusingly named. For character encodings (like utf8), setting
// decodeStrings to true will *encode* strings as Buffers.
var noDecode = options.decodeStrings === false;
this.decodeStrings = !noDecode;

Expand All @@ -63,6 +68,21 @@ function WritableState(options, stream) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';

// A StringDecoder instance that is used for decoding incoming Buffers
// if that is desired by the stream implementer, as indicated by the
// `decodeBuffers` option.
this.stringDecoder = null;
if (options.decodeBuffers) {
// Check whether the caller explicitly requested inconsistent options.
if (options.decodeStrings === true) {
throw new Error('decodeBuffers and decodeStrings cannot both be true');
}

if (!StringDecoder)
StringDecoder = require('string_decoder').StringDecoder;
this.stringDecoder = new StringDecoder(this.defaultEncoding);
}

// not an actual buffer we keep track of, but a measurement
// of how much we're waiting to get pushed to some underlying
// socket or file.
Expand Down Expand Up @@ -276,23 +296,43 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
};

function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
if (state.objectMode)
return chunk;

var sd = state.stringDecoder;
if (typeof chunk === 'string') {
if (sd !== null && encoding === sd.encoding && sd.lastNeed === 0)
return chunk; // No re-encoding necessary.

if (state.decodeStrings !== false || sd !== null)
chunk = Buffer.from(chunk, encoding);
}

if (sd !== null) {
// chunk is always a Buffer now.
if (state.flushing) {
chunk = sd.end(chunk);
} else {
chunk = sd.write(chunk);
}
}

return chunk;
}

// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var sd = state.stringDecoder;
if (!isBuf || sd) {
chunk = decodeChunk(state, chunk, encoding);
if (chunk instanceof Buffer)
encoding = 'buffer';
else if (sd)
encoding = sd.encoding;
}

var len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand Down Expand Up @@ -459,20 +499,26 @@ Writable.prototype._write = function(chunk, encoding, cb) {

Writable.prototype._writev = null;

Writable.prototype.end = function(chunk, encoding, cb) {
const empty = Buffer.alloc(0);

Writable.prototype.end = function(chunk, enc, cb) {
var state = this._writableState;

if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
enc = null;
} else if (typeof enc === 'function') {
cb = enc;
enc = null;
}

state.flushing = true;

if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);
this.write(chunk, enc);
else if (state.stringDecoder && state.stringDecoder.lastNeed > 0)
this.write(empty);

// .end() fully uncorks
if (state.corked) {
Expand Down
121 changes: 121 additions & 0 deletions test/parallel/test-stream-writable-decode-buffers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
'use strict';
require('../common');
const assert = require('assert');

const stream = require('stream');

// Writable used as a test double: instead of sending data anywhere, it
// records every chunk handed to `_write()` together with the encoding label
// that accompanied it, so tests can inspect exactly what the stream delivered.
class ChunkStoringWritable extends stream.Writable {
  // `options` is forwarded untouched to the `stream.Writable` constructor.
  constructor(options) {
    super(options);

    // One `{ data, encoding }` record per `_write()` call, in arrival order.
    this.chunks = [];
  }

  // Store the incoming chunk and report completion right away.
  _write(data, encoding, callback) {
    const record = { data: data, encoding: encoding };
    const { chunks } = this;
    chunks.push(record);
    callback();
  }
}

{
  // Two three-byte UTF-8 characters whose bytes straddle the write
  // boundaries: each character must only surface once its final byte arrives.
  const sink = new ChunkStoringWritable({ decodeBuffers: true });
  for (const hex of ['e4bd', 'a0e5', 'a5bd'])
    sink.write(Buffer.from(hex, 'hex'));
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['', '你', '好']);
}

{
  // Every write carries exactly one complete character, so each one is
  // expected to come out as its own string chunk.
  const sink = new ChunkStoringWritable({ decodeBuffers: true });
  for (const character of ['你', '好'])
    sink.write(Buffer.from(character, 'utf8'));
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['你', '好']);
}

{
  // A single 0x80 byte is expected to be delivered immediately as the
  // replacement character U+FFFD.
  const sink = new ChunkStoringWritable({ decodeBuffers: true });
  const strayByte = Buffer.from('80', 'hex');
  sink.write(strayByte);
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['\ufffd']);
}

{
  // 0xc3 is left incomplete: the write itself yields an empty string, and
  // end() is expected to flush the dangling byte as U+FFFD.
  const sink = new ChunkStoringWritable({ decodeBuffers: true });
  const danglingLeadByte = Buffer.from('c3', 'hex');
  sink.write(danglingLeadByte);
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['', '\ufffd']);
}

{
  // Buffers are decoded with the stream's default encoding, here utf16le.
  const options = { decodeBuffers: true, defaultEncoding: 'utf16le' };
  const sink = new ChunkStoringWritable(options);

  const payload = Buffer.from('你好', 'utf16le');
  sink.write(payload);
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['你好']);
}

{
  // With base64 as the default encoding the four input bytes are expected to
  // come out as two chunks: 'YE99' from the write, and the padded remainder
  // 'WQ==' once the stream ends.
  const options = { decodeBuffers: true, defaultEncoding: 'base64' };
  const sink = new ChunkStoringWritable(options);

  const payload = Buffer.from('你好', 'utf16le');
  sink.write(payload);
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['YE99', 'WQ==']);
}

{
  // A string written with the same encoding as the stream's default should
  // arrive at _write() unchanged.
  const options = { decodeBuffers: true, defaultEncoding: 'utf16le' };
  const sink = new ChunkStoringWritable(options);

  sink.write('你好', 'utf16le');
  sink.end();

  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['你好']);
}

{
  const sink = new ChunkStoringWritable({ decodeBuffers: true });

  // Ends on incomplete UTF8: 'D' followed by a dangling 0xc3 byte.
  const partial = Buffer.from([0x44, 0xc3]);
  sink.write(partial);

  // Input is still pending at this point, so this string write must *not*
  // be passed through directly.
  sink.write('a');

  const tail = Buffer.from('bc7373656c', 'hex');
  sink.end(tail);

  // '�' below is the replacement character U+FFFD.
  const received = sink.chunks.map(({ data }) => data);
  assert.deepStrictEqual(received, ['D', '�a', '�ssel']);
}

{
  // Changing the default encoding after construction: the string passed to
  // end() is interpreted as ucs2, yet both chunks are expected to reach
  // _write() labelled as utf8 (the ucs2 bytes of '换' read back as 'bc').
  const sink = new ChunkStoringWritable({ decodeBuffers: true });

  sink.write('a');
  sink.setDefaultEncoding('ucs2');
  sink.end('换');

  const expected = [
    { data: 'a', encoding: 'utf8' },
    { data: 'bc', encoding: 'utf8' }
  ];
  assert.deepStrictEqual(sink.chunks, expected);
}

// Explicitly asking for both conversions at once is contradictory, so the
// constructor is expected to reject it.
assert.throws(
  () => {
    return new stream.Writable({ decodeBuffers: true, decodeStrings: true });
  },
  /decodeBuffers and decodeStrings cannot both be true/
);

0 comments on commit ecc7b9e

Please sign in to comment.