diff --git a/package.json b/package.json index 9d78eec..5b07616 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-mplex#readme", "devDependencies": { - "aegir": "^25.0.0", + "aegir": "^28.0.0", "interface-stream-muxer": "^0.8.0", "p-defer": "^3.0.0", "random-bytes": "^1.0.0", @@ -55,6 +55,7 @@ "abortable-iterator": "^3.0.0", "bl": "^4.0.0", "debug": "^4.1.1", + "err-code": "^2.0.3", "it-pipe": "^1.0.1", "it-pushable": "^1.3.1", "varint": "^5.0.0" diff --git a/src/coder/decode.js b/src/coder/decode.js index 0c51a0b..60e0425 100644 --- a/src/coder/decode.js +++ b/src/coder/decode.js @@ -57,6 +57,7 @@ class Decoder { /** * Attempts to decode the message header from the buffer + * * @private * @param {Uint8Array} data * @returns {*} message header (id, type, offset, length) diff --git a/src/coder/encode.browser.js b/src/coder/encode.browser.js index 8439b40..162f698 100644 --- a/src/coder/encode.browser.js +++ b/src/coder/encode.browser.js @@ -13,7 +13,8 @@ class Encoder { /** * Encodes the given message and returns it and its header - * @param {*} msg The message object to encode + * + * @param {*} msg - The message object to encode * @returns {Uint8Array|Uint8Array[]} */ write (msg) { diff --git a/src/coder/encode.js b/src/coder/encode.js index 58fa055..f313b7b 100644 --- a/src/coder/encode.js +++ b/src/coder/encode.js @@ -13,7 +13,8 @@ class Encoder { /** * Encodes the given message and returns it and its header - * @param {*} msg The message object to encode + * + * @param {*} msg - The message object to encode * @returns {Buffer|Buffer[]} */ write (msg) { diff --git a/src/mplex.js b/src/mplex.js index 7c258b7..670f1a2 100644 --- a/src/mplex.js +++ b/src/mplex.js @@ -11,11 +11,11 @@ const createStream = require('./stream') class Mplex { /** - * @constructor + * @class * @param {object} options - * @param {function(*)} options.onStream Called whenever an inbound stream is created - * @param {function(*)} options.onStreamEnd Called whenever a stream ends - * @param {AbortSignal} options.signal An AbortController signal + * @param {function(*)} options.onStream - Called whenever an inbound stream is created + * @param {function(*)} options.onStreamEnd - Called whenever a stream ends + * @param {AbortSignal} options.signal - An AbortController signal */ constructor (options) { options = options || {} @@ -45,18 +45,19 @@ class Mplex { this.source = this._createSource() /** - * @property {function} onStream + * @property {Function} onStream */ this.onStream = options.onStream /** - * @property {function} onStreamEnd + * @property {Function} onStreamEnd */ this.onStreamEnd = options.onStreamEnd } /** * Returns a Map of streams and their ids + * * @returns {Map} */ get streams () { @@ -74,7 +75,8 @@ class Mplex { /** * Initiate a new stream with the given name. If no name is * provided, the id of th stream will be used. - * @param {string} [name] If name is not a string it will be cast to one + * + * @param {string} [name] - If name is not a string it will be cast to one * @returns {Stream} */ newStream (name) { @@ -86,6 +88,7 @@ class Mplex { /** * Called whenever an inbound stream is created + * * @private * @param {*} options * @param {number} options.id @@ -99,12 +102,13 @@ class Mplex { /** * Creates a new stream + * * @private * @param {object} options * @param {number} options.id * @param {string} options.name * @param {string} options.type - * @param {Map} options.registry A map of streams to their ids + * @param {Map} options.registry - A map of streams to their ids * @returns {*} A muxed stream */ _newStream ({ id, name, type, registry }) { @@ -131,6 +135,7 @@ class Mplex { /** * Creates a sink with an abortable source. Incoming messages will * also have their size restricted. All messages will be varint decoded. + * * @private * @returns {*} Returns an iterable sink */ @@ -165,6 +170,7 @@ class Mplex { /** * Creates a source that restricts outgoing message sizes * and varint encodes them. + * * @private * @returns {*} An iterable source */ diff --git a/src/restrict-size.js b/src/restrict-size.js index b097ea0..7344777 100644 --- a/src/restrict-size.js +++ b/src/restrict-size.js @@ -5,7 +5,8 @@ const MAX_MSG_SIZE = 1 << 20 // 1MB /** * Creates an iterable transform that restricts message sizes to * the given maximum size. - * @param {number} [max] The maximum message size. Defaults to 1MB + * + * @param {number} [max] - The maximum message size. Defaults to 1MB * @returns {*} An iterable transform. */ module.exports = max => { diff --git a/src/stream.js b/src/stream.js index ddd23ea..1ea5715 100644 --- a/src/stream.js +++ b/src/stream.js @@ -5,17 +5,21 @@ const AbortController = require('abort-controller') const log = require('debug')('libp2p:mplex:stream') const pushable = require('it-pushable') const BufferList = require('bl/BufferList') +const errCode = require('err-code') const { MAX_MSG_SIZE } = require('./restrict-size') const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types') +const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET' +const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT' + /** * @param {object} options * @param {number} options.id * @param {string} options.name - * @param {function(*)} options.send Called to send data through the stream - * @param {function(Error)} [options.onEnd] Called whenever the stream ends - * @param {string} [options.type] One of ['initiator','receiver']. Defaults to 'initiator' - * @param {number} [options.maxMsgSize] Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB + * @param {function(*)} options.send - Called to send data through the stream + * @param {function(Error)} [options.onEnd] - Called whenever the stream ends + * @param {string} [options.type] - One of ['initiator','receiver']. Defaults to 'initiator' + * @param {number} [options.maxMsgSize] - Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB * @returns {*} A muxed stream */ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => { @@ -31,6 +35,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg let endErr const onSourceEnd = err => { + if (sourceEnded) return sourceEnded = true log('%s stream %s source end', type, name, err) if (err && !endErr) endErr = err @@ -41,6 +46,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg } const onSinkEnd = err => { + if (sinkEnded) return sinkEnded = true log('%s stream %s sink end', type, name, err) if (err && !endErr) endErr = err @@ -59,13 +65,19 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg // End the source with the passed error stream.source.end(err) abortController.abort() + onSinkEnd(err) }, // Close immediately for reading and writing (remote error) - reset: () => resetController.abort(), + reset: () => { + const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET) + resetController.abort() + stream.source.end(err) + onSinkEnd(err) + }, sink: async source => { source = abortable(source, [ - { signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' } }, - { signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' } } + { signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: ERR_MPLEX_STREAM_ABORT } }, + { signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } } ]) if (type === 'initiator') { // If initiator, open a new stream @@ -86,7 +98,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg } } catch (err) { // Send no more data if this stream was remotely reset - if (err.code === 'ERR_MPLEX_STREAM_RESET') { + if (err.code === ERR_MPLEX_STREAM_RESET) { log('%s stream %s reset', type, name) } else { log('%s stream %s error', type, name, err) diff --git a/test/stream.spec.js b/test/stream.spec.js index 28e8e60..00f989c 100644 --- a/test/stream.spec.js +++ b/test/stream.spec.js @@ -71,6 +71,36 @@ describe('stream', () => { expect(msgs[0].data).to.deep.equal(name) }) + it('should end a stream when it is aborted', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = `STREAM${Date.now()}` + const deferred = defer() + const stream = createStream({ id, name, onEnd: deferred.resolve, send: mockSend }) + + const error = new Error('boom') + stream.abort(error) + + const err = await deferred.promise + expect(err).to.equal(error) + }) + + it('should end a stream when it is reset', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = `STREAM${Date.now()}` + const deferred = defer() + const stream = createStream({ id, name, onEnd: deferred.resolve, send: mockSend }) + + stream.reset() + + const err = await deferred.promise + expect(err).to.exist() + expect(err).to.have.property('code', 'ERR_MPLEX_STREAM_RESET') + }) + it('should send data with MESSAGE_INITIATOR messages if stream initiator', async () => { const msgs = [] const mockSend = msg => msgs.push(msg)