Skip to content

Commit

Permalink
net: allow reading data into a static buffer
Browse files Browse the repository at this point in the history
Co-Authored-By: Anna Henningsen <anna@addaleax.net>

PR-URL: #25436
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information
mscdex committed Aug 23, 2019
1 parent 9d21b03 commit 8292b28
Show file tree
Hide file tree
Showing 7 changed files with 474 additions and 65 deletions.
57 changes: 46 additions & 11 deletions benchmark/net/net-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,84 @@ const common = require('../common.js');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [64, 102400, 1024 * 1024 * 16],
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
type: ['utf', 'asc', 'buf'],
recvbuflen: [0, 64 * 1024, 1024 * 1024],
recvbufgenfn: ['true', 'false'],
dur: [5]
});

var chunk;
var encoding;
var recvbuf;
var received = 0;

function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
if (isFinite(recvbuflen) && recvbuflen > 0)
recvbuf = Buffer.alloc(recvbuflen);

function main({ dur, len, type }) {
switch (type) {
case 'buf':
chunk = Buffer.alloc(len, 'x');
chunk = Buffer.alloc(sendchunklen, 'x');
break;
case 'utf':
encoding = 'utf8';
chunk = 'ü'.repeat(len / 2);
chunk = 'ü'.repeat(sendchunklen / 2);
break;
case 'asc':
encoding = 'ascii';
chunk = 'x'.repeat(len);
chunk = 'x'.repeat(sendchunklen);
break;
default:
throw new Error(`invalid type: ${type}`);
}

const reader = new Reader();
const writer = new Writer();
var writer;
var socketOpts;
if (recvbuf === undefined) {
writer = new Writer();
socketOpts = { port: PORT };
} else {
let buffer = recvbuf;
if (recvbufgenfn === 'true') {
let bufidx = -1;
const bufpool = [
recvbuf,
Buffer.from(recvbuf),
Buffer.from(recvbuf),
];
buffer = () => {
bufidx = (bufidx + 1) % bufpool.length;
return bufpool[bufidx];
};
}
socketOpts = {
port: PORT,
onread: {
buffer,
callback: function(nread, buf) {
received += nread;
}
}
};
}

// The actual benchmark.
const server = net.createServer((socket) => {
reader.pipe(socket);
});

server.listen(PORT, () => {
const socket = net.connect(PORT);
const socket = net.connect(socketOpts);
socket.on('connect', () => {
bench.start();

socket.pipe(writer);
if (recvbuf === undefined)
socket.pipe(writer);

setTimeout(() => {
const bytes = writer.received;
const bytes = received;
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
process.exit(0);
Expand All @@ -58,12 +94,11 @@ function main({ dur, len, type }) {
const net = require('net');

function Writer() {
this.received = 0;
this.writable = true;
}

Writer.prototype.write = function(chunk, encoding, cb) {
this.received += chunk.length;
received += chunk.length;

if (typeof encoding === 'function')
encoding();
Expand Down
36 changes: 36 additions & 0 deletions doc/api/net.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
<!-- YAML
added: v0.1.90
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/25436
description: Added `onread` option.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6021
description: The `hints` option defaults to `0` in all cases now.
Expand Down Expand Up @@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
See [Identifying paths for IPC connections][]. If provided, the TCP-specific
options above are ignored.

For both types, available `options` include:

* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
and passed to the supplied `callback` when data arrives on the socket.
Note: this will cause the streaming functionality to not provide any data,
however events like `'error'`, `'end'`, and `'close'` will still be emitted
as normal and methods like `pause()` and `resume()` will also behave as
expected.
* `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
use for storing incoming data or a function that returns such.
* `callback` {Function} This function is called for every chunk of incoming
data. Two arguments are passed to it: the number of bytes written to
`buffer` and a reference to `buffer`. Return `false` from this function to
implicitly `pause()` the socket. This function will be executed in the
global context.

Following is an example of a client using the `onread` option:

```js
const net = require('net');
net.connect({
port: 80,
onread: {
// Reuses a 4KiB Buffer for every read from the socket
buffer: Buffer.alloc(4 * 1024),
callback: function(nread, buf) {
// Received data is available in `buf` from 0 to `nread`
console.log(buf.toString('utf8', 0, nread));
}
}
});
```

#### socket.connect(path[, connectListener])

* `path` {string} Path the client should connect to. See
Expand Down
31 changes: 26 additions & 5 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
setUnrefTimeout,
getTimerDuration
} = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');

const kMaybeDestroy = Symbol('kMaybeDestroy');
Expand All @@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');

const debug = require('internal/util/debuglog').debuglog('stream');
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');

function handleWriteReq(req, data, encoding) {
const { handle } = req;
Expand Down Expand Up @@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
stream[kUpdateTimer]();

if (nread > 0 && !stream.destroyed) {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
if (!stream.push(buf)) {
let ret;
let result;
const userBuf = stream[kBuffer];
if (userBuf) {
result = (stream[kBufferCb](nread, userBuf) !== false);
const bufGen = stream[kBufferGen];
if (bufGen !== null) {
const nextBuf = bufGen();
if (isUint8Array(nextBuf))
stream[kBuffer] = ret = nextBuf;
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
result = stream.push(buf);
}
if (!result) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
Expand All @@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
}
}

return;
return ret;
}

if (nread === 0) {
Expand Down Expand Up @@ -241,5 +259,8 @@ module.exports = {
kUpdateTimer,
kHandle,
kSession,
setStreamTimeout
setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
};
Loading

0 comments on commit 8292b28

Please sign in to comment.