Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

Commit

Permalink
quic: implement sendFD() support
Browse files Browse the repository at this point in the history
Fixes: #75
  • Loading branch information
addaleax committed Oct 3, 2019
1 parent eb88db6 commit 874fa0b
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 3 deletions.
62 changes: 59 additions & 3 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const {
validateQuicClientSessionOptions,
validateQuicSocketOptions,
} = require('internal/quic/util');
const { validateNumber } = require('internal/validators');
const util = require('util');
const assert = require('internal/assert');
const EventEmitter = require('events');
Expand All @@ -32,7 +33,7 @@ const {
translatePeerCertificate
} = require('_tls_common');
const {
defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars
defaultTriggerAsyncIdScope,
symbols: {
async_id_symbol,
owner_symbol,
Expand All @@ -52,8 +53,8 @@ const {

const {
ShutdownWrap,
kReadBytesOrError, // eslint-disable-line no-unused-vars
streamBaseState // eslint-disable-line no-unused-vars
kReadBytesOrError,
streamBaseState
} = internalBinding('stream_wrap');

const {
Expand All @@ -78,6 +79,10 @@ const {
exceptionWithHostPort
} = require('internal/errors');

const { FileHandle } = internalBinding('fs');
const { StreamPipe } = internalBinding('stream_pipe');
const { UV_EOF } = internalBinding('uv');

const {
QuicSocket: QuicSocketHandle,
initSecureContext,
Expand Down Expand Up @@ -153,6 +158,8 @@ const kHandshakePost = Symbol('kHandshakePost');
const kInit = Symbol('kInit');
const kMaybeBind = Symbol('kMaybeBind');
const kMaybeReady = Symbol('kMaybeReady');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
const kReady = Symbol('kReady');
const kReceiveStart = Symbol('kReceiveStart');
const kReceiveStop = Symbol('kReceiveStop');
Expand All @@ -161,6 +168,7 @@ const kRemoveStream = Symbol('kRemoveStream');
const kServerBusy = Symbol('kServerBusy');
const kSetHandle = Symbol('kSetHandle');
const kSetSocket = Symbol('kSetSocket');
const kStartFilePipe = Symbol('kStartFilePipe');
const kStreamClose = Symbol('kStreamClose');
const kStreamReset = Symbol('kStreamReset');
const kTrackWriteState = Symbol('kTrackWriteState');
Expand Down Expand Up @@ -2253,6 +2261,54 @@ class QuicStream extends Duplex {
streamOnResume.call(this);
}

sendFD(fd, { offset = -1, length = -1 } = {}) {
if (this.destroyed || this.#closed)
return;

validateNumber(fd, 'fd');
this[kUpdateTimer]();
this.ownsFd = false;

// Close the writable side of the stream, but only as far as the writable
// stream implementation is concerned.
this._final = null;
this.end();

defaultTriggerAsyncIdScope(this[async_id_symbol],
QuicStream[kStartFilePipe],
this, fd, offset, length);
}

static [kStartFilePipe](stream, fd, offset, length) {
const handle = new FileHandle(fd, offset, length);
handle.onread = QuicStream[kOnPipedFileHandleRead];
handle.stream = stream;

const pipe = new StreamPipe(handle, stream[kHandle]);
pipe.onunpipe = QuicStream[kOnFileUnpipe];
pipe.start();

// Exact length of the file doesn't matter here, since the
// stream is closing anyway - just use 1 to signify that
// a write does exist
stream[kTrackWriteState](stream, 1);
}

static [kOnFileUnpipe]() { // Called on the StreamPipe instance.
const stream = this.sink[owner_symbol];
if (stream.ownsFd)
this.source.close().catch((err) => stream.emit(err));
else
this.source.releaseFD();
}

static [kOnPipedFileHandleRead]() {
const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) {
this.stream.destroy(errnoException(err, 'sendFD'));
}
}

get resetReceived() {
return (this.#resetCode !== undefined) ?
{ code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } :
Expand Down
70 changes: 70 additions & 0 deletions test/parallel/test-quic-send-fd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
'use strict';
const common = require('../common');
if (!common.hasQuic)
common.skip('missing quic');

const assert = require('assert');
const quic = require('quic');
const fs = require('fs');

const fixtures = require('../common/fixtures');
const key = fixtures.readKey('agent1-key.pem', 'binary');
const cert = fixtures.readKey('agent1-cert.pem', 'binary');
const ca = fixtures.readKey('ca1-cert.pem', 'binary');

const server = quic.createSocket({ port: 0, validateAddress: true });

server.listen({
key,
cert,
ca,
rejectUnauthorized: false,
maxCryptoBuffer: 4096,
alpn: 'meow'
});

server.on('session', common.mustCall((session) => {
session.on('secure', common.mustCall((servername, alpn, cipher) => {
const stream = session.openStream({ halfOpen: false });
stream.sendFD(fs.openSync(__filename, 'r'));
stream.on('data', common.mustNotCall());
stream.on('finish', common.mustNotCall());
stream.on('close', common.mustCall());
stream.on('end', common.mustNotCall());
}));

session.on('close', common.mustCall());
}));

server.on('ready', common.mustCall(() => {
const client = quic.createSocket({
port: 0,
client: {
key,
cert,
ca,
alpn: 'meow'
}
});

const req = client.connect({
address: 'localhost',
port: server.address.port
});

req.on('stream', common.mustCall((stream) => {
const data = [];
stream.on('data', (chunk) => data.push(chunk));
stream.on('end', common.mustCall(() => {
assert.deepStrictEqual(Buffer.concat(data), fs.readFileSync(__filename));

// TODO(addaleax): Figure out why .close() is insufficient.
client.destroy();
server.destroy();
}));
}));

req.on('close', common.mustCall());
}));

server.on('close', common.mustCall());

0 comments on commit 874fa0b

Please sign in to comment.