Skip to content

Commit

Permalink
docs: add more js docs
Browse files Browse the repository at this point in the history
refactor: clean up code for clarity
  • Loading branch information
jacobheun committed Feb 7, 2019
1 parent 5c327ae commit c13aa6a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 20 deletions.
77 changes: 63 additions & 14 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,20 @@ const debug = require('debug')
const log = debug('pull-plex:chan')
log.err = debug('pull-plex:chan:err')

/**
* @fires Channel#close
* @fires Channel#error
*/
class Channel extends EE {
/**
* @constructor
* @param {Object} opts
* @param {number} opts.id
* @param {boolean} opts.initiator
* @param {string} opts.name
* @param {boolean} opts.open
* @param {Mplex} opts.plex
*/
constructor (opts) {
super()

Expand All @@ -38,19 +51,6 @@ class Channel extends EE {
? Types.OUT_RESET
: Types.IN_RESET

this._log = (name, data) => {
if (!debug.enabled) return
log({
op: name,
name: this._name,
id: this._id,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
initiator: this._initiator,
data: (data && data.toString()) || ''
})
}

this._log('new channel', this._name)

this._msgs = pushable((err) => {
Expand Down Expand Up @@ -118,12 +118,38 @@ class Channel extends EE {
return this._endedRemote && this._endedLocal
}

/**
* A convenience wrapper for the log that adds useful metadata to logs
* @private
* @param {string} name The name of the operation being logged
* @param {Buffer|string} data Logged with the metadata. Must be `.toString` capable. Default: `''`
*/
_log (name, data) {
if (!debug.enabled) return
log({
op: name,
name: this._name,
id: this._id,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
initiator: this._initiator,
data: (data && data.toString()) || ''
})
}

/**
* Pushes `data` into the channel
* @param {Buffer} data
*/
push (data) {
this._log('push')
this._msgs.push(data)
}

// close for reading
/**
* Closes the channel for writing
* @param {Error} err
*/
close (err) {
this._log('close', err)
if (!this._endedRemote) {
Expand All @@ -134,12 +160,21 @@ class Channel extends EE {
}
}

/**
* Closes the channel with the given error
* @param {Error} err Default: `'channel reset!'`
*/
reset (err) {
this._log('reset', err)
this._reset = err || 'channel reset!'
this.close(this._reset)
}

/**
* Opens the channel if it's not already open. Attempting
* to open an already opened channel is ignored.
* @param {string} name
*/
openChan (name) {
if (this.open) { return } // chan already open

Expand All @@ -151,6 +186,12 @@ class Channel extends EE {
])
}

/**
* Pushes `data` wrapped in a `Message` into the channel.
* If the channel is not open, it will be opened automatically.
*
* @param {Buffer} data
*/
sendMsg (data) {
this._log('sendMsg')

Expand All @@ -165,6 +206,10 @@ class Channel extends EE {
])
}

/**
* Ends the channel by sending an END `Message`.
* If the channel is not open, no action will be taken.
*/
endChan () {
this._log('endChan')

Expand All @@ -178,6 +223,10 @@ class Channel extends EE {
])
}

/**
* Resets the channel by sending a RESET `Message`.
* If the channel is not open, no action will be taken.
*/
resetChan () {
this._log('resetChan')

Expand Down
33 changes: 32 additions & 1 deletion src/coder.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@ const debug = require('debug')
const log = debug('pull-plex:coder')
log.err = debug('pull-plex:coder:err')

const { MAX_MSG_SIZE } = require('./consts')

const PULL_LENGTH = 10 * 1024
const empty = Buffer.alloc(0)

/**
* Creates a Through PullStream that will varint encode all
* messages passed through it.
*
* @returns {PullStream} A through stream that varint encodes all messages
*/
exports.encode = () => {
let pool = Buffer.alloc(PULL_LENGTH)
let used = 0
Expand All @@ -32,11 +40,24 @@ exports.encode = () => {
})
}

/**
* @typedef {number} States
*/

/**
* @enum {States}
*/
const States = {
PARSING: 0,
READING: 1
}

/**
* Creates a Through PullStream that will varint decodes all
* messages passed through it.
*
* @returns {PullStream} A through stream that varint decodes all messages
*/
exports.decode = () => {
let state = States.PARSING

Expand All @@ -57,6 +78,7 @@ exports.decode = () => {

const decode = (msg) => {
const [h, offset, length] = tryDecode(msg)
// If there is a header, process it
if (h !== void 0) {
const message = {
id: h >> 3,
Expand All @@ -68,22 +90,27 @@ exports.decode = () => {
return [msg.slice(offset), message, length]
}

// There was no header, return the message
return [msg]
}

const read = (msg, data, length) => {
// If we're done reading, start parsing the message
if (length <= 0) {
state = States.PARSING
return [0, msg, data]
}

// Read more data
let left = length - msg.length
if (left < 0) { left = 0 }
const size = length - left
if (msg.length > 0) {
const buff = Buffer.isBuffer(msg) ? msg : Buffer.from(msg)
data.push(buff.slice(0, size))
}

// If we finished reading, start parsing
if (left <= 0) { state = States.PARSING }
return [left, msg.slice(size), data]
}
Expand All @@ -93,9 +120,10 @@ exports.decode = () => {
let marker = 0
let message = null
let accumulating = false
let buffer = Buffer.alloc(1 << 20)
let buffer = Buffer.alloc(MAX_MSG_SIZE)
return through(function (msg) {
while (msg && msg.length) {
// Reading is done for this message, start processing it
if (States.PARSING === state) {
if (accumulating) {
used += msg.copy(buffer, used)
Expand All @@ -117,8 +145,11 @@ exports.decode = () => {
accumulating = false
}

// We're not done reading the message, keep reading it
if (States.READING === state) {
[length, msg, message.data] = read(msg, message.data, length)

// If we read the whole message, add it to the queue
if (length <= 0 && States.PARSING === state) {
message.data = message.data.length
? message.data.length === 1
Expand Down
4 changes: 3 additions & 1 deletion src/consts.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* @typedef {number} MessageType
*/

/**
/**
* @enum {MessageType}
*/
exports.Types = {
Expand All @@ -16,3 +16,5 @@ exports.Types = {
IN_RESET: 5,
OUT_RESET: 6
}

exports.MAX_MSG_SIZE = 1 << 20 // 1MB
6 changes: 2 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ const looper = require('looper')
const EE = require('events')

const Channel = require('./channel')
const { Types } = require('./consts')
const { Types, MAX_MSG_SIZE } = require('./consts')
const coder = require('./coder')

const debug = require('debug')

const log = debug('pull-plex')
log.err = debug('pull-plex:err')

const MAX_MSG_SIZE = 1 << 20 // 1mb

/**
* @typedef {Object} Message
* @property {number} id
Expand All @@ -33,7 +31,7 @@ const MAX_MSG_SIZE = 1 << 20 // 1mb
class Mplex extends EE {
/**
* @constructor
* @param {object} opts
* @param {Object} opts
* @param {boolean} opts.initiator Is this starting the stream. Default: `true`
* @param {function(Channel, number)} opts.onChan A handler for new streams. Can be used instead of `.on('stream')`
* @param {number} opts.maxChannels Maximum number of channels to have open. Default: `10000`
Expand Down

0 comments on commit c13aa6a

Please sign in to comment.