Skip to content

Commit

Permalink
lib: merge onread handlers for http2 streams & net.Socket
Browse files Browse the repository at this point in the history
Refs: #20993
Co-authored-by: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
aks- and addaleax committed Aug 27, 2018
1 parent e03f9c5 commit 9c1443b
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 95 deletions.
38 changes: 5 additions & 33 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ const {
const {
createWriteWrap,
writeGeneric,
writevGeneric
writevGeneric,
onStreamRead,
kMaybeDestroy,
kUpdateTimer
} = require('internal/stream_base_commons');
const {
kTimeout,
Expand Down Expand Up @@ -142,7 +145,6 @@ const kHandle = Symbol('handle');
const kID = Symbol('id');
const kInit = Symbol('init');
const kInfoHeaders = Symbol('sent-info-headers');
const kMaybeDestroy = Symbol('maybe-destroy');
const kLocalSettings = Symbol('local-settings');
const kOptions = Symbol('options');
const kOwner = owner_symbol;
Expand All @@ -156,7 +158,6 @@ const kServer = Symbol('server');
const kSession = Symbol('session');
const kState = Symbol('state');
const kType = Symbol('type');
const kUpdateTimer = Symbol('update-timer');
const kWriteGeneric = Symbol('write-generic');

const kDefaultSocketTimeout = 2 * 60 * 1000;
Expand Down Expand Up @@ -374,36 +375,6 @@ function onStreamClose(code) {
}
}

// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf) {
const stream = this[kOwner];
if (nread >= 0 && !stream.destroyed) {
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
`of size ${nread}`);
stream[kUpdateTimer]();
if (!stream.push(buf)) {
if (!stream.destroyed) // we have to check a second time
this.readStop();
}
return;
}

// Last chunk was received. End the readable side.
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
stream.push(null);
stream.read(0);
}
}

// Called when the remote peer settings have been updated.
// Resets the cached settings.
function onSettings() {
Expand Down Expand Up @@ -2145,6 +2116,7 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
class ServerHttp2Stream extends Http2Stream {
constructor(session, handle, id, options, headers) {
super(session, options);
handle.owner = this;
this[kInit](id, handle);
this[kProtocol] = headers[HTTP2_HEADER_SCHEME];
this[kAuthority] = headers[HTTP2_HEADER_AUTHORITY];
Expand Down
55 changes: 52 additions & 3 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
'use strict';

const { Buffer } = require('buffer');
const errors = require('internal/errors');
const { internalBinding } = require('internal/bootstrap/loaders');
const { WriteWrap } = internalBinding('stream_wrap');
const { UV_EOF } = process.binding('uv');
const { errnoException } = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;

const errnoException = errors.errnoException;
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
Expand Down Expand Up @@ -82,8 +85,54 @@ function afterWriteDispatched(self, req, err, cb) {
}
}

function onStreamRead(nread, buf) {
const handle = this;
const stream = this[owner_symbol];

stream[kUpdateTimer]();

if (nread > 0 && !stream.destroyed) {
if (!stream.push(buf)) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
if (err)
stream.destroy(errnoException(err, 'read'));
}
}

return;
}

if (nread === 0) {
return;
}

if (nread !== UV_EOF) {
return stream.destroy(errnoException(nread, 'read'));
}

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (stream[kMaybeDestroy])
stream[kMaybeDestroy]();
} else {
if (stream[kMaybeDestroy])
stream.on('end', stream[kMaybeDestroy]);

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
stream.push(null);
stream.read(0);
}
}

module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
onStreamRead,
kMaybeDestroy,
kUpdateTimer,
};
70 changes: 11 additions & 59 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ const assert = require('assert');
const { internalBinding } = require('internal/bootstrap/loaders');
const {
UV_EADDRINUSE,
UV_EINVAL,
UV_EOF
UV_EINVAL
} = internalBinding('uv');

const { Buffer } = require('buffer');
Expand All @@ -62,7 +61,9 @@ const {
const {
createWriteWrap,
writevGeneric,
writeGeneric
writeGeneric,
onStreamRead,
kUpdateTimer
} = require('internal/stream_base_commons');
const errors = require('internal/errors');
const {
Expand Down Expand Up @@ -209,7 +210,7 @@ function initSocketHandle(self) {
// Handle creation may be deferred to bind() or connect() time.
if (self._handle) {
self._handle[owner_symbol] = self;
self._handle.onread = onread;
self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle);
}
}
Expand Down Expand Up @@ -515,6 +516,12 @@ Object.defineProperty(Socket.prototype, 'bufferSize', {
}
});

Object.defineProperty(Socket.prototype, kUpdateTimer, {
get: function() {
return this._unrefTimer;
}
});


// Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) {
Expand Down Expand Up @@ -616,61 +623,6 @@ Socket.prototype._destroy = function(exception, cb) {
}
};


// This function is called whenever the handle gets a
// buffer, or when there's an error reading.
function onread(nread, buffer) {
var handle = this;
var self = handle[owner_symbol];
assert(handle === self._handle, 'handle != self._handle');

self._unrefTimer();

debug('onread', nread);

if (nread > 0) {
debug('got data');

// read success.
// In theory (and in practice) calling readStop right now
// will prevent this from being called again until _read() gets
// called again.

// Optimization: emit the original buffer with end points
var ret = self.push(buffer);

if (handle.reading && !ret) {
handle.reading = false;
debug('readStop');
var err = handle.readStop();
if (err)
self.destroy(errnoException(err, 'read'));
}
return;
}

// if we didn't get any bytes, that doesn't necessarily mean EOF.
// wait for the next one.
if (nread === 0) {
debug('not any data, keep waiting');
return;
}

// Error, possibly EOF.
if (nread !== UV_EOF) {
return self.destroy(errnoException(nread, 'read'));
}

debug('EOF');

// push a null to signal the end of data.
// Do it before `maybeDestroy` for correct order of events:
// `end` -> `close`
self.push(null);
self.read(0);
}


Socket.prototype._getpeername = function() {
if (!this._peername) {
if (!this._handle || !this._handle.getpeername) {
Expand Down

0 comments on commit 9c1443b

Please sign in to comment.