Skip to content

Commit

Permalink
feat: dont close channel too early
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 55d096e commit 9ea6999
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 15 deletions.
20 changes: 13 additions & 7 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Channel extends EE {
this._log = (name, data) => {
log({
op: name,
channel: this._name,
name: this._name,
id: this._id,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
Expand All @@ -38,10 +38,12 @@ class Channel extends EE {

this._msgs = pushable((err) => {
this._log('source closed', err)
if (err && typeof err !== 'boolean') {
setImmediate(() => this.emit('error', err))
}
if (this._reset) { return } // don't try closing the channel on reset
if (err) { setImmediate(() => this.emit('error', err)) }

this.endChan()
// this.endChan()
})

this._source = this._msgs
Expand All @@ -56,7 +58,9 @@ class Channel extends EE {
this._endedLocal = end || false

// source ended, close the stream
if (end === true) { return this.endChan() }
if (end === true) {
return this.endChan()
}

// source errored, reset stream
if (end || this._reset) {
Expand Down Expand Up @@ -104,9 +108,11 @@ class Channel extends EE {
// close for reading
close (err) {
this._log('close', err)
this._endedRemote = err || true
this._msgs.end(this._endedRemote)
this.emit('close', err)
if (!this._endedRemote) {
this._endedRemote = err || true
this._msgs.end(this._endedRemote)
this.emit('close', err)
}
}

reset (err) {
Expand Down
21 changes: 14 additions & 7 deletions src/coder.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ let States = {
PARSING: 0,
READING: 1
}
let state = States.PARSING

exports.decode = () => {
let state = States.PARSING
let offset = 0
let message = null
let length = 0
let buffer = null
let pos = 0

const decode = (msg) => {
try {
let offset = 0
Expand All @@ -52,8 +59,12 @@ exports.decode = () => {
}
}

let pos = 0
const read = (msg, data, length) => {
if (length <= 0) {
state = States.PARSING
return [0, msg, data]
}

let left = length - msg.length
if (left < 0) { left = 0 }
if (msg.length > 0) {
Expand All @@ -65,10 +76,6 @@ exports.decode = () => {
return [left, msg, data]
}

let offset = 0
let message = null
let length = 0
let buffer = null
return through(function (msg) {
while (msg.length) {
if (States.PARSING === state) {
Expand All @@ -90,7 +97,7 @@ exports.decode = () => {
if (length <= 0 && States.PARSING === state) {
this.queue(message)
offset = 0
message = {}
message = null
length = 0
pos = 0
}
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Plex extends EE {
}

this._initiator = !!initiator
this._chanId = this._initiator ? 0 : 1
this._chanId = this._initiator ? 1 : 0
this._channels = {}
this._endedRemote = false // remote stream ended
this._endedLocal = false // local stream ended
Expand Down
205 changes: 205 additions & 0 deletions test/channel.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ const pull = require('pull-stream')
const pair = require('pull-pair/duplex')
const pushable = require('pull-pushable')
const abortable = require('pull-abortable')
const through = require('pull-through')
const defer = require('pull-defer')
const lp = require('pull-length-prefixed')

const Connection = require('interface-connection').Connection
const Plex = require('../src')

function closeAndWait (stream) {
Expand Down Expand Up @@ -279,4 +283,205 @@ describe('channel', () => {
closeAndWait(dialerConn)
closeAndWait(listenerConn)
})

it('should be able to send and receive from same stream', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

plex2.on('stream', (stream) => {
pull(
stream,
through(function (data) {
this.queue(data.toString().toUpperCase())
}),
stream
)
})

const stream = plex1.createStream('stream 1')
pull(
pull.values([Buffer.from('hello from plex1!!')]),
stream,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})

it('should be able to send and receive from same stream with delayed pipe', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

plex2.on('stream', (stream) => {
setTimeout(() => pull(
stream,
through(function (data) {
this.queue(data.toString().toUpperCase())
}),
stream
), 800)
})

const stream = plex1.createStream('stream 1')
pull(
pull.values([Buffer.from('hello from plex1!!')]),
stream,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})

it('should be able to send and receive from same stream with deferred stream', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

const stream2 = defer.duplex()
plex2.on('stream', (_stream) => {
stream2.resolve(_stream)
})

pull(
stream2,
through(function (data) {
this.queue(data.toString().toUpperCase())
}),
stream2
)

const stream1 = plex1.createStream('stream 1')
pull(
pull.values([Buffer.from('hello from plex1!!')]),
stream1,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})

it('should be able to send and receive from same stream with deferred and delayed stream', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

const stream2 = defer.duplex()
plex2.on('stream', (_stream) => {
stream2.resolve(_stream)
})

setTimeout(() => pull(
stream2,
through(function (data) {
this.queue(data.toString().toUpperCase())
}),
stream2
), 800)

const stream1 = plex1.createStream('stream 1')
pull(
pull.values([Buffer.from('hello from plex1!!')]),
stream1,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})

it('should work with connection', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

plex2.on('stream', (stream) => {
const conn = new Connection(stream)
pull(
conn,
through(function (data) {
this.queue(data.toString().toUpperCase())
}),
conn
)
})

const stream = plex1.createStream('stream 1')
const conn = new Connection(stream)
pull(
pull.values([Buffer.from('hello from plex1!!')]),
conn,
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})

it('should work with connection length prefixed', (done) => {
const p = pair()

const plex1 = new Plex(true)
const plex2 = new Plex(false)

pull(plex1, p[0], plex1)
pull(plex2, p[1], plex2)

plex2.on('stream', (stream) => {
const conn = new Connection(stream)
pull(
conn,
lp.decode(),
through(function (data) {
this.queue(Buffer.from(data.toString().toUpperCase()))
}),
lp.encode(),
conn
)
})

const stream = plex1.createStream('stream 1')
const conn = new Connection(stream)

pull(
pull.values([Buffer.from('hello from plex1!!')]),
lp.encode(),
conn,
lp.decode(),
pull.collect((err, data) => {
expect(err).to.not.exist()
expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!')
done()
})
)
})
})

0 comments on commit 9ea6999

Please sign in to comment.