diff --git a/src/channel.js b/src/channel.js index 00c039a..2261e1c 100644 --- a/src/channel.js +++ b/src/channel.js @@ -25,7 +25,7 @@ class Channel extends EE { this._log = (name, data) => { log({ op: name, - channel: this._name, + name: this._name, id: this._id, endedLocal: this._endedLocal, endedRemote: this._endedRemote, @@ -38,10 +38,12 @@ class Channel extends EE { this._msgs = pushable((err) => { this._log('source closed', err) + if (err && typeof err !== 'boolean') { + setImmediate(() => this.emit('error', err)) + } if (this._reset) { return } // don't try closing the channel on reset - if (err) { setImmediate(() => this.emit('error', err)) } - this.endChan() + // this.endChan() }) this._source = this._msgs @@ -56,7 +58,9 @@ class Channel extends EE { this._endedLocal = end || false // source ended, close the stream - if (end === true) { return this.endChan() } + if (end === true) { + return this.endChan() + } // source errored, reset stream if (end || this._reset) { @@ -104,9 +108,11 @@ class Channel extends EE { // close for reading close (err) { this._log('close', err) - this._endedRemote = err || true - this._msgs.end(this._endedRemote) - this.emit('close', err) + if (!this._endedRemote) { + this._endedRemote = err || true + this._msgs.end(this._endedRemote) + this.emit('close', err) + } } reset (err) { diff --git a/src/coder.js b/src/coder.js index 26ee28a..0a3c8e0 100644 --- a/src/coder.js +++ b/src/coder.js @@ -28,8 +28,15 @@ let States = { PARSING: 0, READING: 1 } -let state = States.PARSING + exports.decode = () => { + let state = States.PARSING + let offset = 0 + let message = null + let length = 0 + let buffer = null + let pos = 0 + const decode = (msg) => { try { let offset = 0 @@ -52,8 +59,12 @@ exports.decode = () => { } } - let pos = 0 const read = (msg, data, length) => { + if (length <= 0) { + state = States.PARSING + return [0, msg, data] + } + let left = length - msg.length if (left < 0) { left = 0 } if (msg.length > 0) { @@ -65,10 +76,6 @@ exports.decode = () => { return [left, msg, data] } - let offset = 0 - let message = null - let length = 0 - let buffer = null return through(function (msg) { while (msg.length) { if (States.PARSING === state) { @@ -90,7 +97,7 @@ exports.decode = () => { if (length <= 0 && States.PARSING === state) { this.queue(message) offset = 0 - message = {} + message = null length = 0 pos = 0 } diff --git a/src/index.js b/src/index.js index 7c82814..856ae44 100644 --- a/src/index.js +++ b/src/index.js @@ -24,7 +24,7 @@ class Plex extends EE { } this._initiator = !!initiator - this._chanId = this._initiator ? 0 : 1 + this._chanId = this._initiator ? 1 : 0 this._channels = {} this._endedRemote = false // remote stream ended this._endedLocal = false // local stream ended diff --git a/test/channel.spec.js b/test/channel.spec.js index 47f4761..3f0e5d4 100644 --- a/test/channel.spec.js +++ b/test/channel.spec.js @@ -12,7 +12,11 @@ const pull = require('pull-stream') const pair = require('pull-pair/duplex') const pushable = require('pull-pushable') const abortable = require('pull-abortable') +const through = require('pull-through') +const defer = require('pull-defer') +const lp = require('pull-length-prefixed') +const Connection = require('interface-connection').Connection const Plex = require('../src') function closeAndWait (stream) { @@ -279,4 +283,205 @@ describe('channel', () => { closeAndWait(dialerConn) closeAndWait(listenerConn) }) + + it('should be able to send and receive from same stream', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + plex2.on('stream', (stream) => { + pull( + stream, + through(function (data) { + this.queue(data.toString().toUpperCase()) + }), + stream + ) + }) + + const stream = plex1.createStream('stream 1') + pull( + pull.values([Buffer.from('hello from plex1!!')]), + stream, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) + + it('should be able to send and receive from same stream with delayed pipe', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + plex2.on('stream', (stream) => { + setTimeout(() => pull( + stream, + through(function (data) { + this.queue(data.toString().toUpperCase()) + }), + stream + ), 800) + }) + + const stream = plex1.createStream('stream 1') + pull( + pull.values([Buffer.from('hello from plex1!!')]), + stream, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) + + it('should be able to send and receive from same stream with deferred stream', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + const stream2 = defer.duplex() + plex2.on('stream', (_stream) => { + stream2.resolve(_stream) + }) + + pull( + stream2, + through(function (data) { + this.queue(data.toString().toUpperCase()) + }), + stream2 + ) + + const stream1 = plex1.createStream('stream 1') + pull( + pull.values([Buffer.from('hello from plex1!!')]), + stream1, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) + + it('should be able to send and receive from same stream with deferred and delayed stream', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + const stream2 = defer.duplex() + plex2.on('stream', (_stream) => { + stream2.resolve(_stream) + }) + + setTimeout(() => pull( + stream2, + through(function (data) { + this.queue(data.toString().toUpperCase()) + }), + stream2 + ), 800) + + const stream1 = plex1.createStream('stream 1') + pull( + pull.values([Buffer.from('hello from plex1!!')]), + stream1, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) + + it('should work with connection', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + plex2.on('stream', (stream) => { + const conn = new Connection(stream) + pull( + conn, + through(function (data) { + this.queue(data.toString().toUpperCase()) + }), + conn + ) + }) + + const stream = plex1.createStream('stream 1') + const conn = new Connection(stream) + pull( + pull.values([Buffer.from('hello from plex1!!')]), + conn, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) + + it('should work with connection length prefixed', (done) => { + const p = pair() + + const plex1 = new Plex(true) + const plex2 = new Plex(false) + + pull(plex1, p[0], plex1) + pull(plex2, p[1], plex2) + + plex2.on('stream', (stream) => { + const conn = new Connection(stream) + pull( + conn, + lp.decode(), + through(function (data) { + this.queue(Buffer.from(data.toString().toUpperCase())) + }), + lp.encode(), + conn + ) + }) + + const stream = plex1.createStream('stream 1') + const conn = new Connection(stream) + + pull( + pull.values([Buffer.from('hello from plex1!!')]), + lp.encode(), + conn, + lp.decode(), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.eql('HELLO FROM PLEX1!!') + done() + }) + ) + }) })