Skip to content

Commit

Permalink
feat: correct channel and plex closing
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 19f0962 commit 0341fee
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 45 deletions.
2 changes: 1 addition & 1 deletion profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ function marker (n, done) {
}


spawn(1000, 1000, (err) => {
spawn(10000, 10000, (err) => {
if (err) {
throw err
}
Expand Down
14 changes: 5 additions & 9 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const EE = require('events')

const debug = require('debug')

const log = debug('pull-plex')
log.err = debug('pull-plex:err')
const log = debug('pull-plex:chan')
log.err = debug('pull-plex:chan:err')

class Channel extends EE {
constructor (id, name, plex, initiator, open) {
Expand All @@ -24,7 +24,6 @@ class Channel extends EE {

this._log = (name, data) => {
log({
src: 'channel.js',
op: name,
channel: this._name,
id: this._id,
Expand All @@ -42,7 +41,6 @@ class Channel extends EE {
if (this._reset) { return } // don't try closing the channel on reset

this.endChan()
if (err) { this.emit('error', err) }
})

this._source = this._msgs
Expand All @@ -57,9 +55,7 @@ class Channel extends EE {
this._endedLocal = end || false

// source ended, close the stream
if (end === true) {
return this.endChan()
}
if (end === true) { return this.endChan() }

// source errored, reset stream
if (end || this._reset) {
Expand Down Expand Up @@ -108,8 +104,8 @@ class Channel extends EE {
close (err) {
this._log('close', err)
this.emit('close', err)
this._endedRemote = err || true
this._msgs.end(err)
this._endedRemote = err
this._msgs.end(this._endedRemote)
}

reset (err) {
Expand Down
58 changes: 34 additions & 24 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ class Plex extends EE {
this._initiator = !!initiator
this._chanId = this._initiator ? 1 : 0
this._channels = {}
this._destroyed = false
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended

this._log = (name, data) => {
log({
src: 'index.js',
op: name,
channel: this._name,
id: this._id,
localEnded: this._endedLocal,
remoteEnded: this._endedRemote,
initiator: this._initiator,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
data: (data && data.toString()) || ''
})
}

this._chandata = pushable((err) => {
if (this._destroyed) { return }
this.destroy()
this._log('chandata ended')
this._endedRemote = true
this.close(err)
})

if (onChan) {
Expand All @@ -53,8 +52,9 @@ class Plex extends EE {
utils.decode(),
(read) => {
const next = (end, data) => {
if (end === true || this._destroyed) { return }
if (end) { return this.destroy(end) }
if (this._endedLocal) { return }
if (end === true) { return this.close() }
if (end) { return this.reset(end) }
this._handle(data)
return read(null, next)
}
Expand All @@ -67,33 +67,41 @@ class Plex extends EE {
return this._initiator
}

destroy (err) {
this._destroyed = true
close (err) {
this._log('close', err)

if (this.destroyed) { return }

if (err) {
setImmediate(() => this.emit('error', err))
}

err = err || new Error('Underlying stream has been closed')
this._chandata.end(err)
this._endedLocal = true

// propagate close to channels
Object
.keys(this._channels)
.forEach((id) => {
const chan = this._channels[id]
if (err) {
chan.reset(err)
} else {
chan.close()
}
delete this._channels[id]
chan.close(err)
})

if (err) {
setImmediate(() => this.emit('error', err))
}

this.emit('close')
}

get destroyed () {
return this._endedRemote && this._endedLocal
}

reset (err) {
err = err || new Error('Underlying stream has been closed')
this._chandata.end(err)
this.close(err)
}

push (data) {
log('push', data)
this._log('push', data)
this._chandata.push(data)
log('buffer', this._chandata.buffer)
}
Expand All @@ -107,6 +115,7 @@ class Plex extends EE {
}

_newStream (id, initiator, open, name) {
this._log('_newStream', Array.prototype.slice.call(arguments))
if (typeof initiator === 'string') {
name = initiator
initiator = false
Expand Down Expand Up @@ -138,6 +147,7 @@ class Plex extends EE {
}

_handle (msg) {
this._log('_handle', msg)
const { id, type, data } = msg
switch (type) {
case consts.type.NEW: {
Expand Down
14 changes: 13 additions & 1 deletion src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const lp = require('pull-length-prefixed')
const cat = require('pull-cat')
const through = require('pull-through')

const debug = require('debug')

const log = debug('pull-plex:utils')
log.err = debug('pull-plex:utils:err')

exports.encode = () => {
return pull(
through(function (msg) {
Expand Down Expand Up @@ -33,8 +38,15 @@ exports.decode = () => {
try {
length = varint.decode(msg, offset)
offset += varint.decode.bytes

if (length > msg.length) {
throw new Error('partial buffer, need more data')
}

data = msg.slice(offset, offset + length)
} catch (err) {} // ignore if data is empty
} catch (err) {
log.err(err)
} // ignore if data is empty

const decoded = {
id: h >> 3,
Expand Down
22 changes: 12 additions & 10 deletions test/plex.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const consts = require('../src/consts')
const series = require('async/series')

describe('plex', () => {
it.only(`destroy should close both ends`, (done) => {
it(`reset should close both ends`, (done) => {
const p = pair()

const plex1 = new Plex(true)
Expand All @@ -44,17 +44,19 @@ describe('plex', () => {
plex2.on('close', () => {
expect().mark()
})

plex1.destroy()
plex1.reset()
})

it(`channel id should be correct`, () => [1, 0].forEach((type) => {
const initiator = Boolean(type)
const plex = new Plex(initiator)
describe(`check id`, () => [true, false].forEach((initiator) => {
it(`id should be ${initiator ? 'odd' : 'even'}`, () => {
const plex = new Plex(initiator)

const times = 10
for (let i = 0; i < times; i++) {
expect(Boolean(plex._nextChanId() & 1)).to.be.eql(initiator)
}
const times = 100
for (let i = 0; i < times; i++) {
const id = plex._nextChanId()
console.dir(id)
expect(Boolean(id & 1)).to.be.eql(initiator)
}
})
}))
})

0 comments on commit 0341fee

Please sign in to comment.