Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: ensure stream closes on abort or reset (#116)
Browse files Browse the repository at this point in the history
* fix: ensure stream closes on abort or reset

* fix: more aggressive closures

* fix: abortcontroller doesnt take an error

test: verify abort/reset errors are correct
  • Loading branch information
jacobheun authored Oct 22, 2020
1 parent a9559cd commit 77835b3
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 20 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-mplex#readme",
"devDependencies": {
"aegir": "^25.0.0",
"aegir": "^28.0.0",
"interface-stream-muxer": "^0.8.0",
"p-defer": "^3.0.0",
"random-bytes": "^1.0.0",
Expand All @@ -55,6 +55,7 @@
"abortable-iterator": "^3.0.0",
"bl": "^4.0.0",
"debug": "^4.1.1",
"err-code": "^2.0.3",
"it-pipe": "^1.0.1",
"it-pushable": "^1.3.1",
"varint": "^5.0.0"
Expand Down
1 change: 1 addition & 0 deletions src/coder/decode.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Decoder {

/**
* Attempts to decode the message header from the buffer
*
* @private
* @param {Uint8Array} data
* @returns {*} message header (id, type, offset, length)
Expand Down
3 changes: 2 additions & 1 deletion src/coder/encode.browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Encoder {

/**
* Encodes the given message and returns it and its header
* @param {*} msg The message object to encode
*
* @param {*} msg - The message object to encode
* @returns {Uint8Array|Uint8Array[]}
*/
write (msg) {
Expand Down
3 changes: 2 additions & 1 deletion src/coder/encode.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Encoder {

/**
* Encodes the given message and returns it and its header
* @param {*} msg The message object to encode
*
* @param {*} msg - The message object to encode
* @returns {Buffer|Buffer[]}
*/
write (msg) {
Expand Down
22 changes: 14 additions & 8 deletions src/mplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ const createStream = require('./stream')

class Mplex {
/**
* @constructor
* @class
* @param {object} options
* @param {function(*)} options.onStream Called whenever an inbound stream is created
* @param {function(*)} options.onStreamEnd Called whenever a stream ends
* @param {AbortSignal} options.signal An AbortController signal
* @param {function(*)} options.onStream - Called whenever an inbound stream is created
* @param {function(*)} options.onStreamEnd - Called whenever a stream ends
* @param {AbortSignal} options.signal - An AbortController signal
*/
constructor (options) {
options = options || {}
Expand Down Expand Up @@ -45,18 +45,19 @@ class Mplex {
this.source = this._createSource()

/**
* @property {function} onStream
* @property {Function} onStream
*/
this.onStream = options.onStream

/**
* @property {function} onStreamEnd
* @property {Function} onStreamEnd
*/
this.onStreamEnd = options.onStreamEnd
}

/**
* Returns a Map of streams and their ids
*
* @returns {Map<number,*>}
*/
get streams () {
Expand All @@ -74,7 +75,8 @@ class Mplex {
/**
* Initiate a new stream with the given name. If no name is
* provided, the id of th stream will be used.
* @param {string} [name] If name is not a string it will be cast to one
*
* @param {string} [name] - If name is not a string it will be cast to one
* @returns {Stream}
*/
newStream (name) {
Expand All @@ -86,6 +88,7 @@ class Mplex {

/**
* Called whenever an inbound stream is created
*
* @private
* @param {*} options
* @param {number} options.id
Expand All @@ -99,12 +102,13 @@ class Mplex {

/**
* Creates a new stream
*
* @private
* @param {object} options
* @param {number} options.id
* @param {string} options.name
* @param {string} options.type
* @param {Map<number, *>} options.registry A map of streams to their ids
* @param {Map<number, *>} options.registry - A map of streams to their ids
* @returns {*} A muxed stream
*/
_newStream ({ id, name, type, registry }) {
Expand All @@ -131,6 +135,7 @@ class Mplex {
/**
* Creates a sink with an abortable source. Incoming messages will
* also have their size restricted. All messages will be varint decoded.
*
* @private
* @returns {*} Returns an iterable sink
*/
Expand Down Expand Up @@ -165,6 +170,7 @@ class Mplex {
/**
* Creates a source that restricts outgoing message sizes
* and varint encodes them.
*
* @private
* @returns {*} An iterable source
*/
Expand Down
3 changes: 2 additions & 1 deletion src/restrict-size.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const MAX_MSG_SIZE = 1 << 20 // 1MB
/**
* Creates an iterable transform that restricts message sizes to
* the given maximum size.
* @param {number} [max] The maximum message size. Defaults to 1MB
*
* @param {number} [max] - The maximum message size. Defaults to 1MB
* @returns {*} An iterable transform.
*/
module.exports = max => {
Expand Down
28 changes: 20 additions & 8 deletions src/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ const AbortController = require('abort-controller')
const log = require('debug')('libp2p:mplex:stream')
const pushable = require('it-pushable')
const BufferList = require('bl/BufferList')
const errCode = require('err-code')
const { MAX_MSG_SIZE } = require('./restrict-size')
const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types')

const ERR_MPLEX_STREAM_RESET = 'ERR_MPLEX_STREAM_RESET'
const ERR_MPLEX_STREAM_ABORT = 'ERR_MPLEX_STREAM_ABORT'

/**
* @param {object} options
* @param {number} options.id
* @param {string} options.name
* @param {function(*)} options.send Called to send data through the stream
* @param {function(Error)} [options.onEnd] Called whenever the stream ends
* @param {string} [options.type] One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @param {function(*)} options.send - Called to send data through the stream
* @param {function(Error)} [options.onEnd] - Called whenever the stream ends
* @param {string} [options.type] - One of ['initiator','receiver']. Defaults to 'initiator'
* @param {number} [options.maxMsgSize] - Max size of an mplex message in bytes. Writes > size are automatically split. Defaults to 1MB
* @returns {*} A muxed stream
*/
module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsgSize = MAX_MSG_SIZE }) => {
Expand All @@ -31,6 +35,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
let endErr

const onSourceEnd = err => {
if (sourceEnded) return
sourceEnded = true
log('%s stream %s source end', type, name, err)
if (err && !endErr) endErr = err
Expand All @@ -41,6 +46,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
}

const onSinkEnd = err => {
if (sinkEnded) return
sinkEnded = true
log('%s stream %s sink end', type, name, err)
if (err && !endErr) endErr = err
Expand All @@ -59,13 +65,19 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
// End the source with the passed error
stream.source.end(err)
abortController.abort()
onSinkEnd(err)
},
// Close immediately for reading and writing (remote error)
reset: () => resetController.abort(),
reset: () => {
const err = errCode(new Error('stream reset'), ERR_MPLEX_STREAM_RESET)
resetController.abort()
stream.source.end(err)
onSinkEnd(err)
},
sink: async source => {
source = abortable(source, [
{ signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' } },
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' } }
{ signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: ERR_MPLEX_STREAM_ABORT } },
{ signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: ERR_MPLEX_STREAM_RESET } }
])

if (type === 'initiator') { // If initiator, open a new stream
Expand All @@ -86,7 +98,7 @@ module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator', maxMsg
}
} catch (err) {
// Send no more data if this stream was remotely reset
if (err.code === 'ERR_MPLEX_STREAM_RESET') {
if (err.code === ERR_MPLEX_STREAM_RESET) {
log('%s stream %s reset', type, name)
} else {
log('%s stream %s error', type, name, err)
Expand Down
30 changes: 30 additions & 0 deletions test/stream.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,36 @@ describe('stream', () => {
expect(msgs[0].data).to.deep.equal(name)
})

it('should end a stream when it is aborted', async () => {
const msgs = []
const mockSend = msg => msgs.push(msg)
const id = randomInt(1000)
const name = `STREAM${Date.now()}`
const deferred = defer()
const stream = createStream({ id, name, onEnd: deferred.resolve, send: mockSend })

const error = new Error('boom')
stream.abort(error)

const err = await deferred.promise
expect(err).to.equal(error)
})

it('should end a stream when it is reset', async () => {
const msgs = []
const mockSend = msg => msgs.push(msg)
const id = randomInt(1000)
const name = `STREAM${Date.now()}`
const deferred = defer()
const stream = createStream({ id, name, onEnd: deferred.resolve, send: mockSend })

stream.reset()

const err = await deferred.promise
expect(err).to.exist()
expect(err).to.have.property('code', 'ERR_MPLEX_STREAM_RESET')
})

it('should send data with MESSAGE_INITIATOR messages if stream initiator', async () => {
const msgs = []
const mockSend = msg => msgs.push(msg)
Expand Down

0 comments on commit 77835b3

Please sign in to comment.