diff --git a/src/channel.js b/src/channel.js index 3273704..dd68db2 100644 --- a/src/channel.js +++ b/src/channel.js @@ -11,7 +11,20 @@ const debug = require('debug') const log = debug('pull-plex:chan') log.err = debug('pull-plex:chan:err') +/** + * @fires Channel#close + * @fires Channel#error + */ class Channel extends EE { + /** + * @constructor + * @param {Object} opts + * @param {number} opts.id + * @param {boolean} opts.initiator + * @param {string} opts.name + * @param {boolean} opts.open + * @param {Mplex} opts.plex + */ constructor (opts) { super() @@ -38,19 +51,6 @@ class Channel extends EE { ? Types.OUT_RESET : Types.IN_RESET - this._log = (name, data) => { - if (!debug.enabled) return - log({ - op: name, - name: this._name, - id: this._id, - endedLocal: this._endedLocal, - endedRemote: this._endedRemote, - initiator: this._initiator, - data: (data && data.toString()) || '' - }) - } - this._log('new channel', this._name) this._msgs = pushable((err) => { @@ -118,12 +118,38 @@ class Channel extends EE { return this._endedRemote && this._endedLocal } + /** + * A convenience wrapper for the log that adds useful metadata to logs + * @private + * @param {string} name The name of the operation being logged + * @param {Buffer|string} data Logged with the metadata. Must be `.toString` capable. Default: `''` + */ + _log (name, data) { + if (!debug.enabled) return + log({ + op: name, + name: this._name, + id: this._id, + endedLocal: this._endedLocal, + endedRemote: this._endedRemote, + initiator: this._initiator, + data: (data && data.toString()) || '' + }) + } + + /** + * Pushes `data` into the channel + * @param {Buffer} data + */ push (data) { this._log('push') this._msgs.push(data) } - // close for reading + /** + * Closes the channel for writing + * @param {Error} err + */ close (err) { this._log('close', err) if (!this._endedRemote) { @@ -134,12 +160,21 @@ class Channel extends EE { } } + /** + * Closes the channel with the given error + * @param {Error} err Default: `'channel reset!'` + */ reset (err) { this._log('reset', err) this._reset = err || 'channel reset!' this.close(this._reset) } + /** + * Opens the channel if it's not already open. Attempting + * to open an already opened channel is ignored. + * @param {string} name + */ openChan (name) { if (this.open) { return } // chan already open @@ -151,6 +186,12 @@ class Channel extends EE { ]) } + /** + * Pushes `data` wrapped in a `Message` into the channel. + * If the channel is not open, it will be opened automatically. + * + * @param {Buffer} data + */ sendMsg (data) { this._log('sendMsg') @@ -165,6 +206,10 @@ class Channel extends EE { ]) } + /** + * Ends the channel by sending an END `Message`. + * If the channel is not open, no action will be taken. + */ endChan () { this._log('endChan') @@ -178,6 +223,10 @@ class Channel extends EE { ]) } + /** + * Resets the channel by sending a RESET `Message`. + * If the channel is not open, no action will be taken. + */ resetChan () { this._log('resetChan') diff --git a/src/coder.js b/src/coder.js index 297015f..2f098a1 100644 --- a/src/coder.js +++ b/src/coder.js @@ -8,9 +8,17 @@ const debug = require('debug') const log = debug('pull-plex:coder') log.err = debug('pull-plex:coder:err') +const { MAX_MSG_SIZE } = require('./consts') + const PULL_LENGTH = 10 * 1024 const empty = Buffer.alloc(0) +/** + * Creates a Through PullStream that will varint encode all + * messages passed through it. + * + * @returns {PullStream} A through stream that varint encodes all messages + */ exports.encode = () => { let pool = Buffer.alloc(PULL_LENGTH) let used = 0 @@ -32,11 +40,24 @@ exports.encode = () => { }) } +/** + * @typedef {number} States + */ + +/** + * @enum {States} + */ const States = { PARSING: 0, READING: 1 } +/** + * Creates a Through PullStream that will varint decodes all + * messages passed through it. + * + * @returns {PullStream} A through stream that varint decodes all messages + */ exports.decode = () => { let state = States.PARSING @@ -57,6 +78,7 @@ exports.decode = () => { const decode = (msg) => { const [h, offset, length] = tryDecode(msg) + // If there is a header, process it if (h !== void 0) { const message = { id: h >> 3, @@ -68,15 +90,18 @@ exports.decode = () => { return [msg.slice(offset), message, length] } + // There was no header, return the message return [msg] } const read = (msg, data, length) => { + // If we're done reading, start parsing the message if (length <= 0) { state = States.PARSING return [0, msg, data] } + // Read more data let left = length - msg.length if (left < 0) { left = 0 } const size = length - left @@ -84,6 +109,8 @@ exports.decode = () => { const buff = Buffer.isBuffer(msg) ? msg : Buffer.from(msg) data.push(buff.slice(0, size)) } + + // If we finished reading, start parsing if (left <= 0) { state = States.PARSING } return [left, msg.slice(size), data] } @@ -93,9 +120,10 @@ exports.decode = () => { let marker = 0 let message = null let accumulating = false - let buffer = Buffer.alloc(1 << 20) + let buffer = Buffer.alloc(MAX_MSG_SIZE) return through(function (msg) { while (msg && msg.length) { + // Reading is done for this message, start processing it if (States.PARSING === state) { if (accumulating) { used += msg.copy(buffer, used) @@ -117,8 +145,11 @@ exports.decode = () => { accumulating = false } + // We're not done reading the message, keep reading it if (States.READING === state) { [length, msg, message.data] = read(msg, message.data, length) + + // If we read the whole message, add it to the queue if (length <= 0 && States.PARSING === state) { message.data = message.data.length ? message.data.length === 1 diff --git a/src/consts.js b/src/consts.js index b04c423..1ce0877 100644 --- a/src/consts.js +++ b/src/consts.js @@ -4,7 +4,7 @@ * @typedef {number} MessageType */ - /** +/** * @enum {MessageType} */ exports.Types = { @@ -16,3 +16,5 @@ exports.Types = { IN_RESET: 5, OUT_RESET: 6 } + +exports.MAX_MSG_SIZE = 1 << 20 // 1MB diff --git a/src/index.js b/src/index.js index 2d7bcfd..607cdf3 100644 --- a/src/index.js +++ b/src/index.js @@ -8,7 +8,7 @@ const looper = require('looper') const EE = require('events') const Channel = require('./channel') -const { Types } = require('./consts') +const { Types, MAX_MSG_SIZE } = require('./consts') const coder = require('./coder') const debug = require('debug') @@ -16,8 +16,6 @@ const debug = require('debug') const log = debug('pull-plex') log.err = debug('pull-plex:err') -const MAX_MSG_SIZE = 1 << 20 // 1mb - /** * @typedef {Object} Message * @property {number} id @@ -33,7 +31,7 @@ const MAX_MSG_SIZE = 1 << 20 // 1mb class Mplex extends EE { /** * @constructor - * @param {object} opts + * @param {Object} opts * @param {boolean} opts.initiator Is this starting the stream. Default: `true` * @param {function(Channel, number)} opts.onChan A handler for new streams. Can be used instead of `.on('stream')` * @param {number} opts.maxChannels Maximum number of channels to have open. Default: `10000`