diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index 144a2bb3a4..221e75b8d8 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -21,6 +21,7 @@ const { validateQuicClientSessionOptions, validateQuicSocketOptions, } = require('internal/quic/util'); +const { validateNumber } = require('internal/validators'); const util = require('util'); const assert = require('internal/assert'); const EventEmitter = require('events'); @@ -32,7 +33,7 @@ const { translatePeerCertificate } = require('_tls_common'); const { - defaultTriggerAsyncIdScope, // eslint-disable-line no-unused-vars + defaultTriggerAsyncIdScope, symbols: { async_id_symbol, owner_symbol, @@ -52,8 +53,8 @@ const { const { ShutdownWrap, - kReadBytesOrError, // eslint-disable-line no-unused-vars - streamBaseState // eslint-disable-line no-unused-vars + kReadBytesOrError, + streamBaseState } = internalBinding('stream_wrap'); const { @@ -78,6 +79,10 @@ const { exceptionWithHostPort } = require('internal/errors'); +const { FileHandle } = internalBinding('fs'); +const { StreamPipe } = internalBinding('stream_pipe'); +const { UV_EOF } = internalBinding('uv'); + const { QuicSocket: QuicSocketHandle, initSecureContext, @@ -153,6 +158,8 @@ const kHandshakePost = Symbol('kHandshakePost'); const kInit = Symbol('kInit'); const kMaybeBind = Symbol('kMaybeBind'); const kMaybeReady = Symbol('kMaybeReady'); +const kOnFileUnpipe = Symbol('kOnFileUnpipe'); +const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead'); const kReady = Symbol('kReady'); const kReceiveStart = Symbol('kReceiveStart'); const kReceiveStop = Symbol('kReceiveStop'); @@ -161,6 +168,7 @@ const kRemoveStream = Symbol('kRemoveStream'); const kServerBusy = Symbol('kServerBusy'); const kSetHandle = Symbol('kSetHandle'); const kSetSocket = Symbol('kSetSocket'); +const kStartFilePipe = Symbol('kStartFilePipe'); const kStreamClose = Symbol('kStreamClose'); const kStreamReset = Symbol('kStreamReset'); const kTrackWriteState = Symbol('kTrackWriteState'); @@ -2253,6 +2261,54 @@ class QuicStream extends Duplex { streamOnResume.call(this); } + sendFD(fd, { offset = -1, length = -1 } = {}) { + if (this.destroyed || this.#closed) + return; + + validateNumber(fd, 'fd'); + this[kUpdateTimer](); + this.ownsFd = false; + + // Close the writable side of the stream, but only as far as the writable + // stream implementation is concerned. + this._final = null; + this.end(); + + defaultTriggerAsyncIdScope(this[async_id_symbol], + QuicStream[kStartFilePipe], + this, fd, offset, length); + } + + static [kStartFilePipe](stream, fd, offset, length) { + const handle = new FileHandle(fd, offset, length); + handle.onread = QuicStream[kOnPipedFileHandleRead]; + handle.stream = stream; + + const pipe = new StreamPipe(handle, stream[kHandle]); + pipe.onunpipe = QuicStream[kOnFileUnpipe]; + pipe.start(); + + // Exact length of the file doesn't matter here, since the + // stream is closing anyway - just use 1 to signify that + // a write does exist + stream[kTrackWriteState](stream, 1); + } + + static [kOnFileUnpipe]() { // Called on the StreamPipe instance. + const stream = this.sink[owner_symbol]; + if (stream.ownsFd) + this.source.close().catch((err) => stream.emit(err)); + else + this.source.releaseFD(); + } + + static [kOnPipedFileHandleRead]() { + const err = streamBaseState[kReadBytesOrError]; + if (err < 0 && err !== UV_EOF) { + this.stream.destroy(errnoException(err, 'sendFD')); + } + } + get resetReceived() { return (this.#resetCode !== undefined) ? { code: this.#resetCode | 0, finalSize: this.#resetFinalSize | 0 } : diff --git a/test/parallel/test-quic-send-fd.js b/test/parallel/test-quic-send-fd.js new file mode 100644 index 0000000000..3113711600 --- /dev/null +++ b/test/parallel/test-quic-send-fd.js @@ -0,0 +1,70 @@ +'use strict'; +const common = require('../common'); +if (!common.hasQuic) + common.skip('missing quic'); + +const assert = require('assert'); +const quic = require('quic'); +const fs = require('fs'); + +const fixtures = require('../common/fixtures'); +const key = fixtures.readKey('agent1-key.pem', 'binary'); +const cert = fixtures.readKey('agent1-cert.pem', 'binary'); +const ca = fixtures.readKey('ca1-cert.pem', 'binary'); + +const server = quic.createSocket({ port: 0, validateAddress: true }); + +server.listen({ + key, + cert, + ca, + rejectUnauthorized: false, + maxCryptoBuffer: 4096, + alpn: 'meow' +}); + +server.on('session', common.mustCall((session) => { + session.on('secure', common.mustCall((servername, alpn, cipher) => { + const stream = session.openStream({ halfOpen: false }); + stream.sendFD(fs.openSync(__filename, 'r')); + stream.on('data', common.mustNotCall()); + stream.on('finish', common.mustNotCall()); + stream.on('close', common.mustCall()); + stream.on('end', common.mustNotCall()); + })); + + session.on('close', common.mustCall()); +})); + +server.on('ready', common.mustCall(() => { + const client = quic.createSocket({ + port: 0, + client: { + key, + cert, + ca, + alpn: 'meow' + } + }); + + const req = client.connect({ + address: 'localhost', + port: server.address.port + }); + + req.on('stream', common.mustCall((stream) => { + const data = []; + stream.on('data', (chunk) => data.push(chunk)); + stream.on('end', common.mustCall(() => { + assert.deepStrictEqual(Buffer.concat(data), fs.readFileSync(__filename)); + + // TODO(addaleax): Figure out why .close() is insufficient. + client.destroy(); + server.destroy(); + })); + })); + + req.on('close', common.mustCall()); +})); + +server.on('close', common.mustCall());