Skip to content

Commit

Permalink
stream: Break up the onread function
Browse files Browse the repository at this point in the history
A primary motivation of this is to make the onread function more
inline-friendly, but also to make it more easy to explore not having
onread at all, in favor of always using push() to signal the end of
reading.
  • Loading branch information
isaacs committed Mar 1, 2013
1 parent c116120 commit 7764b84
Showing 1 changed file with 83 additions and 81 deletions.
164 changes: 83 additions & 81 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,51 @@ function Readable(options) {
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk) {
var rs = this._readableState;
rs.onread(null, chunk);

// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
return rs.needReadable ||
rs.length < rs.highWaterMark ||
rs.length === 0;
var state = this._readableState;
return readableAddChunk(this, state, chunk);
};

function readableAddChunk(stream, state, chunk) {
state.reading = false;

var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (chunk === null || chunk === undefined) {
onreadEof(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (state.decoder)
chunk = state.decoder.write(chunk);

// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);

if (state.needReadable)
emitReadable(stream);

maybeReadMore(stream, state);
}

return needMoreData(state);
}



// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
// such as the repl. Also, if the push() triggered a
// readable event, and the user called read(largeNumber) such that
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
return !state.ended &&
(state.needReadable ||
state.length < state.highWaterMark ||
state.length === 0);
}

// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
Expand Down Expand Up @@ -263,15 +293,20 @@ Readable.prototype.read = function(n) {
return ret;
};

// This is the function passed to _read(n,cb) as the callback.
// It should be called exactly once for every _read() call.
function onread(stream, er, chunk) {
var state = stream._readableState;
var sync = state.sync;

// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
if (er)
stream.emit('error', er);
else
stream.push(chunk);
}

function chunkInvalid(state, chunk) {
var er = null;
if (!Buffer.isBuffer(chunk) &&
'string' !== typeof chunk &&
chunk !== null &&
Expand All @@ -280,68 +315,26 @@ function onread(stream, er, chunk) {
!er) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}

state.reading = false;
if (er)
return stream.emit('error', er);

if (chunk === null || chunk === undefined) {
// eof
state.ended = true;
if (state.decoder) {
chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += state.objectMode ? 1 : chunk.length;
}
function onreadEof(stream, state) {
state.ended = true;
if (state.decoder) {
var chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += state.objectMode ? 1 : chunk.length;
}

// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
emitReadable(stream);
else
endReadable(stream);
return;
}

// at this point, if we got a zero-length buffer or string,
// and we're not in object-mode, then there's really no point
// continuing. it means that there is nothing to read right
// now, but as we have not received the EOF-signaling null,
// we're not ended. we've already unset the reading flag,
// so just get out of here.
if (!state.objectMode &&
(chunk || typeof chunk === 'string') &&
0 === chunk.length)
return;

if (state.decoder)
chunk = state.decoder.write(chunk);

// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);

// if we haven't gotten any data,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, emitting 'readable'
// probably will trigger another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length === 0) {
state.reading = true;
stream._read(state.bufferSize, state.onread);
return;
}

if (state.needReadable)
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
emitReadable(stream);
else if (state.sync)
process.nextTick(function() {
maybeReadMore(stream, state);
});
else
maybeReadMore(stream, state);
endReadable(stream);
}

// Don't emit readable right away in sync mode, because this can trigger
Expand All @@ -365,17 +358,26 @@ function emitReadable(stream) {
function emitReadable_(stream) {
var state = stream._readableState;
stream.emit('readable');
maybeReadMore(stream, state);
}


// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
function maybeReadMore(stream, state) {
// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
if (!state.reading && !state.ending && !state.ended &&
if (state.sync)
process.nextTick(function() {
maybeReadMore_(stream, state);
});
else
maybeReadMore_(stream, state);
}

function maybeReadMore_(stream, state) {
if (!state.reading && !state.ended &&
state.length < state.highWaterMark) {
stream.read(0);
}
Expand Down

0 comments on commit 7764b84

Please sign in to comment.