From e2f075628f523330e28a49447ba3511ea65fedb5 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 14 Aug 2019 20:55:11 +0100 Subject: [PATCH 01/18] refactor: move files --- src/{internals/index.js => mplex.js} | 0 src/{internals/channel.js => stream.js} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/{internals/index.js => mplex.js} (100%) rename src/{internals/channel.js => stream.js} (100%) diff --git a/src/internals/index.js b/src/mplex.js similarity index 100% rename from src/internals/index.js rename to src/mplex.js diff --git a/src/internals/channel.js b/src/stream.js similarity index 100% rename from src/internals/channel.js rename to src/stream.js From c0292947fa4c45c279cb51c1db88baad26a1accc Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 11:46:05 +0100 Subject: [PATCH 02/18] refactor: async iterators based mplex License: MIT Signed-off-by: Alan Shaw --- .aegir.js | 35 --- .gitignore | 12 +- README.md | 187 ++++++++----- examples/dialer.js | 49 +++- examples/listener.js | 43 +-- examples/util.js | 17 ++ package.json | 36 +-- src/adapter.js | 64 +++++ src/codec.js | 3 - src/coder/decode.js | 69 +++++ src/coder/encode.js | 51 ++++ src/coder/index.js | 4 + src/index.js | 28 +- src/message-types.js | 33 +++ src/mplex.js | 560 ++++++++----------------------------- src/muxer.js | 108 ------- src/restrict-size.js | 18 ++ src/stream.js | 235 ++++------------ test/browser.js | 43 --- test/coder.spec.js | 106 +++++++ test/compliance.spec.js | 10 +- test/internals.node.js | 460 ------------------------------ test/mplex.spec.js | 70 ----- test/muxer.spec.js | 74 ----- test/node.js | 3 - test/restrict-size.spec.js | 48 ++++ test/stream.spec.js | 506 +++++++++++++++++++++++++++++++++ 27 files changed, 1295 insertions(+), 1577 deletions(-) delete mode 100644 .aegir.js create mode 100644 examples/util.js create mode 100644 src/adapter.js delete mode 100644 src/codec.js create mode 100644 src/coder/decode.js create mode 100644 
src/coder/encode.js create mode 100644 src/coder/index.js create mode 100644 src/message-types.js delete mode 100644 src/muxer.js create mode 100644 src/restrict-size.js delete mode 100644 test/browser.js create mode 100644 test/coder.spec.js delete mode 100644 test/internals.node.js delete mode 100644 test/mplex.spec.js delete mode 100644 test/muxer.spec.js delete mode 100644 test/node.js create mode 100644 test/restrict-size.spec.js create mode 100644 test/stream.spec.js diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index b0484cc..0000000 --- a/.aegir.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict' - -const WSlibp2p = require('libp2p-websockets') -const multiaddr = require('multiaddr') -const pull = require('pull-stream') - -const multiplex = require('./src') - -let listener -const boot = (done) => { - const ws = new WSlibp2p() - const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - listener = ws.createListener((transportSocket) => { - const muxedConn = multiplex.listener(transportSocket) - muxedConn.on('stream', (connRx) => { - const connTx = muxedConn.newStream() - pull(connRx, connTx, connRx) - }) - }) - - listener.listen(mh, done) -} - -const shutdown = (done) => { - listener.close(done) -} - -module.exports = { - hooks: { - browser: { - pre: boot, - post: shutdown - } - } -} diff --git a/.gitignore b/.gitignore index d14c494..db79d1f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,13 +1,7 @@ +node_modules +coverage +.nyc_output package-lock.json yarn.lock docs - -**/node_modules -**/*.log -test/setup/tmp-disposable-nodes-addrs.json dist -coverage -**/*.swp -examples/sub-module/**/bundle.js -examples/sub-module/**/*-minified.js -examples/sub-module/*-bundle.js diff --git a/README.md b/README.md index 4335dca..4215d9c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -js-libp2p-mplex -=================== +# js-libp2p-mplex [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai) 
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) @@ -12,100 +11,146 @@ js-libp2p-mplex ![](https://img.shields.io/badge/npm-%3E%3D6.0.0-orange.svg?style=flat-square) ![](https://img.shields.io/badge/Node.js-%3E%3D10.0.0-orange.svg?style=flat-square) -> JavaScript implementation of https://github.com/libp2p/mplex +> JavaScript implementation of [mplex](https://github.com/libp2p/specs/tree/master/mplex). [![](https://github.com/libp2p/interface-stream-muxer/raw/master/img/badge.png)](https://github.com/libp2p/interface-stream-muxer) ## Lead Maintainer -[Vasco Santos](https://github.com/vasco-santos). +[Vasco Santos](https://github.com/vasco-santos) + +## Install + +```sh +npm install libp2p-mplex +``` ## Usage -Let's define a `listener.js`, which starts a TCP server on port 9999 and waits for a connection. Once we get a connection, we wait for a stream. And finally, once we have the stream, we pull the data from that stream, and printing it to the console. 
- -```JavaScript -const mplex = require('libp2p-mplex') -const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') - -const listener = tcp.createServer((socket) => { - console.log('[listener] Got connection!') - - const muxer = mplex.listener(toPull(socket)) - - muxer.on('stream', (stream) => { - console.log('[listener] Got stream!') - pull( - stream, - pull.drain((data) => { - console.log('[listener] Received:') - console.log(data.toString()) - }) - ) - }) -}) +```js +const Mplex = require('libp2p-mplex') +const pipe = require('it-pipe') -listener.listen(9999, () => { - console.log('[listener] listening on 9999') +const muxer = new Mplex({ + onStream: stream => { // Receive a duplex stream from the remote + // ...receive data from the remote and optionally send data back + } }) + +pipe(conn, muxer, conn) // conn is duplex connection to another peer + +const stream = muxer.newStream() // Create a new duplex stream to the remote + +// Use the duplex stream to send some data to the remote... +pipe([1, 2, 3], stream) ``` -Now, let's define `dialer.js` who will connect to our `listener` over a TCP socket. Once we have that, we'll put a message in the stream for our `listener`. +## API -```JavaScript -const mplex = require('libp2p-mplex') -const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') +### `const muxer = new Mplex([options])` -const socket = tcp.connect(9999) +Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications. -const muxer = mplex.dialer(toPull(socket)) +e.g. -console.log('[dialer] opening stream') -const stream = muxer.newStream((err) => { - console.log('[dialer] opened stream') - if (err) throw err -}) +```js +const Mplex = require('libp2p-mplex') +const pipe = require('it-pipe') -pull( - pull.values(['hey, how is it going. 
I am the dialer']), - stream -) +// Create a duplex muxer +const muxer = new Mplex() + +// Use the muxer in a pipeline +pipe(conn, muxer, conn) // conn is duplex connection to another peer ``` -Now we can first run `listener.js` and then `dialer.js` to see the -following output: +`options` is an optional `Object` that may have the following properties: + +* `onStream` - A function called when receiving a new stream from the remote. e.g. + ```js + // Receive a new stream on the muxed connection + const onStream = stream => { + // Read from this stream and write back to it (echo server) + pipe( + stream, + source => (async function * () { + for await (const data of source) yield data + })(), + stream + ) + } + const muxer = new Mplex({ onStream }) + // ... + ``` + **Note:** The `onStream` function can be passed in place of the `options` object. i.e. + ```js + new Mplex(stream => { /* ... */ }) + ``` +* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of its multiplexed connections. e.g. + ```js + const controller = new AbortController() + const muxer = new Mplex({ signal: controller.signal }) + + pipe(conn, muxer, conn) + + controller.abort() + ``` +* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB) + +### `muxer.onStream` + +Use this property as an alternative to passing `onStream` as an option to the `Mplex` constructor. + +### `const stream = muxer.newStream([options])` + +Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). + +e.g. -*listener.js* +```js +// Create a new stream on the muxed connection +const stream = muxer.newStream() -``` -$ node listener.js -[listener] listening on 9999 -[listener] Got connection! -[listener] Got stream! -[listener] Received: -hey, how is it going. 
I am the dialer +// Use this new stream like any other duplex stream: +pipe([1, 2, 3], stream, consume) ``` -*dialer.js* +In addition to `sink` and `source` properties, this stream also has the following API, that will **normally _not_ be used by stream consumers**. -``` -$ node dialer.js -[dialer] opening stream -[dialer] opened stream -``` +#### `stream.close()` -## Install +Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed. -```sh -> npm install libp2p-mplex -``` +This function is called automatically by the muxer when it receives a `CLOSE` message from the remote. -## API +The source will return normally, the sink will continue to consume. -```js -const mplex = require('libp2p-mplex') -``` +#### `stream.abort([err])` + +Closes the stream for **reading** _and_ **writing**. This should be called when a _local error_ has occurred. + +Note, if called without an error any buffered data in the source can still be consumed and the stream will end normally. + +This will cause a `RESET` message to be sent to the remote, _unless_ the sink has already ended. + +The sink will return and the source will throw if an error is passed or return normally if not. + +#### `stream.reset()` + +Closes the stream _immediately_ for **reading** _and_ **writing**. This should be called when a _remote error_ has occurred. + +This function is called automatically by the muxer when it receives a `RESET` message from the remote. + +The sink will return and the source will throw. + +## Contribute + +The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out: + + - Go through the modules and **check out existing issues**. This is especially useful for modules in active development. 
Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically. + - **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs. + - **Add tests**. There can never be enough tests. + +## License + +[MIT](LICENSE) © Protocol Labs diff --git a/examples/dialer.js b/examples/dialer.js index 96ef141..aec6d88 100644 --- a/examples/dialer.js +++ b/examples/dialer.js @@ -2,21 +2,42 @@ 'use strict' const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') -const multiplex = require('../src') +const pipe = require('it-pipe') +const AbortController = require('abort-controller') +const { toIterable } = require('./util') +const Mplex = require('../src') -const socket = tcp.connect(9999) +const socket = toIterable(tcp.connect(9999)) +console.log('[dialer] socket stream opened') -const muxer = multiplex.dialer(toPull(socket)) +const controller = new AbortController() -console.log('[dialer] opening stream') -const stream = muxer.newStream((err) => { - console.log('[dialer] opened stream') - if (err) throw err -}) +const muxer = new Mplex({ signal: controller.signal }) -pull( - pull.values(['hey, how is it going. I am the dialer']), - stream -) +const pipeMuxerToSocket = async () => { + await pipe(muxer, socket, muxer) + console.log('[dialer] socket stream closed') +} + +const sendAndReceive = async () => { + const muxedStream = muxer.newStream() + console.log('[dialer] muxed stream opened') + + await pipe( + ['hey, how is it going. 
I am the dialer'], + muxedStream, + async source => { + for await (const chunk of source) { + console.log('[dialer] received:') + console.log(chunk.toString()) + } + } + ) + console.log('[dialer] muxed stream closed') + + // Close the socket stream after 1s + setTimeout(() => controller.abort(), 1000) +} + +pipeMuxerToSocket() +sendAndReceive() diff --git a/examples/listener.js b/examples/listener.js index e57671e..37f167f 100644 --- a/examples/listener.js +++ b/examples/listener.js @@ -2,27 +2,34 @@ 'use strict' const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') -const multiplex = require('../src') +const pipe = require('it-pipe') +const { toIterable } = require('./util') +const Mplex = require('../src') -const listener = tcp.createServer((socket) => { +const listener = tcp.createServer(async socket => { console.log('[listener] Got connection!') - const muxer = multiplex.listener(toPull(socket)) - - muxer.on('stream', (stream) => { - console.log('[listener] Got stream!') - pull( - stream, - pull.drain((data) => { - console.log('[listener] Received:') - console.log(data.toString()) - }) - ) + const muxer = new Mplex({ + async onStream (stream) { + console.log('[listener] muxed stream opened') + await pipe( + stream, + source => (async function * () { + for await (const chunk of source) { + console.log('[listener] received:') + console.log(chunk.toString()) + yield 'thanks for the message, I am the listener' + } + })(), + stream + ) + console.log('[listener] muxed stream closed') + } }) -}) -listener.listen(9999, () => { - console.log('[listener] listening on 9999') + socket = toIterable(socket) + await pipe(socket, muxer, socket) + console.log('[listener] socket stream closed') }) + +listener.listen(9999, () => console.log('[listener] listening on 9999')) diff --git a/examples/util.js b/examples/util.js new file mode 100644 index 0000000..ad9e35d --- /dev/null +++ b/examples/util.js @@ -0,0 +1,17 @@ +// 
Simple conversion of Node.js duplex to iterable duplex (no backpressure) +exports.toIterable = socket => { + return { + sink: async source => { + try { + for await (const chunk of source) { + socket.write(chunk) + } + } catch (err) { + // If not an abort then destroy the socket with an error + return socket.destroy(err.code === 'ABORT_ERR' ? null : err) + } + socket.end() + }, + source: socket + } +} diff --git a/package.json b/package.json index 4d535ff..6636bc4 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "test": "aegir test -t node -t browser", "test:node": "aegir test -t node", "test:browser": "aegir test -t browser", + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node", "release": "aegir release -t node -t browser", "release-minor": "aegir release --type minor -t node -t browser", "release-major": "aegir release --type major -t node -t browser" }, "repository": { "type": "git", "url": "git+https://github.com/libp2p/js-libp2p-mplex.git" }, "keywords": [ + "multiplex", + "mplex", + "stream", + "muxer", + "connection", + "duplex", + "libp2p", "IPFS" ], "license": "MIT", "bugs": { }, "homepage": "https://github.com/libp2p/js-libp2p-mplex#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.0.0", "chai": "^4.2.0", - "chai-checkmark": "^1.0.1", "dirty-chai": "^2.0.1", - "interface-stream-muxer": "~0.6.0", - "libp2p-tcp": "~0.13.0", - "libp2p-websockets": "~0.12.2", - "multiaddr": "^6.0.6", - "pull-pair": "^1.1.0", - "through2": "^2.0.3" + "interface-stream-muxer": "github:libp2p/interface-stream-muxer#refactor/async-iterators", + "p-defer": "^3.0.0", + "random-bytes": "^1.0.0", + "random-int": "^2.0.0", + "streaming-iterables": "^4.1.0" }, "dependencies": { - "async": "^2.6.2", - "chunky": "0.0.0", - "concat-stream": "^1.6.2", + "abort-controller": "^3.0.0", + "abortable-iterator": "^2.0.0", + "async-iterator-to-pull-stream": "^1.3.0", + "bl": "^3.0.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", - "pull-catch": 
"^1.0.1", + "it-pipe": "^1.0.1", + "it-pushable": "^1.2.1", "pull-stream": "^3.6.9", - "pull-stream-to-stream": "^1.3.4", - "pump": "^3.0.0", - "readable-stream": "^3.1.1", - "stream-to-pull-stream": "^1.7.3", "varint": "^5.0.0" }, "contributors": [ diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..4ba90cf --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,64 @@ +'use strict' + +const Mplex = require('.') +const AbortController = require('abort-controller') +const pull = require('pull-stream/pull') +const toPull = require('async-iterator-to-pull-stream') +const { Connection } = require('interface-connection') +const EE = require('events') +const noop = () => {} + +function create (conn, isListener) { + const toConn = stream => { + const { source } = stream + stream.source = (async function * () { + for await (const chunk of source) { + yield chunk.slice() // convert bl to Buffer + } + })() + Object.assign(stream.source, { push: source.push, end: source.end }) + return new Connection(toPull.duplex(stream), conn) + } + + const abortController = new AbortController() + const adapterMuxer = Object.assign(new EE(), { + newStream (cb) { + cb = cb || noop + const stream = muxer.newStream() + const conn = toConn(stream) + setTimeout(() => cb(null, conn)) + return conn + }, + end (err, cb) { + if (typeof err === 'function') { + cb = err + err = null + } + cb = cb || noop + abortController.abort() + cb() + } + }) + + const muxer = new Mplex({ + signal: abortController.signal, + onStream: stream => adapterMuxer.emit('stream', toConn(stream)) + }) + + pull( + conn, + toPull.duplex(muxer), + read => (end, cb) => { + if (end) adapterMuxer.emit('close', end) + read(end, cb) + }, + conn + ) + + return adapterMuxer +} + +module.exports = create +module.exports.multicodec = Mplex.multicodec +module.exports.dialer = conn => create(conn, false) +module.exports.listener = conn => create(conn, true) diff --git a/src/codec.js b/src/codec.js deleted file mode 
100644 index bed17c4..0000000 --- a/src/codec.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -module.exports = '/mplex/6.7.0' diff --git a/src/coder/decode.js b/src/coder/decode.js new file mode 100644 index 0000000..b69a1d4 --- /dev/null +++ b/src/coder/decode.js @@ -0,0 +1,69 @@ +'use strict' + +const varint = require('varint') +const BufferList = require('bl') + +module.exports = source => (async function * decode () { + const decoder = new Decoder() + for await (const chunk of source) { + const msgs = decoder.write(chunk) + for (let i = 0; i < msgs.length; i++) yield msgs[i] + } +})() + +class Decoder { + constructor () { + this._buffer = new BufferList() + // optimisation to allow varint to take a bl (well a proxy to) + this._bufferProxy = new Proxy({}, { + get: (_, prop) => prop[0] === 'l' ? this._buffer[prop] : this._buffer.get(parseInt(prop)) + }) + this._headerInfo = null + } + + write (chunk) { + if (!chunk || !chunk.length) return [] + + this._buffer.append(chunk) + + if (!this._headerInfo) { + try { + this._headerInfo = this._decodeHeader(this._bufferProxy) + } catch (err) { + return [] // not enough data yet...probably + } + + // remove the header from the buffer + this._buffer = this._buffer.shallowSlice(this._headerInfo.offset) + } + + const { id, type, length } = this._headerInfo + + if (this._buffer.length < length) return [] // not got enough data yet + + if (this._buffer.length === length) { + const msg = { id, type, data: this._buffer } + + this._headerInfo = null + this._buffer = new BufferList() + + return [msg] + } + + const msg = { id, type, data: this._buffer.shallowSlice(0, length) } + const rest = this._buffer.shallowSlice(length) + + this._headerInfo = null + this._buffer = new BufferList() + + return [msg, ...this.write(rest)] + } + + _decodeHeader (data) { + const h = varint.decode(data) + let offset = varint.decode.bytes + const length = varint.decode(data, offset) + offset += varint.decode.bytes + return { id: h >> 3, type: h & 7, 
offset, length } + } +} diff --git a/src/coder/encode.js b/src/coder/encode.js new file mode 100644 index 0000000..4d6328d --- /dev/null +++ b/src/coder/encode.js @@ -0,0 +1,51 @@ +'use strict' + +const varint = require('varint') + +const POOL_SIZE = 10 * 1024 +const empty = Buffer.alloc(0) + +class Encoder { + constructor () { + this._pool = Buffer.allocUnsafe(POOL_SIZE) + this._poolOffset = 0 + } + + write (msg) { + const pool = this._pool + let offset = this._poolOffset + + varint.encode(msg.id << 3 | msg.type, pool, offset) + offset += varint.encode.bytes + varint.encode(msg.data ? msg.data.length : 0, pool, offset) + offset += varint.encode.bytes + + const header = pool.slice(this._poolOffset, offset) + + if (POOL_SIZE - offset < 100) { + this._pool = Buffer.allocUnsafe(POOL_SIZE) + this._poolOffset = 0 + } else { + this._poolOffset = offset + } + + if (!msg.data) return [header, empty] + + if (msg.data.shallowSlice) { + msg.data = msg.data.slice() // If BufferList, convert to buffer + } + + return [header, msg.data] + } +} + +const encoder = new Encoder() + +module.exports = source => (async function * encode () { + for await (const msg of source) { + const chunks = encoder.write(msg) + for (let i = 0; i < chunks.length; i++) { + yield chunks[i] + } + } +})() diff --git a/src/coder/index.js b/src/coder/index.js new file mode 100644 index 0000000..aa271e4 --- /dev/null +++ b/src/coder/index.js @@ -0,0 +1,4 @@ +'use strict' + +exports.encode = require('./encode') +exports.decode = require('./decode') diff --git a/src/index.js b/src/index.js index 006eeaf..3cfe70b 100644 --- a/src/index.js +++ b/src/index.js @@ -1,29 +1,3 @@ 'use strict' -const toStream = require('pull-stream-to-stream') -const MplexCore = require('./internals') -const MULTIPLEX_CODEC = require('./codec') -const Muxer = require('./muxer') - -const pump = require('pump') - -function create (rawConn, isListener) { - const stream = toStream(rawConn) - - // Cleanup and destroy the connection when it 
ends as the converted stream - // doesn't emit 'close' but .destroy will trigger a 'close' event. - stream.on('end', () => stream.destroy()) - - const mpx = new MplexCore({ - halfOpen: true, - initiator: !isListener - }) - pump(stream, mpx, stream) - - return new Muxer(rawConn, mpx) -} - -exports = module.exports = create -exports.multicodec = MULTIPLEX_CODEC -exports.dialer = (conn) => create(conn, false) -exports.listener = (conn) => create(conn, true) +module.exports = require('./mplex') diff --git a/src/message-types.js b/src/message-types.js new file mode 100644 index 0000000..2c2e996 --- /dev/null +++ b/src/message-types.js @@ -0,0 +1,33 @@ +'use strict' + +const MessageTypes = Object.freeze({ + NEW_STREAM: 0, + MESSAGE_RECEIVER: 1, + MESSAGE_INITIATOR: 2, + CLOSE_RECEIVER: 3, + CLOSE_INITIATOR: 4, + RESET_RECEIVER: 5, + RESET_INITIATOR: 6 +}) + +exports.MessageTypes = MessageTypes + +exports.InitiatorMessageTypes = Object.freeze({ + NEW_STREAM: MessageTypes.NEW_STREAM, + MESSAGE: MessageTypes.MESSAGE_INITIATOR, + CLOSE: MessageTypes.CLOSE_INITIATOR, + RESET: MessageTypes.RESET_INITIATOR +}) + +exports.ReceiverMessageTypes = Object.freeze({ + MESSAGE: MessageTypes.MESSAGE_RECEIVER, + CLOSE: MessageTypes.CLOSE_RECEIVER, + RESET: MessageTypes.RESET_RECEIVER +}) + +exports.MessageTypeNames = Object.freeze( + Object.entries(MessageTypes).reduce((obj, e) => { + obj[e[1]] = e[0] + return obj + }, {}) +) diff --git a/src/mplex.js b/src/mplex.js index db92d3f..ddba084 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -1,477 +1,149 @@ 'use strict' -/* @flow */ -const stream = require('readable-stream') -const varint = require('varint') -const debug = require('debug') - -const Channel = require('./channel') -/* :: import type {ChannelOpts} from './channel' */ - -const SIGNAL_FLUSH = Buffer.from([0]) - -const empty = Buffer.alloc(0) -let pool = Buffer.alloc(10 * 1024) -let used = 0 - -/* :: -type MultiplexOpts = { - binaryName?: bool, - limit?: number, - initiator?: bool 
-} - -type ChannelCallback = (Channel) => void -*/ - -class Multiplex extends stream.Duplex { - constructor (opts/* :: ?: MultiplexOpts | ChannelCallback */, onchannel /* :: ?: ChannelCallback */) { - super() - if (typeof opts === 'function') { - onchannel = opts - opts = {} - } - - if (!opts) { - opts = {} - } - - if (onchannel) { - this.on('stream', onchannel) - } - - this.destroyed = false - this.limit = opts.limit || 0 - if (opts.initiator == null) { - opts.initiator = true - } - - this.initiator = opts.initiator - - this._corked = 0 - this._options = opts - this._binaryName = Boolean(opts.binaryName) - this._local = [] - this._remote = [] - this._list = this._local - this._receiving = null - this._chunked = false - this._state = 0 - this._type = 0 - this._channel = 0 - this._missing = 0 - this._message = null - - this.log = debug('mplex:main:' + Math.floor(Math.random() * 100000)) - this.log('construction') - - let bufSize = 100 - if (this.limit) { - bufSize = varint.encodingLength(this.limit) - } - this._buf = Buffer.alloc(bufSize) - this._ptr = 0 - this._awaitChannelDrains = 0 - this._onwritedrain = null - this._ondrain = [] - this._finished = false - - this.once('finish', this._clear) - - // setup id handling - this._nextId = this.initiator ? 
0 : 1 - } - - // Generate the next stream id - _nextStreamId ()/* : number */ { - let id = this._nextId - this._nextId += 2 - return id - } - - createStream (name/* : Buffer | string */, opts/* : ChannelOpts */)/* : Channel */ { - if (this.destroyed) { - throw new Error('Multiplexer is destroyed') - } - const id = this._nextStreamId() - let channelName = this._name(name || id.toString()) - const options = Object.assign(this._options, opts) - this.log('createStream: %s', id, channelName.toString(), options) - - const channel = new Channel(channelName, this, options) - return this._addChannel(channel, id, this._local) - } - - receiveStream (name/* : Buffer | string */, opts/* : ChannelOpts */)/* : Channel */ { - if (this.destroyed) { - throw new Error('Multiplexer is destroyed') - } - - if (name === undefined || name === null) { - throw new Error('Name is needed when receiving a stream') - } - - const channelName = this._name(name) - this.log('receiveStream: ' + channelName.toString()) - const channel = new Channel( - channelName, - this, - Object.assign(this._options, opts) - ) - - if (!this._receiving) { - this._receiving = {} - } - - if (this._receiving[channel.name]) { - throw new Error('You are already receiving this stream') - } - - this._receiving[channel.name] = channel - - return channel - } - - _name (name/* : Buffer | string */)/* : Buffer | string */ { - if (!this._binaryName) { - return name.toString() - } - return Buffer.isBuffer(name) ? name : Buffer.from(name) - } - - _send (header/* : number */, data /* :: ?: Buffer */)/* : bool */ { - const len = data ? 
data.length : 0 - const oldUsed = used - - this.log('_send', header, len) - - varint.encode(header, pool, used) - used += varint.encode.bytes - varint.encode(len, pool, used) - used += varint.encode.bytes - - let buf = pool.slice(oldUsed, used) - - if (pool.length - used < 100) { - pool = Buffer.alloc(10 * 1024) - used = 0 - } - - if (data) { - buf = Buffer.concat([ - buf, - data - ]) - } - - // Push and return the results - return this.push(buf) - } - - _addChannel (channel/* : Channel */, id/* : number */, list/* : Array */)/* : Channel */ { - this.log('_addChannel', id) - list[id] = channel - channel.on('finalize', () => { - this.log('_remove channel', id) - list[id] = null - }) - channel.open(id, list === this._local) - - return channel - } - - _writeVarint (data/* : Buffer */, offset/* : number */)/* : number */ { - for (offset; offset < data.length; offset++) { - if (this._ptr === this._buf.length) { - return this._lengthError(data) - } - - this._buf[this._ptr++] = data[offset] - - if (!(data[offset] & 0x80)) { - if (this._state === 0) { - const header = varint.decode(this._buf) - this._type = header & 7 - this._channel = header >> 3 - this._list = this._type & 1 ? 
this._local : this._remote - const chunked = this._list.length > this._channel && - this._list[this._channel] && - this._list[this._channel].chunked - - this._chunked = Boolean(this._type === 1 || this._type === 2) && chunked - } else { - this._missing = varint.decode(this._buf) - - if (this.limit && this._missing > this.limit) { - return this._lengthError(data) - } - } - - this._state++ - this._ptr = 0 - return offset + 1 +const pipe = require('it-pipe') +const pushable = require('it-pushable') +const log = require('debug')('mplex:mplex') +const abortable = require('abortable-iterator') +const Coder = require('./coder') +const restrictSize = require('./restrict-size') +const { MessageTypes, MessageTypeNames } = require('./message-types') +const createStream = require('./stream') + +class Mplex { + constructor (options) { + options = options || {} + options = typeof options === 'function' ? { onStream: options } : options + + this._streamId = 0 + this._streams = { initiators: new Map(), receivers: new Map() } + this._options = options + + this.sink = this._createSink() + this.source = this._createSource() + this.onStream = options.onStream + } + + // Initiate a new stream with the given name + newStream (name) { + const id = this._streamId++ + name = name == null ? 
id.toString() : String(name) + log('new initiator stream %s %s', id, name) + const send = msg => { + if (log.enabled) { + log('initiator stream send', { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) } + return this.source.push(msg) } - - return data.length - } - - _lengthError (data/* : Buffer */)/* : number */ { - this.destroy(new Error('Incoming message is too big')) - return data.length - } - - _writeMessage (data/* : Buffer */, offset/* : number */)/* : number */ { - const free = data.length - offset - const missing = this._missing - - if (!this._message) { - if (missing <= free) { // fast track - no copy - this._missing = 0 - this._push(data.slice(offset, offset + missing)) - return offset + missing - } - if (this._chunked) { - this._missing -= free - this._push(data.slice(offset, data.length)) - return data.length - } - this._message = Buffer.alloc(missing) + const onEnd = () => { + log('initiator stream %s ended', id) + this._streams.initiators.delete(id) } - - data.copy(this._message, this._ptr, offset, offset + missing) - - if (missing <= free) { - this._missing = 0 - this._push(this._message) - return offset + missing - } - - this._missing -= free - this._ptr += free - - return data.length + const stream = createStream({ id, name, send, onEnd }) + this._streams.initiators.set(id, stream) + return stream } - _push (data/* : Buffer */) { - this.log('_push', data.length) - if (!this._missing) { - this._ptr = 0 - this._state = 0 - this._message = null - } - - if (this._type === 0) { // open - this.log('open', this._channel) - if (this.destroyed || this._finished) { - return - } - - let name - if (this._binaryName) { - name = data - } else { - name = data.toString() || this._channel.toString() - } - this.log('open name', name) - let channel - if (this._receiving && this._receiving[name]) { - channel = this._receiving[name] - delete this._receiving[name] - this._addChannel(channel, this._channel, this._list) - } else { - channel = new 
Channel(name, this, this._options) - this.emit('stream', this._addChannel( - channel, - this._channel, - this._list), channel.name) - } - return - } - - const stream = this._list[this._channel] - if (!stream) { - return + _newReceiverStream ({ id, name }) { + if (this._streams.receivers.has(id)) { + throw new Error(`stream ${id} already exists!`) } - - switch (this._type) { - case 5: // local error - case 6: { // remote error - const error = new Error(data.toString() || 'Channel destroyed') - stream.local = false - stream.destroy(error) - return - } - - case 3: // local end - case 4: { // remote end - stream.push(null) - return + log('new receiver stream %s %s', id, name) + const send = msg => { + if (log.enabled) { + log('receiver stream send', { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) } - - case 1: // local packet - case 2: // remote packet - if (!stream.push(data)) { - this._awaitChannelDrains++ - stream._awaitDrain++ - } - break - default: // no action + return this.source.push(msg) } - } - - _onchanneldrain (drained/* : number */) { - this._awaitChannelDrains -= drained - - if (this._awaitChannelDrains) { - return - } - - const ondrain = this._onwritedrain - this._onwritedrain = null - - if (ondrain) { - ondrain() + const onEnd = () => { + log('receiver stream %s ended', id) + this._streams.receivers.delete(id) } + const stream = createStream({ id, name, send, type: 'receiver', onEnd }) + this._streams.receivers.set(id, stream) + return stream } - _write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) { - this.log('_write', data.length) - if (this._finished) { - cb() - return - } - - if (this._corked) { - this._onuncork(this._write.bind(this, data, enc, cb)) - return - } - - if (data === SIGNAL_FLUSH) { - this._finish(cb) - return - } - - let offset = 0 - while (offset < data.length) { - if (this._state === 2) { - offset = this._writeMessage(data, offset) - } else { - offset = this._writeVarint(data, offset) + _createSink 
() { + return async source => { + if (this._options.signal) { + source = abortable(source, this._options.signal) } - } - - if (this._state === 2 && !this._missing) { - this._push(empty) - } - - if (this._awaitChannelDrains) { - this._onwritedrain = cb - } else { - cb() - } - } - _finish (cb/* : () => void */) { - this._onuncork(() => { - if (this._writableState.prefinished === false) { - this._writableState.prefinished = true + try { + await pipe( + source, + Coder.decode, + restrictSize(this._options.maxMsgSize), + async source => { + for await (const msg of source) { + this._handleIncoming(msg) + } + } + ) + } catch (err) { + log('error in sink', err) + return this.source.end(err) // End the source with an error } - this.emit('prefinish') - this._onuncork(cb) - }) - } - cork () { - if (++this._corked === 1) { - this.emit('cork') + this.source.end() } } - uncork () { - if (this._corked && --this._corked === 0) { - this.emit('uncork') + _createSource () { + const onEnd = err => { + const { initiators, receivers } = this._streams + // Abort all the things! + for (const s of initiators.values()) s.abort(err) + for (const s of receivers.values()) s.abort(err) } + const source = pushable(onEnd) + const encodedSource = pipe( + source, + restrictSize(this._options.maxMsgSize), + Coder.encode + ) + return Object.assign(encodedSource, { + push: source.push, + end: source.end, + return: source.return + }) } - end (data/* :: ?: Buffer | () => void */, enc/* :: ?: string | () => void */, cb/* :: ?: () => void */) { - this.log('end') - if (typeof data === 'function') { - cb = data - data = undefined - } - if (typeof enc === 'function') { - cb = enc - enc = undefined - } - - if (data) { - this.write(data) + _handleIncoming ({ id, type, data }) { + if (log.enabled) { + log('incoming message', { id, type: MessageTypeNames[type], data: data.slice() }) } - if (!this._writableState.ending) { - this.write(SIGNAL_FLUSH) + // Create a new stream? 
+ if (type === MessageTypes.NEW_STREAM && this.onStream) { + const stream = this._newReceiverStream({ id, name: data.toString() }) + return this.onStream(stream) } - return stream.Writable.prototype.end.call(this, cb) - } + const list = type & 1 ? this._streams.initiators : this._streams.receivers + const stream = list.get(id) - _onuncork (fn/* : () => void */) { - if (this._corked) { - this.once('uncork', fn) - return - } + if (!stream) return log('missing stream %s', id) - fn() - } - - _read () { - while (this._ondrain.length) { - this._ondrain.shift()() - } - } - - _clear () { - this.log('_clear') - if (this._finished) { - return + switch (type) { + case MessageTypes.MESSAGE_INITIATOR: + case MessageTypes.MESSAGE_RECEIVER: + stream.source.push(data) + break + case MessageTypes.CLOSE_INITIATOR: + case MessageTypes.CLOSE_RECEIVER: + stream.close() + break + case MessageTypes.RESET_INITIATOR: + case MessageTypes.RESET_RECEIVER: + stream.reset() + break + default: + log('unknown message type %s', type) } - - this._finished = true - - const list = this._local.concat(this._remote) - - this._local = [] - this._remote = [] - - list.forEach(function (stream) { - if (stream) { - stream.local = false - stream.destroy(null) - } - }) - - this.push(null) - } - - finalize () { - this._clear() - } - - _destroy (err/* :: ?: Error */, callback) { - this.log('destroy') - - const list = this._local.concat(this._remote) - - list.forEach(function (stream) { - if (stream) { - stream.destroy(err || new Error('Channel destroyed')) - } - }) - - this._clear() - callback(err) } } -module.exports = Multiplex +Mplex.multicodec = '/mplex/6.7.0' + +module.exports = Mplex diff --git a/src/muxer.js b/src/muxer.js deleted file mode 100644 index 2f0ee64..0000000 --- a/src/muxer.js +++ /dev/null @@ -1,108 +0,0 @@ -'use strict' - -const EventEmitter = require('events').EventEmitter -const Connection = require('interface-connection').Connection -const toPull = require('stream-to-pull-stream') -const 
pull = require('pull-stream') -const pullCatch = require('pull-catch') -const setImmediate = require('async/setImmediate') -const debug = require('debug') -const log = debug('mplex') -log.error = debug('mplex:error') - -const MULTIPLEX_CODEC = require('./codec') - -function noop () {} - -// Catch error makes sure that even though we get the "Channel destroyed" error -// from when closing streams, that it's not leaking through since it's not -// really an error for us, channels shoul close cleanly. -function catchError (stream) { - return { - source: pull( - stream.source, - pullCatch((err) => { - if (err.message === 'Channel destroyed') { - return - } - return false - }) - ), - sink: stream.sink - } -} - -class MultiplexMuxer extends EventEmitter { - constructor (conn, multiplex) { - super() - this.multiplex = multiplex - this.conn = conn - this.multicodec = MULTIPLEX_CODEC - - multiplex.on('close', () => this.emit('close')) - multiplex.on('error', (err) => this.emit('error', err)) - - multiplex.on('stream', (stream, id) => { - const muxedConn = new Connection( - catchError(toPull.duplex(stream)), - this.conn - ) - this.emit('stream', muxedConn) - }) - } - - /** - * Conditionally emit errors if we have listeners. 
All other - * events are sent to EventEmitter.emit - * - * @param {string} eventName - * @param {...any} args - * @returns {void} - */ - emit (eventName, ...args) { - if (eventName === 'error' && !this._events.error) { - log.error('error', ...args) - } else { - super.emit(eventName, ...args) - } - } - - // method added to enable pure stream muxer feeling - newStream (callback) { - callback = callback || noop - let stream - try { - stream = this.multiplex.createStream() - } catch (err) { - return setImmediate(() => callback(err)) - } - - const conn = new Connection( - catchError(toPull.duplex(stream)), - this.conn - ) - - setImmediate(() => callback(null, conn)) - - return conn - } - - /** - * Destroys multiplex and ends all internal streams - * - * @param {Error} err Optional error to pass to end the muxer with - * @param {function()} callback Optional - * @returns {void} - */ - end (err, callback) { - if (typeof err === 'function') { - callback = err - err = null - } - callback = callback || noop - this.multiplex.destroy(err) - callback() - } -} - -module.exports = MultiplexMuxer diff --git a/src/restrict-size.js b/src/restrict-size.js new file mode 100644 index 0000000..fa90763 --- /dev/null +++ b/src/restrict-size.js @@ -0,0 +1,18 @@ +'use strict' + +const MAX_MSG_SIZE = 1 << 20 // 1MB + +module.exports = max => { + max = max || MAX_MSG_SIZE + + return source => { + return (async function * restrictSize () { + for await (const msg of source) { + if (msg.data && msg.data.length >= max) { + throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) + } + yield msg + } + })() + } +} diff --git a/src/stream.js b/src/stream.js index 56e7f5f..11c1b6f 100644 --- a/src/stream.js +++ b/src/stream.js @@ -1,192 +1,77 @@ 'use strict' -/* @flow */ -const stream = require('readable-stream') -const debug = require('debug') +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') +const log = 
require('debug')('mplex:stream') +const pushable = require('it-pushable') +const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types') -/* :: import type Multiplex from './index' +module.exports = ({ id, name, send, onEnd = (() => {}), type = 'initiator' }) => { + const abortController = new AbortController() + const resetController = new AbortController() + const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes -export type ChannelOpts = { - chunked?: bool, - halfOpen?: bool, - lazy?: bool -} -*/ - -class Channel extends stream.Duplex { - constructor (name/* : Buffer | string */, plex/* : Multiplex */, opts/* : ChannelOpts = {} */) { - const halfOpen = Boolean(opts.halfOpen) - super({ - allowHalfOpen: halfOpen - }) - - this.name = name - this.log = debug('mplex:channel:' + this.name.toString()) - this.channel = 0 - this.initiator = false - this.chunked = Boolean(opts.chunked) - this.halfOpen = halfOpen - this.destroyed = false - this.finalized = false - this.local = true - - this._multiplex = plex - this._dataHeader = 0 - this._opened = false - this._awaitDrain = 0 - this._lazy = Boolean(opts.lazy) - - let finished = false - let ended = false - this.log('open, halfOpen: ' + this.halfOpen) + name = String(name == null ? id : name) - this.once('end', () => { - this.log('end') - this._read() // trigger drain + let sourceEnded = false + let sinkEnded = false + let endErr - if (this.destroyed) { - return - } - - ended = true - if (finished) { - this._finalize() - } else if (!this.halfOpen) { - this.end() - } - }) - - this.once('finish', function onfinish () { - if (this.destroyed) { - return - } - - if (!this._opened) { - return this.once('open', onfinish) - } - - if (this._lazy && this.initiator) { - this._open() - } - - this._multiplex._send( - this.channel << 3 | (this.initiator ? 
4 : 3), - null - ) - - finished = true - - if (ended) { - this._finalize() - } - }) + const onSourceEnd = err => { + sourceEnded = true + log('%s stream %s source end', type, name, err) + if (err && !endErr) endErr = err + if (sinkEnded) onEnd(endErr) } - /** - * Conditionally emit errors if we have listeners. All other - * events are sent to EventEmitter.emit - * @param {string} eventName - * @param {...any} args - * @returns {void} - */ - emit (eventName, ...args) { - if (eventName === 'error' && !this._events.error) { - this.log('error', ...args) - } else { - super.emit(eventName, ...args) - } + const onSinkEnd = err => { + sinkEnded = true + log('%s stream %s sink end', type, name, err) + if (err && !endErr) endErr = err + if (sourceEnded) onEnd(endErr) } - _destroy (err/* : Error */, callback) { - this.log('_destroy:' + (this.local ? 'local' : 'remote')) - - if (this.local && this._opened) { - if (this._lazy && this.initiator) { - this._open() + const stream = { + // Close for reading + close: () => stream.source.end(), + // Close for reading and writing (local error) + abort: err => { + // End the source with the passed error + stream.source.end(err) + abortController.abort() + }, + // Close immediately for reading and writing (remote error) + reset: () => resetController.abort(), + sink: async source => { + source = abortable(source, abortController.signal, { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' }) + source = abortable(source, resetController.signal, { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' }) + + if (type === 'initiator') { // If initiator, open a new stream + send({ id, type: Types.NEW_STREAM, data: name }) } - const msg = err ? Buffer.from(err.message) : null try { - this._multiplex._send( - this.channel << 3 | (this.initiator ? 
6 : 5), - msg - ) - } catch (e) { /* do nothing */ } - } - - this._finalize() - callback(err) - } - - _finalize () { - if (this.finalized) { - return - } - - this.finalized = true - this.emit('finalize') - } - - _write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) { - this.log('write: ', data.length) - if (!this._opened) { - this.once('open', () => { - this._write(data, enc, cb) - }) - return - } - - if (this.destroyed) { - cb() - return - } - - if (this._lazy && this.initiator) { - this._open() - } - - const drained = this._multiplex._send( - this._dataHeader, - data - ) - - if (drained) { - cb() - return - } - - this._multiplex._ondrain.push(cb) - } - - _read () { - if (this._awaitDrain) { - const drained = this._awaitDrain - this._awaitDrain = 0 - this._multiplex._onchanneldrain(drained) - } - } - - _open () { - let buf = null - if (Buffer.isBuffer(this.name)) { - buf = this.name - } else if (this.name !== this.channel.toString()) { - buf = Buffer.from(this.name) - } + for await (const data of source) { + send({ id, type: Types.MESSAGE, data }) + } + } catch (err) { + // Send no more data if this stream was remotely reset + if (err.code === 'ERR_MPLEX_STREAM_RESET') { + log('%s stream %s reset', type, name) + } else { + log('%s stream %s error', type, name, err) + send({ id, type: Types.RESET }) + } + + stream.source.end(err) + return onSinkEnd(err) + } - this._lazy = false - this._multiplex._send(this.channel << 3 | 0, buf) + send({ id, type: Types.CLOSE }) + onSinkEnd() + }, + source: pushable(onSourceEnd) } - open (channel/* : number */, initiator/* : bool */) { - this.log('open: ' + channel) - this.channel = channel - this.initiator = initiator - this._dataHeader = channel << 3 | (initiator ? 
2 : 1) - this._opened = true - if (!this._lazy && this.initiator) this._open() - this.emit('open') - } + return stream } - -module.exports = Channel diff --git a/test/browser.js b/test/browser.js deleted file mode 100644 index ecc0fb5..0000000 --- a/test/browser.js +++ /dev/null @@ -1,43 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const WSlibp2p = require('libp2p-websockets') -const multiaddr = require('multiaddr') -const pull = require('pull-stream') - -const multiplex = require('../src') - -describe('browser-server', () => { - let ws - - before(() => { - ws = new WSlibp2p() - }) - - it('ricochet test', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - const transportSocket = ws.dial(mh) - const muxedConn = multiplex.dialer(transportSocket) - - muxedConn.on('stream', (conn) => { - pull( - conn, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.be.eql([Buffer.from('hey')]) - pull(pull.empty(), conn) - }) - ) - }) - - pull( - pull.values([Buffer.from('hey')]), - muxedConn.newStream(), - pull.onEnd(done) - ) - }) -}) diff --git a/test/coder.spec.js b/test/coder.spec.js new file mode 100644 index 0000000..8070ab1 --- /dev/null +++ b/test/coder.spec.js @@ -0,0 +1,106 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const BufferList = require('bl') +const { expect } = chai +chai.use(dirtyChai) + +const coder = require('../src/coder') + +describe('coder', () => { + it('should encode header', async () => { + const source = [{ id: 17, type: 0, data: Buffer.from('17') }] + + let data = Buffer.alloc(0) + for await (const chunk of coder.encode(source)) { + data = Buffer.concat([data, chunk]) + } + + const expectedHeader = Buffer.from('880102', 'hex') + expect(data.slice(0, 
expectedHeader.length)).to.be.eql(expectedHeader) + }) + + it('should decode header', async () => { + const source = [Buffer.from('8801023137', 'hex')] + for await (const msg of coder.decode(source)) { + msg.data = msg.data.slice() // convert BufferList to Buffer + expect(msg).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) + } + }) + + it('should encode several msgs into buffer', async () => { + const source = [ + { id: 17, type: 0, data: Buffer.from('17') }, + { id: 19, type: 0, data: Buffer.from('19') }, + { id: 21, type: 0, data: Buffer.from('21') } + ] + + let data = Buffer.alloc(0) + for await (const chunk of coder.encode(source)) { + data = Buffer.concat([data, chunk]) + } + + expect(data).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) + }) + + it('should encode from BufferList', async () => { + const source = [{ + id: 17, + type: 0, + data: new BufferList([ + Buffer.from(Math.random().toString()), + Buffer.from(Math.random().toString()) + ]) + }] + + let data = Buffer.alloc(0) + for await (const chunk of coder.encode(source)) { + data = Buffer.concat([data, chunk]) + } + + expect(data).to.be.eql(Buffer.concat([ + Buffer.from('8801', 'hex'), + Buffer.from([source[0].data.length]), + source[0].data.slice() + ])) + }) + + it('should decode msgs from buffer', async () => { + const source = [Buffer.from('88010231379801023139a801023231', 'hex')] + + const res = [] + for await (const msg of coder.decode(source)) { + msg.data = msg.data.slice() // convert BufferList to Buffer + res.push(msg) + } + + expect(res).to.be.deep.eql([ + { id: 17, type: 0, data: Buffer.from('17') }, + { id: 19, type: 0, data: Buffer.from('19') }, + { id: 21, type: 0, data: Buffer.from('21') } + ]) + }) + + it('should encode zero length body msg', async () => { + const source = [{ id: 17, type: 0 }] + + let data = Buffer.alloc(0) + for await (const chunk of coder.encode(source)) { + data = Buffer.concat([data, chunk]) + } + + 
expect(data).to.be.eql(Buffer.from('880100', 'hex')) + }) + + it('should decode zero length body msg', async () => { + const source = [Buffer.from('880100', 'hex')] + + for await (const msg of coder.decode(source)) { + msg.data = msg.data.slice() // convert BufferList to Buffer + expect(msg).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) }) + } + }) +}) diff --git a/test/compliance.spec.js b/test/compliance.spec.js index d74ccd8..b512640 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -2,15 +2,11 @@ 'use strict' const tests = require('interface-stream-muxer') -const multiplex = require('../src') +const Mplex = require('../src') describe('compliance', () => { tests({ - setup (cb) { - cb(null, multiplex) - }, - teardown (cb) { - cb() - } + setup: () => Mplex, + teardown () {} }) }) diff --git a/test/internals.node.js b/test/internals.node.js deleted file mode 100644 index c05e2ca..0000000 --- a/test/internals.node.js +++ /dev/null @@ -1,460 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-checkmark')) -const expect = chai.expect - -const concat = require('concat-stream') -const through = require('through2') -const net = require('net') -const chunky = require('chunky') -const pump = require('pump') - -const MplexCore = require('../src/internals') - -describe('Internals - MplexCore', () => { - it('one way piping work with 2 sub-streams', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - function onStream (stream, id) { - stream.pipe(collect()) - } - - const plex2 = new MplexCore(onStream) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - stream1.end() - stream2.end() - - let pending = 2 - const results = [] - - function collect () { - return concat(function (data) { - results.push(data.toString()) - - if (--pending === 0) { - 
results.sort() - expect(results[0].toString()).to.equal('hello') - expect(results[1].toString()).to.equal('world') - done() - } - }) - } - }) - - it('two way piping works with 2 sub-streams', (done) => { - const plex1 = new MplexCore() - - const plex2 = new MplexCore(function onStream (stream, id) { - const uppercaser = through(function (chunk, e, done) { - this.push(Buffer.from(chunk.toString().toUpperCase())) - this.end() - done() - }) - stream.pipe(uppercaser).pipe(stream) - }) - - plex1.pipe(plex2).pipe(plex1) - - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - stream1.pipe(collect()) - stream2.pipe(collect()) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - - let pending = 2 - const results = [] - - function collect () { - return concat(function (data) { - results.push(data.toString()) - if (--pending === 0) { - results.sort() - expect(results[0].toString()).to.equal('HELLO') - expect(results[1].toString()).to.equal('WORLD') - done() - } - }) - } - }) - - it('stream id should be exposed as stream.name', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream('5') - expect(stream1.name).to.equal('5') - - const plex2 = new MplexCore(function onStream (stream, id) { - expect(stream.name).to.equal('5') - expect(id).to.equal('5') - done() - }) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream1.end() - }) - - it('stream id can be a long string', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream('hello-yes-this-is-dog') - expect(stream1.name).to.equal('hello-yes-this-is-dog') - - const plex2 = new MplexCore(function onStream (stream, id) { - expect(stream.name).to.equal('hello-yes-this-is-dog') - expect(id).to.equal('hello-yes-this-is-dog') - done() - }) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream1.end() - }) - - it('destroy', (done) => { - const plex1 = new MplexCore() - const stream1 = 
plex1.createStream() - - expect(2).check(done) - - const plex2 = new MplexCore(function onStream (stream, id) { - stream.on('error', function (err) { - expect(err.message).to.equal('0 had an error').mark() - }) - }) - - plex1.pipe(plex2) - stream1.on('error', function (err) { - expect(err.message).to.equal('0 had an error').mark() - }) - stream1.write(Buffer.from('hello')) - stream1.destroy(new Error('0 had an error')) - }) - - it('testing invalid data error', (done) => { - const plex = new MplexCore() - - plex.on('error', function (err) { - if (err) { - expect(err.message).to.equal('Incoming message is too big') - done() - } - }) - // a really stupid thing to do - plex.write(Array(50000).join('\xff')) - }) - - it('overflow', (done) => { - let count = 0 - function check () { - if (++count === 2) { - done() - } - } - const plex1 = new MplexCore() - const plex2 = new MplexCore({ limit: 10 }) - - plex2.on('stream', function (stream) { - stream.on('error', function (err) { - expect(err.message).to.equal('Incoming message is too big') - check() - }) - }) - - plex2.on('error', function (err) { - if (err) { - expect(err.message).to.equal('Incoming message is too big') - check() - } - }) - - plex1.pipe(plex2).pipe(plex1) - - const stream = plex1.createStream() - - stream.write(Buffer.alloc(11)) - }) - - it('2 buffers packed into 1 chunk', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore(function (b) { - b.pipe(concat(function (body) { - expect(body.toString('utf8')).to.equal('abc\n123\n') - server.close() - plex1.end() - done() - })) - }) - - const a = plex1.createStream(1337) - a.write('abc\n') - a.write('123\n') - a.end() - - const server = net.createServer(function (stream) { - plex2.pipe(stream).pipe(plex2) - }) - server.listen(0, function () { - const port = server.address().port - plex1.pipe(net.connect(port)).pipe(plex1) - }) - }) - - it('chunks', (done) => { - let times = 100 - ;(function chunk () { - const collect = collector(function () { 
- if (--times === 0) { - done() - } else { - chunk() - } - }) - - const plex1 = new MplexCore() - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - const plex2 = new MplexCore(function onStream (stream, id) { - stream.pipe(collect()) - }) - - plex1.pipe(through(function (buf, enc, next) { - const bufs = chunky(buf) - for (let i = 0; i < bufs.length; i++) this.push(bufs[i]) - next() - })).pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - stream1.end() - stream2.end() - })() - - function collector (cb) { - let pending = 2 - const results = [] - - return function () { - return concat(function (data) { - results.push(data.toString()) - if (--pending === 0) { - results.sort() - expect(results[0].toString()).to.equal('hello') - expect(results[1].toString()).to.equal('world') - cb() - } - }) - } - } - }) - - it('prefinish + corking', (done) => { - const plex = new MplexCore() - let async = false - - plex.on('prefinish', function () { - plex.cork() - process.nextTick(function () { - async = true - plex.uncork() - }) - }) - - plex.on('finish', function () { - expect(async).to.be.ok() - done() - }) - - plex.end() - }) - - it('quick message', (done) => { - const plex2 = new MplexCore() - const plex1 = new MplexCore(function (stream) { - stream.write('hello world') - }) - - plex1.pipe(plex2).pipe(plex1) - - setTimeout(function () { - const stream = plex2.createStream() - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - done() - }) - }, 100) - }) - - it('if onstream is not passed, stream is emitted', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - stream.write('hello world') - stream.end() - }) - - const stream = plex1.createStream() - stream.on('data', function (data) { - 
expect(data).to.eql(Buffer.from('hello world')) - stream.end() - setTimeout(() => done(), 1000) - }) - }) - - it('half close a muxed stream', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - - // let it flow - stream.on('data', function () {}) - - stream.on('end', function () { - done() - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.write(Buffer.from('hello world')) - - stream.end() - }) - - const stream = plex1.createStream() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.on('end', function () { - stream.end() - }) - }) - - it('half close a half closed muxed stream', (done) => { - const plex1 = new MplexCore({ halfOpen: true }) - const plex2 = new MplexCore({ halfOpen: true }) - - plex1.nameTag = 'plex1:' - plex2.nameTag = 'plex2:' - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('some data')) - }) - - stream.on('end', function () { - stream.write(Buffer.from('hello world')) - stream.end() - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - }) - - const stream = plex1.createStream() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.on('end', function () { - done() - }) - - stream.write(Buffer.from('some data')) - - stream.end() - }) - - it('underlying error is propagated to muxed streams', (done) => { - let count = 0 - function check () { - if (++count === 4) { - done() - } - } - - const plex1 = new MplexCore() - const 
plex2 = new MplexCore() - - let socket - - plex2.on('stream', function (stream) { - stream.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream.on('close', function () { - check() - }) - - socket.destroy() - }) - - const stream1to2 = plex1.createStream(1337) - - stream1to2.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream1to2.on('close', function () { - check() - }) - - const server = net.createServer(function (stream) { - pump(plex2, stream) - pump(stream, plex2) - server.close() - }) - - server.listen(0, function () { - const port = server.address().port - socket = net.connect(port) - - pump(plex1, socket) - pump(socket, plex1) - }) - }) -}) diff --git a/test/mplex.spec.js b/test/mplex.spec.js deleted file mode 100644 index e67a996..0000000 --- a/test/mplex.spec.js +++ /dev/null @@ -1,70 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const pair = require('pull-pair/duplex') -const pull = require('pull-stream') - -const multiplex = require('../src') - -describe('multiplex-generic', () => { - let listenerSocket - let dialerSocket - - let listener - let dialer - - before(() => { - const p = pair() - dialerSocket = p[0] - listenerSocket = p[1] - }) - - it('attach to a duplex stream, as listener', () => { - listener = multiplex.listener(listenerSocket) - expect(listener).to.exist() - }) - - it('attach to a duplex stream, as dialer', () => { - dialer = multiplex.dialer(dialerSocket) - expect(dialer).to.exist() - }) - - it('open a multiplex stream from client', (done) => { - listener.once('stream', (conn) => { - pull(conn, conn) - }) - - const conn = dialer.newStream() - pull( - // Strings should be converted to Buffers - pull.values(['hello']), - conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([Buffer.from('hello')]) - done() - }) - ) - }) - - it('open a 
multiplex stream from listener', (done) => { - dialer.once('stream', (conn) => { - pull(conn, conn) - }) - - const conn = listener.newStream() - pull( - pull.values([Buffer.from('hello')]), - conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([Buffer.from('hello')]) - done() - }) - ) - }) -}) diff --git a/test/muxer.spec.js b/test/muxer.spec.js deleted file mode 100644 index 0bc918c..0000000 --- a/test/muxer.spec.js +++ /dev/null @@ -1,74 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const pair = require('pull-pair/duplex') - -const Muxer = require('../src/muxer') -const Multiplex = require('../src/internals') - -describe('multiplex-muxer', () => { - let muxer - let multiplex - - it('can be created', () => { - const p = pair() - multiplex = new Multiplex() - muxer = new Muxer(p, multiplex) - }) - - it('catches newStream errors', (done) => { - multiplex.createStream = () => { - throw new Error('something nbad happened') - } - muxer.newStream((err) => { - expect(err).to.exist() - expect(err.message).to.equal('something nbad happened') - done() - }) - }) - - it('can be destroyed with an error', (done) => { - const p = pair() - const multiplex = new Multiplex() - const muxer = new Muxer(p, multiplex) - const error = new Error('bad things') - muxer.once('error', (err) => { - expect(err).to.eql(error) - done() - }) - muxer.end(error) - }) - - it('destroying with error does not throw with no listener', () => { - const p = pair() - const multiplex = new Multiplex() - const muxer = new Muxer(p, multiplex) - const error = new Error('bad things') - expect(() => muxer.end(error)).to.not.throw() - }) - - it('can get destroyed', (done) => { - expect(multiplex.destroyed).to.eql(false) - - muxer.end((err) => { - expect(err).to.not.exist() - expect(multiplex.destroyed).to.be.true() - done() - }) - }) - - it('should handle a 
repeat destroy', (done) => { - expect(multiplex.destroyed).to.be.true() - - muxer.end((err) => { - expect(err).to.not.exist() - expect(multiplex.destroyed).to.be.true() - done() - }) - }) -}) diff --git a/test/node.js b/test/node.js deleted file mode 100644 index ab22922..0000000 --- a/test/node.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -require('./internals.node') diff --git a/test/restrict-size.spec.js b/test/restrict-size.spec.js new file mode 100644 index 0000000..90e15dd --- /dev/null +++ b/test/restrict-size.spec.js @@ -0,0 +1,48 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { expect } = chai +chai.use(dirtyChai) +const pipe = require('it-pipe') +const randomBytes = require('random-bytes') +const { tap, consume, collect } = require('streaming-iterables') + +const restrictSize = require('../src/restrict-size') + +describe('restrict-size', () => { + it('should throw when size is too big', async () => { + const input = [ + { data: await randomBytes(8) }, + { data: await randomBytes(64) }, + { data: await randomBytes(16) } + ] + + const output = [] + + try { + await pipe( + input, + restrictSize(32), + tap(chunk => output.push(chunk)), + consume + ) + } catch (err) { + expect(err.code).to.equal('ERR_MSG_TOO_BIG') + expect(output).to.have.length(1) + expect(output[0]).to.deep.equal(input[0]) + return + } + throw new Error('did not restrict size') + }) + + it('should allow message with no data property', async () => { + const output = await pipe( + [{}], + restrictSize(32), + collect + ) + expect(output).to.deep.equal([{}]) + }) +}) diff --git a/test/stream.spec.js b/test/stream.spec.js new file mode 100644 index 0000000..634da42 --- /dev/null +++ b/test/stream.spec.js @@ -0,0 +1,506 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { expect } = chai +chai.use(dirtyChai) +const pipe = require('it-pipe') +const 
randomBytes = require('random-bytes') +const randomInt = require('random-int') +const { tap, take, collect, consume } = require('streaming-iterables') +const defer = require('p-defer') + +const createStream = require('../src/stream') +const { MessageTypes, MessageTypeNames } = require('../src/message-types') + +function randomInput (min = 1, max = 100) { + return Promise.all( + Array.from(Array(randomInt(min, max)), () => randomBytes(randomInt(1, 128))) + ) +} + +function expectMsgType (actual, expected) { + expect(MessageTypeNames[actual]).to.equal(MessageTypeNames[expected]) +} + +const infiniteRandom = { + [Symbol.iterator]: function * () { + while (true) yield randomBytes(randomInt(1, 128)) + } +} + +describe('stream', () => { + it('should initiate stream with NEW_STREAM message', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const stream = createStream({ id, send: mockSend }) + const input = await randomInput() + + await pipe(input, stream) + + expect(msgs[0].id).to.equal(id) + expectMsgType(msgs[0].type, MessageTypes.NEW_STREAM) + expect(msgs[0].data).to.deep.equal(id.toString()) + }) + + it('should initiate named stream with NEW_STREAM message', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = `STREAM${Date.now()}` + const stream = createStream({ id, name, send: mockSend }) + const input = await randomInput() + + await pipe(input, stream) + + expect(msgs[0].id).to.equal(id) + expectMsgType(msgs[0].type, MessageTypes.NEW_STREAM) + expect(msgs[0].data).to.deep.equal(name) + }) + + it('should send data with MESSAGE_INITIATOR messages if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const input = await randomInput() + + await pipe(input, stream) + + // 
First and last should be NEW_STREAM and CLOSE + const dataMsgs = msgs.slice(1, -1) + expect(dataMsgs).have.length(input.length) + + dataMsgs.forEach((msg, i) => { + expect(msg.id).to.equal(id) + expectMsgType(msg.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data).to.deep.equal(input[i]) + }) + }) + + it('should send data with MESSAGE_RECEIVER messages if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const input = await randomInput() + + await pipe(input, stream) + + // Last should be CLOSE + const dataMsgs = msgs.slice(0, -1) + expect(dataMsgs).have.length(input.length) + + dataMsgs.forEach((msg, i) => { + expect(msg.id).to.equal(id) + expectMsgType(msg.type, MessageTypes.MESSAGE_RECEIVER) + expect(msg.data).to.deep.equal(input[i]) + }) + }) + + it('should close stream with CLOSE_INITIATOR message if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const input = await randomInput() + + await pipe(input, stream) + + const closeMsg = msgs[msgs.length - 1] + + expect(closeMsg.id).to.equal(id) + expectMsgType(closeMsg.type, MessageTypes.CLOSE_INITIATOR) + expect(closeMsg.data).to.not.exist() + }) + + it('should close stream with CLOSE_RECEIVER message if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const input = await randomInput() + + await pipe(input, stream) + + const closeMsg = msgs[msgs.length - 1] + + expect(closeMsg.id).to.equal(id) + expectMsgType(closeMsg.type, MessageTypes.CLOSE_RECEIVER) + 
expect(closeMsg.data).to.not.exist() + }) + + it('should reset stream on error with RESET_INITIATOR message if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const error = new Error(`Boom ${Date.now()}`) + const input = { + [Symbol.iterator]: function * () { + for (let i = 0; i < randomInt(1, 10); i++) { + yield randomBytes(randomInt(1, 128)) + } + throw error + } + } + + await pipe(input, stream) + + const resetMsg = msgs[msgs.length - 1] + + expect(resetMsg.id).to.equal(id) + expectMsgType(resetMsg.type, MessageTypes.RESET_INITIATOR) + expect(resetMsg.data).to.not.exist() + }) + + it('should reset stream on error with RESET_RECEIVER message if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const error = new Error(`Boom ${Date.now()}`) + const input = { + [Symbol.iterator]: function * () { + for (let i = 0; i < randomInt(1, 10); i++) { + yield randomBytes(randomInt(1, 128)) + } + throw error + } + } + + await pipe(input, stream) + + const resetMsg = msgs[msgs.length - 1] + + expect(resetMsg.id).to.equal(id) + expectMsgType(resetMsg.type, MessageTypes.RESET_RECEIVER) + expect(resetMsg.data).to.not.exist() + }) + + it('should close for reading (remote close)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + 
pipe( + receiver, + tap(msg => { + // when the initiator sends a CLOSE message, we call close + if (msg.type === MessageTypes.CLOSE_INITIATOR) { + receiver.close() + } + }), + receiver + ) + + const input = await randomInput() + const msgs = await pipe( + input, + initiator, + tap(msg => { + // when the receiver sends a CLOSE message, we call close + if (msg.type === MessageTypes.CLOSE_RECEIVER) { + initiator.close() + } + }), + collect + ) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // check the receiver echoed back all our data messages + expect(msgs.slice(1, -2).length).to.equal(input.length) + + msgs.slice(1, -2).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(input[i]) + }) + + // ...and echoed back the close message + expectMsgType(msgs[msgs.length - 2].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[msgs.length - 2].data.type, MessageTypes.CLOSE_INITIATOR) + + // ...and finally sent a close message + const closeMsg = msgs[msgs.length - 1] + + expectMsgType(closeMsg.type, MessageTypes.CLOSE_RECEIVER) + expect(closeMsg.data).to.not.exist() + }) + + it('should close for reading and writing (abort on local error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { + // when the initiator sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_INITIATOR) { + receiver.reset() + } + }), + receiver + ) + 
+ const input = infiniteRandom + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + const generatedMsgs = [] + const msgs = [] + + try { + let i = 0 + + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + tap(msg => { if (i++ >= maxMsgs) initiator.abort(error) }), + initiator, + tap(msg => msgs.push(msg)), + consume + ) + } catch (err) { + expect(err.message).to.equal(error.message) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + expect(msgs).to.have.length(generatedMsgs.length) + + // check the receiver echoed back all our data messages, and nothing else + msgs.slice(1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + } + }) + + it('should close for reading and writing (abort on remote error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { if (i++ >= maxMsgs) receiver.abort(error) }), + receiver + ) + + const input = infiniteRandom + const generatedMsgs = [] + const msgs = [] + + try { + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + initiator, + tap(msg => msgs.push(msg)), + tap(msg => { + // when the receiver sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_RECEIVER) { + initiator.reset() + } + }), + consume + ) + } catch (err) 
{ + expect(err.message).to.equal('stream reset') + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because the receiver errored we might not have received all our data messages + expect(msgs.length - 2).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages + msgs.slice(1, -1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + + // ...and finally a RESET message + expectMsgType(msgs[msgs.length - 1].type, MessageTypes.RESET_RECEIVER) + } + }) + + it('should close immediately for reading and writing (reset on local error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { + // when the initiator sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_INITIATOR) { + receiver.reset() + } + }), + receiver + ) + + const input = infiniteRandom + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + const generatedMsgs = [] + const msgs = [] + + try { + let i = 0 + + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + tap(msg => { if (i++ >= maxMsgs) throw error }), + initiator, + tap(msg => msgs.push(msg)), + consume + ) + } catch (err) { + expect(err.message).to.equal(error.message) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + 
expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because we errored locally we might not receive all the echo messages + // from the receiver before our source stream is ended + expect(msgs.length - 1).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages, and nothing else + msgs.slice(1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + } + }) + + it('should close immediately for reading and writing (reset on remote error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { if (i++ >= maxMsgs) throw error }), + receiver + ) + + const input = infiniteRandom + const generatedMsgs = [] + const msgs = [] + + try { + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + initiator, + tap(msg => msgs.push(msg)), + tap(msg => { + // when the receiver sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_RECEIVER) { + initiator.reset() + } + }), + consume + ) + } catch (err) { + expect(err.message).to.equal('stream reset') + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because we errored locally we might not receive all the echo messages + // from the receiver before our source stream is ended + expect(msgs.length - 
2).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages + msgs.slice(1, -1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + + // ...and finally a RESET message + expectMsgType(msgs[msgs.length - 1].type, MessageTypes.RESET_RECEIVER) + } + }) + + it('should call onEnd only when both sides have closed', async () => { + const send = msg => stream.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const deferred = defer() + const onEnd = err => err ? deferred.reject(err) : deferred.resolve() + const stream = createStream({ id, name, send, onEnd }) + const input = await randomInput() + + pipe(input, stream, take(randomInt(1, input.length)), consume) + await deferred.promise + }) + + it('should call onEnd with error for local error', async () => { + const send = msg => stream.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const deferred = defer() + const onEnd = err => err ? deferred.reject(err) : deferred.resolve() + const stream = createStream({ id, name, send, onEnd }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + pipe(infiniteRandom, tap(msg => { if (i++ >= maxMsgs) throw error }), stream) + + try { + await deferred.promise + } catch (err) { + return expect(err.message).to.equal(error.message) + } + throw new Error('did not call onEnd with error') + }) +}) From a15a5e9b5e6a1744b2253ab213bf55f38241c1d8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 11:58:33 +0100 Subject: [PATCH 03/18] fix: add missing comma in README example Thanks @vasco-santos! 
License: MIT Signed-off-by: Alan Shaw --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4215d9c..d5d8051 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ pipe(conn, muxer, conn) // conn is duplex connection to another peer stream, source => (async function * () { for await (const data of source) yield data - })() + })(), stream ) } From 53337cbdfa91b584315cdff2848de4e5bf6a19cb Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 12:45:08 +0100 Subject: [PATCH 04/18] refactor: add libp2p prefix to log lines License: MIT Signed-off-by: Alan Shaw --- src/mplex.js | 2 +- src/stream.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mplex.js b/src/mplex.js index ddba084..adb25cf 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -2,7 +2,7 @@ const pipe = require('it-pipe') const pushable = require('it-pushable') -const log = require('debug')('mplex:mplex') +const log = require('debug')('libp2p:mplex:mplex') const abortable = require('abortable-iterator') const Coder = require('./coder') const restrictSize = require('./restrict-size') diff --git a/src/stream.js b/src/stream.js index 11c1b6f..1320322 100644 --- a/src/stream.js +++ b/src/stream.js @@ -2,7 +2,7 @@ const abortable = require('abortable-iterator') const AbortController = require('abort-controller') -const log = require('debug')('mplex:stream') +const log = require('debug')('libp2p:mplex:stream') const pushable = require('it-pushable') const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types') From a8b59432e7580d9b6fc74408d743bcda52554b52 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 13:18:00 +0100 Subject: [PATCH 05/18] docs: tweak comment Co-Authored-By: Jacob Heun --- src/coder/decode.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/coder/decode.js b/src/coder/decode.js index b69a1d4..6157951 100644 --- a/src/coder/decode.js +++ 
b/src/coder/decode.js @@ -39,7 +39,7 @@ class Decoder { const { id, type, length } = this._headerInfo - if (this._buffer.length < length) return [] // not got enough data yet + if (this._buffer.length < length) return [] // not enough data yet if (this._buffer.length === length) { const msg = { id, type, data: this._buffer } From fbd68e1e7dffe34d23913fae8f304f862d0ad0b6 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 13:19:51 +0100 Subject: [PATCH 06/18] fix: remove double mplex from log prefix License: MIT Signed-off-by: Alan Shaw --- src/mplex.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mplex.js b/src/mplex.js index adb25cf..844921c 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -2,7 +2,7 @@ const pipe = require('it-pipe') const pushable = require('it-pushable') -const log = require('debug')('libp2p:mplex:mplex') +const log = require('debug')('libp2p:mplex') const abortable = require('abortable-iterator') const Coder = require('./coder') const restrictSize = require('./restrict-size') From c2f154d42f3868cc1150bd8a6cc77b1060cd2c45 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 14:58:45 +0100 Subject: [PATCH 07/18] fix: max size restriction was exclusive not inclusive License: MIT Signed-off-by: Alan Shaw --- src/restrict-size.js | 2 +- test/restrict-size.spec.js | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/restrict-size.js b/src/restrict-size.js index fa90763..2524eab 100644 --- a/src/restrict-size.js +++ b/src/restrict-size.js @@ -8,7 +8,7 @@ module.exports = max => { return source => { return (async function * restrictSize () { for await (const msg of source) { - if (msg.data && msg.data.length >= max) { + if (msg.data && msg.data.length > max) { throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) } yield msg diff --git a/test/restrict-size.spec.js b/test/restrict-size.spec.js index 90e15dd..c3e3d94 100644 --- 
a/test/restrict-size.spec.js +++ b/test/restrict-size.spec.js @@ -13,8 +13,11 @@ const restrictSize = require('../src/restrict-size') describe('restrict-size', () => { it('should throw when size is too big', async () => { + const maxSize = 32 + const input = [ { data: await randomBytes(8) }, + { data: await randomBytes(maxSize) }, { data: await randomBytes(64) }, { data: await randomBytes(16) } ] @@ -24,7 +27,7 @@ describe('restrict-size', () => { try { await pipe( input, - restrictSize(32), + restrictSize(maxSize), tap(chunk => output.push(chunk)), consume ) @@ -32,6 +35,7 @@ describe('restrict-size', () => { expect(err.code).to.equal('ERR_MSG_TOO_BIG') expect(output).to.have.length(1) expect(output[0]).to.deep.equal(input[0]) + expect(output[1]).to.deep.equal(input[1]) return } throw new Error('did not restrict size') From 007752115f2bba6db72d25b4242ea9767994b0e3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 15:17:06 +0100 Subject: [PATCH 08/18] fix: restrict size test License: MIT Signed-off-by: Alan Shaw --- test/restrict-size.spec.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/restrict-size.spec.js b/test/restrict-size.spec.js index c3e3d94..6129703 100644 --- a/test/restrict-size.spec.js +++ b/test/restrict-size.spec.js @@ -33,7 +33,7 @@ describe('restrict-size', () => { ) } catch (err) { expect(err.code).to.equal('ERR_MSG_TOO_BIG') - expect(output).to.have.length(1) + expect(output).to.have.length(2) expect(output[0]).to.deep.equal(input[0]) expect(output[1]).to.deep.equal(input[1]) return From b09126b2b23943ce5bbb59154eb4a158b60b467f Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 15 Aug 2019 15:21:17 +0100 Subject: [PATCH 09/18] fix: do not encode bufferlist to buffer License: MIT Signed-off-by: Alan Shaw --- src/coder/encode.js | 4 ---- test/coder.spec.js | 18 +++++++++--------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/coder/encode.js b/src/coder/encode.js index 4d6328d..a319a02 
100644 --- a/src/coder/encode.js +++ b/src/coder/encode.js @@ -31,10 +31,6 @@ class Encoder { if (!msg.data) return [header, empty] - if (msg.data.shallowSlice) { - msg.data = msg.data.slice() // If BufferList, convert to buffer - } - return [header, msg.data] } } diff --git a/test/coder.spec.js b/test/coder.spec.js index 8070ab1..30fd4e2 100644 --- a/test/coder.spec.js +++ b/test/coder.spec.js @@ -38,12 +38,12 @@ describe('coder', () => { { id: 21, type: 0, data: Buffer.from('21') } ] - let data = Buffer.alloc(0) + const data = new BufferList() for await (const chunk of coder.encode(source)) { - data = Buffer.concat([data, chunk]) + data.append(chunk) } - expect(data).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) + expect(data.slice()).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) }) it('should encode from BufferList', async () => { @@ -56,12 +56,12 @@ describe('coder', () => { ]) }] - let data = Buffer.alloc(0) + const data = new BufferList() for await (const chunk of coder.encode(source)) { - data = Buffer.concat([data, chunk]) + data.append(chunk) } - expect(data).to.be.eql(Buffer.concat([ + expect(data.slice()).to.be.eql(Buffer.concat([ Buffer.from('8801', 'hex'), Buffer.from([source[0].data.length]), source[0].data.slice() @@ -87,12 +87,12 @@ describe('coder', () => { it('should encode zero length body msg', async () => { const source = [{ id: 17, type: 0 }] - let data = Buffer.alloc(0) + const data = new BufferList() for await (const chunk of coder.encode(source)) { - data = Buffer.concat([data, chunk]) + data.append(chunk) } - expect(data).to.be.eql(Buffer.from('880100', 'hex')) + expect(data.slice()).to.be.eql(Buffer.from('880100', 'hex')) }) it('should decode zero length body msg', async () => { From 58160066ba88591d7bd453221c06bf18acf4ff7d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 16 Aug 2019 14:17:13 +0100 Subject: [PATCH 10/18] perf: performance tweaks License: MIT Signed-off-by: Alan Shaw --- package.json 
| 7 +++---- src/adapter.js | 3 +++ src/coder/decode.js | 44 ++++++++++++++++++------------------------- src/coder/encode.js | 12 +++++++----- src/mplex.js | 45 ++++++++++++++++++++------------------------ src/restrict-size.js | 12 ++++++++++-- src/stream.js | 7 +++++-- test/coder.spec.js | 26 ++++++++++++++----------- 8 files changed, 81 insertions(+), 75 deletions(-) diff --git a/package.json b/package.json index 6636bc4..92607bc 100644 --- a/package.json +++ b/package.json @@ -45,18 +45,17 @@ "interface-stream-muxer": "github:libp2p/interface-stream-muxer#refactor/async-iterators", "p-defer": "^3.0.0", "random-bytes": "^1.0.0", - "random-int": "^2.0.0", - "streaming-iterables": "^4.1.0" + "random-int": "^2.0.0" }, "dependencies": { "abort-controller": "^3.0.0", - "abortable-iterator": "^2.0.0", + "abortable-iterator": "^2.1.0", "async-iterator-to-pull-stream": "^1.3.0", "bl": "^3.0.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", "it-pipe": "^1.0.1", - "it-pushable": "^1.2.1", + "it-pushable": "^1.3.0", "pull-stream": "^3.6.9", "varint": "^5.0.0" }, diff --git a/src/adapter.js b/src/adapter.js index 4ba90cf..f93f8bd 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -3,6 +3,7 @@ const Mplex = require('.') const AbortController = require('abort-controller') const pull = require('pull-stream/pull') +const map = require('pull-stream/throughs/map') const toPull = require('async-iterator-to-pull-stream') const { Connection } = require('interface-connection') const EE = require('events') @@ -48,6 +49,8 @@ function create (conn, isListener) { pull( conn, toPull.duplex(muxer), + // convert bl to Buffer + map(chunk => chunk.slice()), read => (end, cb) => { if (end) adapterMuxer.emit('close', end) read(end, cb) diff --git a/src/coder/decode.js b/src/coder/decode.js index 6157951..1c2e916 100644 --- a/src/coder/decode.js +++ b/src/coder/decode.js @@ -3,11 +3,12 @@ const varint = require('varint') const BufferList = require('bl') +// Decode a chunk and yield an 
_array_ of decoded messages module.exports = source => (async function * decode () { const decoder = new Decoder() - for await (const chunk of source) { + for await (let chunk of source) { const msgs = decoder.write(chunk) - for (let i = 0; i < msgs.length; i++) yield msgs[i] + if (msgs.length) yield msgs } })() @@ -25,38 +26,29 @@ class Decoder { if (!chunk || !chunk.length) return [] this._buffer.append(chunk) - - if (!this._headerInfo) { - try { - this._headerInfo = this._decodeHeader(this._bufferProxy) - } catch (err) { - return [] // not enough data yet...probably + const msgs = [] + + while (true) { + if (!this._headerInfo) { + try { + this._headerInfo = this._decodeHeader(this._bufferProxy) + } catch (_) { + break // not enough data yet...probably + } } - // remove the header from the buffer - this._buffer = this._buffer.shallowSlice(this._headerInfo.offset) - } - - const { id, type, length } = this._headerInfo + const { id, type, length, offset } = this._headerInfo + const bufferedDataLength = this._buffer.length - offset - if (this._buffer.length < length) return [] // not enough data yet + if (bufferedDataLength < length) break // not enough data yet - if (this._buffer.length === length) { - const msg = { id, type, data: this._buffer } + msgs.push({ id, type, data: this._buffer.shallowSlice(offset, offset + length) }) + this._buffer.consume(offset + length) this._headerInfo = null - this._buffer = new BufferList() - - return [msg] } - const msg = { id, type, data: this._buffer.shallowSlice(0, length) } - const rest = this._buffer.shallowSlice(length) - - this._headerInfo = null - this._buffer = new BufferList() - - return [msg, ...this.write(rest)] + return msgs } _decodeHeader (data) { diff --git a/src/coder/encode.js b/src/coder/encode.js index a319a02..6f11b96 100644 --- a/src/coder/encode.js +++ b/src/coder/encode.js @@ -1,9 +1,9 @@ 'use strict' const varint = require('varint') +const BufferList = require('bl') const POOL_SIZE = 10 * 1024 -const empty 
= Buffer.alloc(0) class Encoder { constructor () { @@ -29,7 +29,7 @@ class Encoder { this._poolOffset = offset } - if (!msg.data) return [header, empty] + if (!msg.data) return header return [header, msg.data] } @@ -37,11 +37,13 @@ class Encoder { const encoder = new Encoder() +// Encode one or more messages and yield a BufferList of encoded messages module.exports = source => (async function * encode () { for await (const msg of source) { - const chunks = encoder.write(msg) - for (let i = 0; i < chunks.length; i++) { - yield chunks[i] + if (Array.isArray(msg)) { + yield new BufferList(msg.map(m => encoder.write(m))) + } else { + yield new BufferList(encoder.write(msg)) } } })() diff --git a/src/mplex.js b/src/mplex.js index 844921c..d29422a 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -27,39 +27,32 @@ class Mplex { newStream (name) { const id = this._streamId++ name = name == null ? id.toString() : String(name) - log('new initiator stream %s %s', id, name) - const send = msg => { - if (log.enabled) { - log('initiator stream send', { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) - } - return this.source.push(msg) - } - const onEnd = () => { - log('initiator stream %s ended', id) - this._streams.initiators.delete(id) - } - const stream = createStream({ id, name, send, onEnd }) - this._streams.initiators.set(id, stream) - return stream + const registry = this._streams.initiators + return this._newStream({ id, name, type: 'initiator', registry }) } _newReceiverStream ({ id, name }) { - if (this._streams.receivers.has(id)) { - throw new Error(`stream ${id} already exists!`) + const registry = this._streams.receivers + return this._newStream({ id, name, type: 'receiver', registry }) + } + + _newStream({ id, name, type, registry }) { + if (registry.has(id)) { + throw new Error(`${type} stream ${id} already exists!`) } - log('new receiver stream %s %s', id, name) + log('new %s stream %s %s', type, id, name) const send = msg => { if (log.enabled) { - 
log('receiver stream send', { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) + log('%s stream %s %s send', type, id, name, { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) } return this.source.push(msg) } const onEnd = () => { - log('receiver stream %s ended', id) - this._streams.receivers.delete(id) + log('%s stream %s %s ended', type, id, name) + registry.delete(id) } - const stream = createStream({ id, name, send, type: 'receiver', onEnd }) - this._streams.receivers.set(id, stream) + const stream = createStream({ id, name, send, type, onEnd }) + registry.set(id, stream) return stream } @@ -75,8 +68,10 @@ class Mplex { Coder.decode, restrictSize(this._options.maxMsgSize), async source => { - for await (const msg of source) { - this._handleIncoming(msg) + for await (const msgs of source) { + for (const msg of msgs) { + this._handleIncoming(msg) + } } } ) @@ -96,7 +91,7 @@ class Mplex { for (const s of initiators.values()) s.abort(err) for (const s of receivers.values()) s.abort(err) } - const source = pushable(onEnd) + const source = pushable({ onEnd, writev: true }) const encodedSource = pipe( source, restrictSize(this._options.maxMsgSize), diff --git a/src/restrict-size.js b/src/restrict-size.js index 2524eab..65a85ee 100644 --- a/src/restrict-size.js +++ b/src/restrict-size.js @@ -5,11 +5,19 @@ const MAX_MSG_SIZE = 1 << 20 // 1MB module.exports = max => { max = max || MAX_MSG_SIZE + const checkSize = msg => { + if (msg.data && msg.data.length > max) { + throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) + } + } + return source => { return (async function * restrictSize () { for await (const msg of source) { - if (msg.data && msg.data.length > max) { - throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) + if (Array.isArray(msg)) { + msg.forEach(checkSize) + } else { + checkSize(msg) } yield msg } diff --git a/src/stream.js b/src/stream.js index 
1320322..c955d70 100644 --- a/src/stream.js +++ b/src/stream.js @@ -36,6 +36,7 @@ module.exports = ({ id, name, send, onEnd = (() => {}), type = 'initiator' }) => close: () => stream.source.end(), // Close for reading and writing (local error) abort: err => { + log('%s stream %s abort', type, name, err) // End the source with the passed error stream.source.end(err) abortController.abort() @@ -43,8 +44,10 @@ module.exports = ({ id, name, send, onEnd = (() => {}), type = 'initiator' }) => // Close immediately for reading and writing (remote error) reset: () => resetController.abort(), sink: async source => { - source = abortable(source, abortController.signal, { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' }) - source = abortable(source, resetController.signal, { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' }) + source = abortable.multi(source, [ + { signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' } }, + { signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' } } + ]) if (type === 'initiator') { // If initiator, open a new stream send({ id, type: Types.NEW_STREAM, data: name }) diff --git a/test/coder.spec.js b/test/coder.spec.js index 30fd4e2..d0d4664 100644 --- a/test/coder.spec.js +++ b/test/coder.spec.js @@ -14,9 +14,9 @@ describe('coder', () => { it('should encode header', async () => { const source = [{ id: 17, type: 0, data: Buffer.from('17') }] - let data = Buffer.alloc(0) + const data = new BufferList() for await (const chunk of coder.encode(source)) { - data = Buffer.concat([data, chunk]) + data.append(chunk) } const expectedHeader = Buffer.from('880102', 'hex') @@ -25,9 +25,10 @@ describe('coder', () => { it('should decode header', async () => { const source = [Buffer.from('8801023137', 'hex')] - for await (const msg of coder.decode(source)) { - msg.data = msg.data.slice() // convert BufferList to 
Buffer - expect(msg).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) + for await (const msgs of coder.decode(source)) { + expect(msgs.length).to.equal(1) + msgs[0].data = msgs[0].data.slice() // convert BufferList to Buffer + expect(msgs[0]).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) } }) @@ -72,9 +73,11 @@ describe('coder', () => { const source = [Buffer.from('88010231379801023139a801023231', 'hex')] const res = [] - for await (const msg of coder.decode(source)) { - msg.data = msg.data.slice() // convert BufferList to Buffer - res.push(msg) + for await (const msgs of coder.decode(source)) { + for (const msg of msgs) { + msg.data = msg.data.slice() // convert BufferList to Buffer + res.push(msg) + } } expect(res).to.be.deep.eql([ @@ -98,9 +101,10 @@ describe('coder', () => { it('should decode zero length body msg', async () => { const source = [Buffer.from('880100', 'hex')] - for await (const msg of coder.decode(source)) { - msg.data = msg.data.slice() // convert BufferList to Buffer - expect(msg).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) }) + for await (const msgs of coder.decode(source)) { + expect(msgs.length).to.equal(1) + msgs[0].data = msgs[0].data.slice() // convert BufferList to Buffer + expect(msgs[0]).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) }) } }) }) From 66d5bbf73d273273aa5c6ee85411ef5267a4680b Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 16 Aug 2019 14:18:18 +0100 Subject: [PATCH 11/18] refactor: tweak log message License: MIT Signed-off-by: Alan Shaw --- src/mplex.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mplex.js b/src/mplex.js index d29422a..0fcd61e 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -43,7 +43,7 @@ class Mplex { log('new %s stream %s %s', type, id, name) const send = msg => { if (log.enabled) { - log('%s stream %s %s send', type, id, name, { id: msg.id, type: MessageTypeNames[msg.type], data: msg.data }) + log('%s stream %s %s send', type, id, name, { ...msg, 
type: MessageTypeNames[msg.type] }) } return this.source.push(msg) } From 9457e2f44051cc66c9f08545c2b19b6391796dc3 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 9 Sep 2019 21:48:31 +0100 Subject: [PATCH 12/18] fix: minor tweaks License: MIT Signed-off-by: Alan Shaw --- package.json | 2 +- src/coder/decode.js | 2 +- src/mplex.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 92607bc..b6704c2 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,7 @@ "debug": "^4.1.1", "interface-connection": "~0.3.3", "it-pipe": "^1.0.1", - "it-pushable": "^1.3.0", + "it-pushable": "^1.3.1", "pull-stream": "^3.6.9", "varint": "^5.0.0" }, diff --git a/src/coder/decode.js b/src/coder/decode.js index 1c2e916..4b97928 100644 --- a/src/coder/decode.js +++ b/src/coder/decode.js @@ -6,7 +6,7 @@ const BufferList = require('bl') // Decode a chunk and yield an _array_ of decoded messages module.exports = source => (async function * decode () { const decoder = new Decoder() - for await (let chunk of source) { + for await (const chunk of source) { const msgs = decoder.write(chunk) if (msgs.length) yield msgs } diff --git a/src/mplex.js b/src/mplex.js index 0fcd61e..152ba88 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -36,7 +36,7 @@ class Mplex { return this._newStream({ id, name, type: 'receiver', registry }) } - _newStream({ id, name, type, registry }) { + _newStream ({ id, name, type, registry }) { if (registry.has(id)) { throw new Error(`${type} stream ${id} already exists!`) } From d24c57682526fdc173abf3d7c426a78f291fd767 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 10 Sep 2019 15:10:22 +0100 Subject: [PATCH 13/18] fix: log message --- src/mplex.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mplex.js b/src/mplex.js index 152ba88..e7c36c7 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -43,7 +43,7 @@ class Mplex { log('new %s stream %s %s', type, id, name) const send = msg => { if 
(log.enabled) { - log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type] }) + log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data.slice() }) } return this.source.push(msg) } From e6303ef46e70c8db93f6911a0087aba9b5326157 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 16 Sep 2019 18:19:35 +0200 Subject: [PATCH 14/18] chore: update interface dep --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index b6704c2..edff9eb 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "aegir": "^20.0.0", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-stream-muxer": "github:libp2p/interface-stream-muxer#refactor/async-iterators", + "interface-stream-muxer": "^0.7.0", "p-defer": "^3.0.0", "random-bytes": "^1.0.0", "random-int": "^2.0.0" From 62af956c1f7977ee2f7f07fe8297fabb375b1d00 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 16 Sep 2019 19:03:46 +0200 Subject: [PATCH 15/18] fix(log): only slice when data is present --- src/mplex.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mplex.js b/src/mplex.js index e7c36c7..9716df6 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -43,7 +43,7 @@ class Mplex { log('new %s stream %s %s', type, id, name) const send = msg => { if (log.enabled) { - log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data.slice() }) + log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data && msg.data.slice() }) } return this.source.push(msg) } From 8d16c40f278d4d1cdb119a69325882d33ae53cd1 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 17 Sep 2019 15:55:34 +0200 Subject: [PATCH 16/18] chore: remove adapter code --- src/adapter.js | 67 -------------------------------------------------- 1 file changed, 67 deletions(-) delete mode 100644 src/adapter.js diff --git 
a/src/adapter.js b/src/adapter.js deleted file mode 100644 index f93f8bd..0000000 --- a/src/adapter.js +++ /dev/null @@ -1,67 +0,0 @@ -'use strict' - -const Mplex = require('.') -const AbortController = require('abort-controller') -const pull = require('pull-stream/pull') -const map = require('pull-stream/throughs/map') -const toPull = require('async-iterator-to-pull-stream') -const { Connection } = require('interface-connection') -const EE = require('events') -const noop = () => {} - -function create (conn, isListener) { - const toConn = stream => { - const { source } = stream - stream.source = (async function * () { - for await (const chunk of source) { - yield chunk.slice() // convert bl to Buffer - } - })() - Object.assign(stream.source, { push: source.push, end: source.end }) - return new Connection(toPull.duplex(stream), conn) - } - - const abortController = new AbortController() - const adapterMuxer = Object.assign(new EE(), { - newStream (cb) { - cb = cb || noop - const stream = muxer.newStream() - const conn = toConn(stream) - setTimeout(() => cb(null, conn)) - return conn - }, - end (err, cb) { - if (typeof err === 'function') { - cb = err - err = null - } - cb = cb || noop - abortController.abort() - cb() - } - }) - - const muxer = new Mplex({ - signal: abortController.signal, - onStream: stream => adapterMuxer.emit('stream', toConn(stream)) - }) - - pull( - conn, - toPull.duplex(muxer), - // convert bl to Buffer - map(chunk => chunk.slice()), - read => (end, cb) => { - if (end) adapterMuxer.emit('close', end) - read(end, cb) - }, - conn - ) - - return adapterMuxer -} - -module.exports = create -module.exports.multicodec = Mplex.multicodec -module.exports.dialer = conn => create(conn, false) -module.exports.listener = conn => create(conn, true) From cc141d5a6835ddf643b5ede11489d3acebd9b5f4 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 17 Sep 2019 15:55:46 +0200 Subject: [PATCH 17/18] docs: add comments and some jsdocs --- src/coder/decode.js | 14 
+++++++-- src/coder/encode.js | 5 +++ src/mplex.js | 73 ++++++++++++++++++++++++++++++++++++++++++-- src/restrict-size.js | 6 ++++ src/stream.js | 11 ++++++- 5 files changed, 104 insertions(+), 5 deletions(-) diff --git a/src/coder/decode.js b/src/coder/decode.js index 4b97928..a19e112 100644 --- a/src/coder/decode.js +++ b/src/coder/decode.js @@ -15,13 +15,17 @@ module.exports = source => (async function * decode () { class Decoder { constructor () { this._buffer = new BufferList() - // optimisation to allow varint to take a bl (well a proxy to) + // optimization to allow varint to take a BufferList (well a proxy to) this._bufferProxy = new Proxy({}, { get: (_, prop) => prop[0] === 'l' ? this._buffer[prop] : this._buffer.get(parseInt(prop)) }) this._headerInfo = null } + /** + * @param {Buffer|BufferList} chunk + * @returns {object[]} An array of message objects + */ write (chunk) { if (!chunk || !chunk.length) return [] @@ -33,7 +37,7 @@ class Decoder { try { this._headerInfo = this._decodeHeader(this._bufferProxy) } catch (_) { - break // not enough data yet...probably + break // We haven't received enough data yet } } @@ -51,6 +55,12 @@ class Decoder { return msgs } + /** + * Attempts to decode the message header from the buffer + * @private + * @param {Buffer} data + * @returns {*} message header (id, type, offset, length) + */ _decodeHeader (data) { const h = varint.decode(data) let offset = varint.decode.bytes diff --git a/src/coder/encode.js b/src/coder/encode.js index 6f11b96..5e69a06 100644 --- a/src/coder/encode.js +++ b/src/coder/encode.js @@ -11,6 +11,11 @@ class Encoder { this._poolOffset = 0 } + /** + * Encodes the given message and returns it and its header + * @param {*} msg The message object to encode + * @returns {Buffer|Buffer[]} + */ write (msg) { const pool = this._pool let offset = this._poolOffset diff --git a/src/mplex.js b/src/mplex.js index 9716df6..9f6411c 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -10,20 +10,51 @@ const { 
MessageTypes, MessageTypeNames } = require('./message-types') const createStream = require('./stream') class Mplex { + /** + * @constructor + * @param {object} options + * @param {function(*)} options.onStream Called whenever an inbound stream is created + * @param {AbortSignal} options.signal An AbortController signal + */ constructor (options) { options = options || {} options = typeof options === 'function' ? { onStream: options } : options this._streamId = 0 - this._streams = { initiators: new Map(), receivers: new Map() } + this._streams = { + /** + * @type {Map} Stream to ids map + */ + initiators: new Map(), + /** + * @type {Map} Stream to ids map + */ + receivers: new Map() + } this._options = options + /** + * An iterable sink + */ this.sink = this._createSink() + + /** + * An iterable source + */ this.source = this._createSource() + + /** + * @property {function} onStream + */ this.onStream = options.onStream } - // Initiate a new stream with the given name + /** + * Initiate a new stream with the given name. If no name is + * provided, the id of th stream will be used. + * @param {string} [name] If name is not a string it will be cast to one + * @returns {Stream} + */ newStream (name) { const id = this._streamId++ name = name == null ? 
id.toString() : String(name) @@ -31,11 +62,29 @@ class Mplex { return this._newStream({ id, name, type: 'initiator', registry }) } + /** + * Called whenever an inbound stream is created + * @private + * @param {*} options + * @param {number} options.id + * @param {string} options.name + * @returns {*} A muxed stream + */ _newReceiverStream ({ id, name }) { const registry = this._streams.receivers return this._newStream({ id, name, type: 'receiver', registry }) } + /** + * Creates a new stream + * @private + * @param {object} options + * @param {number} options.id + * @param {string} options.name + * @param {string} options.type + * @param {Map} options.registry A map of streams to their ids + * @returns {*} A muxed stream + */ _newStream ({ id, name, type, registry }) { if (registry.has(id)) { throw new Error(`${type} stream ${id} already exists!`) @@ -56,6 +105,12 @@ class Mplex { return stream } + /** + * Creates a sink with an abortable source. Incoming messages will + * also have their size restricted. All messages will be varint decoded. + * @private + * @returns {*} Returns an iterable sink + */ _createSink () { return async source => { if (this._options.signal) { @@ -84,6 +139,12 @@ class Mplex { } } + /** + * Creates a source that restricts outgoing message sizes + * and varint encodes them. 
+ * @private + * @returns {*} An iterable source + */ _createSource () { const onEnd = err => { const { initiators, receivers } = this._streams @@ -104,6 +165,14 @@ class Mplex { }) } + /** + * @private + * @param {object} options + * @param {number} options.id + * @param {string} options.type + * @param {Buffer|BufferList} options.data + * @returns {void} + */ _handleIncoming ({ id, type, data }) { if (log.enabled) { log('incoming message', { id, type: MessageTypeNames[type], data: data.slice() }) diff --git a/src/restrict-size.js b/src/restrict-size.js index 65a85ee..7477a9a 100644 --- a/src/restrict-size.js +++ b/src/restrict-size.js @@ -2,6 +2,12 @@ const MAX_MSG_SIZE = 1 << 20 // 1MB +/** + * Creates an iterable transform that restricts message sizes to + * the given maximum size. + * @param {number} [max] The maximum message size. Defaults to 1MB + * @returns {*} An iterable transform. + */ module.exports = max => { max = max || MAX_MSG_SIZE diff --git a/src/stream.js b/src/stream.js index c955d70..4c11f7a 100644 --- a/src/stream.js +++ b/src/stream.js @@ -6,7 +6,16 @@ const log = require('debug')('libp2p:mplex:stream') const pushable = require('it-pushable') const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types') -module.exports = ({ id, name, send, onEnd = (() => {}), type = 'initiator' }) => { +/** + * @param {object} options + * @param {number} options.id + * @param {string} options.name + * @param {function(*)} options.send Called to send data through the stream + * @param {function(Error)} [options.onEnd] Called whenever the stream ends + * @param {string} options.type One of ['initiator','receiver']. Defaults to 'initiator' + * @returns {*} A muxed stream + */ +module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => { const abortController = new AbortController() const resetController = new AbortController() const Types = type === 'initiator' ? 
InitiatorMessageTypes : ReceiverMessageTypes From 185165a21c7bd289654866a31d2ac83830d6cb0b Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Tue, 17 Sep 2019 17:49:22 +0200 Subject: [PATCH 18/18] docs: fix example --- examples/util.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/util.js b/examples/util.js index ad9e35d..9fc3182 100644 --- a/examples/util.js +++ b/examples/util.js @@ -4,7 +4,8 @@ exports.toIterable = socket => { sink: async source => { try { for await (const chunk of source) { - socket.write(chunk) + // Chunk is a BufferList, pass the underlying buffer to to the socket + socket.write(chunk.slice()) } } catch (err) { // If not an abort then destroy the socket with an error