Skip to content

Commit

Permalink
fix: dont blow up the stack
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent d43efac commit c041265
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 51 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"dirty-chai": "^2.0.1",
"interface-connection": "^0.3.2",
"lodash.defaults": "^4.2.0",
"looper": "^4.0.0",
"pull-offset-limit": "^1.1.1",
"pull-pair": "^1.1.0",
"pull-pushable": "^2.2.0",
Expand Down
71 changes: 35 additions & 36 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const pushable = require('pull-pushable')
const defaults = require('lodash.defaults')
const looper = require('looper')

const consts = require('./consts')
const EE = require('events')
Expand Down Expand Up @@ -50,10 +51,10 @@ class Channel extends EE {
})
}

// this._log('new channel', this._name)
this._log('new channel', this._name)

this._msgs = pushable((err) => {
// this._log('source closed', err)
this._log('source closed', err)
if (err && typeof err !== 'boolean') {
setImmediate(() => this.emit('error', err))
}
Expand All @@ -63,33 +64,33 @@ class Channel extends EE {
this._source = this._msgs

this.sink = (read) => {
const next = (end, data) => {
// this._log('sink', data)

// stream already ended
if (this._endedLocal) { return }

this._endedLocal = end || false

// source ended, close the stream
if (end === true) {
return this.endChan()
}

// source errored, reset stream
if (end || this._reset) {
this.resetChan()
this.emit('error', end || this._reset)
this.reset()
return
}

// just send
this.sendMsg(data)
return read(null, next)
}
const next = looper(() => {
read(null, (end, data) => {
// stream already ended
if (this._endedLocal) { return }

this._endedLocal = end || false

// source ended, close the stream
if (end === true) {
return this.endChan()
}

// source errored, reset stream
if (end || this._reset) {
this.resetChan()
this.emit('error', end || this._reset)
this.reset()
return
}

// just send
this.sendMsg(data)
next()
})
})

read(null, next)
next()
}
}

Expand Down Expand Up @@ -118,13 +119,13 @@ class Channel extends EE {
}

push (data) {
// this._log('push', data)
this._log('push', data)
this._msgs.push(data)
}

// close for reading
close (err) {
// this._log('close', err)
this._log('close', err)
if (!this._endedRemote) {
this._endedRemote = err || true
this._msgs.end(this._endedRemote)
Expand All @@ -134,14 +135,12 @@ class Channel extends EE {
}

reset (err) {
// this._log('reset', err)
this._log('reset', err)
this._reset = err || 'channel reset!'
this.close(this._reset)
}

openChan () {
// this._log('openChan')

if (this.open) { return } // chan already open

let name
Expand All @@ -154,7 +153,7 @@ class Channel extends EE {
}

sendMsg (data) {
// this._log('sendMsg', data)
this._log('sendMsg')

if (!this.open) {
this.openChan()
Expand All @@ -168,7 +167,7 @@ class Channel extends EE {
}

endChan () {
// this._log('endChan')
this._log('endChan')

if (!this.open) {
return
Expand All @@ -181,7 +180,7 @@ class Channel extends EE {
}

resetChan () {
// this._log('endChan')
this._log('resetChan')

if (!this.open) {
return
Expand Down
29 changes: 14 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const pull = require('pull-stream')
const pushable = require('pull-pushable')
const through = require('pull-through')
const looper = require('looper')

const defautls = require('lodash.defaults')

Expand Down Expand Up @@ -57,7 +58,7 @@ class Mplex extends EE {
}

this._chandata = pushable((err) => {
// this._log('chandata ended')
this._log('mplex ended')
this._endedRemote = true
this.close(err)
})
Expand All @@ -82,15 +83,16 @@ class Mplex extends EE {
}),
coder.decode(),
(read) => {
function next (end, data) {
if (self._endedLocal) { return }
if (end === true) { return self.close() }
if (end) { return self.reset(end) }
self._handle(data)
return read(null, next)
}

return read(null, next)
const next = looper(() => {
read(null, function (end, data) {
if (self._endedLocal) { return }
if (end === true) { return self.close() }
if (end) { return self.reset(end) }
self._handle(data)
next()
})
})
next()
})
}

Expand All @@ -99,7 +101,7 @@ class Mplex extends EE {
}

close (err) {
// this._log('close', err)
this._log('close', err)

if (this.destroyed) { return }

Expand Down Expand Up @@ -132,14 +134,12 @@ class Mplex extends EE {
}

push (data) {
// this._log('push', data)
if (data.data &&
Buffer.byteLength(data.data) > this._maxMsgSize) {
this._chandata.end(new Error('message too large!'))
}

this._chandata.push(data)
log('buffer', this._chandata.buffer)
}

_nextChanId () {
Expand Down Expand Up @@ -173,7 +173,6 @@ class Mplex extends EE {
}

id = typeof id === 'number' ? id : this._nextChanId(initiator)
// if (list.has(id)) {
if (list[id]) {
this.emit('error', new Error(`channel with id ${id} already exist!`))
return
Expand Down Expand Up @@ -203,7 +202,7 @@ class Mplex extends EE {
}

_handle (msg) {
// this._log('_handle', msg)
this._log('_handle', msg)
const { id, type, data } = msg
switch (type) {
case consts.type.NEW: {
Expand Down

0 comments on commit c041265

Please sign in to comment.