diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 61dd06b5edec76..ad4f98593da270 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -105,7 +105,10 @@ const { const { createWriteWrap, writeGeneric, - writevGeneric + writevGeneric, + onStreamRead, + kMaybeDestroy, + kUpdateTimer } = require('internal/stream_base_commons'); const { kTimeout, @@ -142,7 +145,6 @@ const kHandle = Symbol('handle'); const kID = Symbol('id'); const kInit = Symbol('init'); const kInfoHeaders = Symbol('sent-info-headers'); -const kMaybeDestroy = Symbol('maybe-destroy'); const kLocalSettings = Symbol('local-settings'); const kOptions = Symbol('options'); const kOwner = owner_symbol; @@ -156,7 +158,6 @@ const kServer = Symbol('server'); const kSession = Symbol('session'); const kState = Symbol('state'); const kType = Symbol('type'); -const kUpdateTimer = Symbol('update-timer'); const kWriteGeneric = Symbol('write-generic'); const kDefaultSocketTimeout = 2 * 60 * 1000; @@ -374,36 +375,6 @@ function onStreamClose(code) { } } -// Receives a chunk of data for a given stream and forwards it on -// to the Http2Stream Duplex for processing. -function onStreamRead(nread, buf) { - const stream = this[kOwner]; - if (nread >= 0 && !stream.destroyed) { - debug(`Http2Stream ${stream[kID]} [Http2Session ` + - `${sessionName(stream[kSession][kType])}]: receiving data chunk ` + - `of size ${nread}`); - stream[kUpdateTimer](); - if (!stream.push(buf)) { - if (!stream.destroyed) // we have to check a second time - this.readStop(); - } - return; - } - - // Last chunk was received. End the readable side. - debug(`Http2Stream ${stream[kID]} [Http2Session ` + - `${sessionName(stream[kSession][kType])}]: ending readable.`); - - // defer this until we actually emit end - if (!stream.readable) { - stream[kMaybeDestroy](); - } else { - stream.on('end', stream[kMaybeDestroy]); - stream.push(null); - stream.read(0); - } -} - // Called when the remote peer settings have been updated. // Resets the cached settings. function onSettings() { @@ -2145,6 +2116,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { class ServerHttp2Stream extends Http2Stream { constructor(session, handle, id, options, headers) { super(session, options); + handle.owner = this; this[kInit](id, handle); this[kProtocol] = headers[HTTP2_HEADER_SCHEME]; this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY]; diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 9bd2dd90bc119e..5c49026e99e3fe 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -1,11 +1,14 @@ 'use strict'; const { Buffer } = require('buffer'); -const errors = require('internal/errors'); const { internalBinding } = require('internal/bootstrap/loaders'); const { WriteWrap } = internalBinding('stream_wrap'); +const { UV_EOF } = internalBinding('uv'); +const { errnoException } = require('internal/errors'); +const { owner_symbol } = require('internal/async_hooks').symbols; -const errnoException = errors.errnoException; +const kMaybeDestroy = Symbol('kMaybeDestroy'); +const kUpdateTimer = Symbol('kUpdateTimer'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -82,8 +85,54 @@ function afterWriteDispatched(self, req, err, cb) { } } +function onStreamRead(nread, buf) { + const handle = this; + const stream = this[owner_symbol]; + + stream[kUpdateTimer](); + + if (nread > 0 && !stream.destroyed) { + if (!stream.push(buf)) { + handle.reading = false; + if (!stream.destroyed) { + const err = handle.readStop(); + if (err) + stream.destroy(errnoException(err, 'read')); + } + } + + return; + } + + if (nread === 0) { + return; + } + + if (nread !== UV_EOF) { + return stream.destroy(errnoException(nread, 'read')); + } + + // defer this until we actually emit end + if (stream._readableState.endEmitted) { + if (stream[kMaybeDestroy]) + stream[kMaybeDestroy](); + } else { + if (stream[kMaybeDestroy]) + stream.on('end', stream[kMaybeDestroy]); + + // push a null to signal the end of data. + // Do it before `maybeDestroy` for correct order of events: + // `end` -> `close` + stream.push(null); + stream.read(0); + } +} + module.exports = { createWriteWrap, writevGeneric, - writeGeneric + writeGeneric, + onStreamRead, + kMaybeDestroy, + kUpdateTimer, }; diff --git a/lib/net.js b/lib/net.js index 2700a54a0780ce..9bf064d41e5fcf 100644 --- a/lib/net.js +++ b/lib/net.js @@ -37,8 +37,7 @@ const assert = require('assert'); const { internalBinding } = require('internal/bootstrap/loaders'); const { UV_EADDRINUSE, - UV_EINVAL, - UV_EOF + UV_EINVAL } = internalBinding('uv'); const { Buffer } = require('buffer'); @@ -62,7 +61,9 @@ const { const { createWriteWrap, writevGeneric, - writeGeneric + writeGeneric, + onStreamRead, + kUpdateTimer } = require('internal/stream_base_commons'); const errors = require('internal/errors'); const { @@ -209,7 +210,7 @@ function initSocketHandle(self) { // Handle creation may be deferred to bind() or connect() time. if (self._handle) { self._handle[owner_symbol] = self; - self._handle.onread = onread; + self._handle.onread = onStreamRead; self[async_id_symbol] = getNewAsyncId(self._handle); } } @@ -515,6 +516,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', { } }); +Object.defineProperty(Socket.prototype, kUpdateTimer, { + get: function() { + return this._unrefTimer; + } +}); + // Just call handle.readStart until we have enough in the buffer Socket.prototype._read = function(n) { @@ -616,61 +623,6 @@ Socket.prototype._destroy = function(exception, cb) { } }; - -// This function is called whenever the handle gets a -// buffer, or when there's an error reading. -function onread(nread, buffer) { - var handle = this; - var self = handle[owner_symbol]; - assert(handle === self._handle, 'handle != self._handle'); - - self._unrefTimer(); - - debug('onread', nread); - - if (nread > 0) { - debug('got data'); - - // read success. - // In theory (and in practice) calling readStop right now - // will prevent this from being called again until _read() gets - // called again. - - // Optimization: emit the original buffer with end points - var ret = self.push(buffer); - - if (handle.reading && !ret) { - handle.reading = false; - debug('readStop'); - var err = handle.readStop(); - if (err) - self.destroy(errnoException(err, 'read')); - } - return; - } - - // if we didn't get any bytes, that doesn't necessarily mean EOF. - // wait for the next one. - if (nread === 0) { - debug('not any data, keep waiting'); - return; - } - - // Error, possibly EOF. - if (nread !== UV_EOF) { - return self.destroy(errnoException(nread, 'read')); - } - - debug('EOF'); - - // push a null to signal the end of data. - // Do it before `maybeDestroy` for correct order of events: - // `end` -> `close` - self.push(null); - self.read(0); -} - - Socket.prototype._getpeername = function() { if (!this._peername) { if (!this._handle || !this._handle.getpeername) {