Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: add per-connection stream limit #173

Merged
merged 3 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ npm install @libp2p/mplex
import { Mplex } from '@libp2p/mplex'
import { pipe } from 'it-pipe'

const muxer = new Mplex({
const factory = new Mplex()

const muxer = factory.createStreamMuxer(components, {
onStream: stream => { // Receive a duplex stream from the remote
// ...receive data from the remote and optionally send data back
},
Expand All @@ -46,7 +48,16 @@ pipe([1, 2, 3], stream)

## API

### `const muxer = new Mplex([options])`
### `const factory = new Mplex([options])`

Creates a factory that can be used to create new muxers.

`options` is an optional `Object` that may have the following properties:

* `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
* `maxStreamsPerConnection` - a number that defines how many streams are allowed per connection (default: 1024)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default values listed in these docs can get out of date. Can/should we link to the authoritative source?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, generating docs from the interface might be a better approach here.


### `const muxer = factory.createStreamMuxer(components, [options])`

Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications.

Expand Down
25 changes: 22 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,33 @@ import type { Components } from '@libp2p/interfaces/components'
import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import { MplexStreamMuxer } from './mplex.js'

export interface MplexInit extends StreamMuxerInit {
export interface MplexInit {
/**
* The maximum size of message that can be sent in one go in bytes.
* Messages larger than this will be split into multiple smaller
* messages.
*/
maxMsgSize?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. An attempt to open more than this will throw.
*/
maxStreamsPerConnection?: number
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newbie question here: I assume it's not possible to just put the default here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the interface is removed at compile time so it can't have default values.

}

export class Mplex implements StreamMuxerFactory {
public protocol = '/mplex/6.7.0'
private readonly init: MplexInit

constructor (init: MplexInit = {}) {
this.init = init
}

createStreamMuxer (components: Components, init?: MplexInit) {
return new MplexStreamMuxer(components, init)
createStreamMuxer (components: Components, init: StreamMuxerInit = {}) {
return new MplexStreamMuxer(components, {
...init,
...this.init
})
}
}
17 changes: 12 additions & 5 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import type { Components } from '@libp2p/interfaces/components'
import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
import type { MplexInit } from './index.js'

const log = logger('libp2p:mplex')

const MAX_STREAMS_PER_CONNECTION = 1024
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newbie question: is the TypeScript way of doing things to define defaults here rather than where the override can be set? For example, I was surprised not to see this closer to MplexInit.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, how did we pick this default? How do we know it's a good value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MplexInit is defined in index.ts, exporting it for import into mplex.ts would also expose it to people importing @libp2p/mplex. It could be the other way round though, defined in mplex.ts and imported into index.ts.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, how did we pick this default?

Completely arbitrary. It's a lot bigger than go-libp2p but we need to start somewhere. We might end up splitting it into separate incoming/outgoing stream limits too.


function printMessage (msg: Message) {
const output: any = {
...msg,
Expand All @@ -38,9 +41,7 @@ export interface MplexStream extends Stream {
source: Pushable<Uint8Array>
}

export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number
}
interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {}

export class MplexStreamMuxer implements StreamMuxer {
public protocol = '/mplex/6.7.0'
Expand All @@ -50,10 +51,10 @@ export class MplexStreamMuxer implements StreamMuxer {

private _streamId: number
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _init: MplexInit
private readonly _init: MplexStreamMuxerInit
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }

constructor (components: Components, init?: MplexInit) {
constructor (components: Components, init?: MplexStreamMuxerInit) {
init = init ?? {}

this._streamId = 0
Expand Down Expand Up @@ -122,6 +123,12 @@ export class MplexStreamMuxer implements StreamMuxer {
}

_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const maxStreams = this._init.maxStreamsPerConnection ?? MAX_STREAMS_PER_CONNECTION

if ((this._streams.initiators.size + this._streams.receivers.size) === maxStreams) {
throw errCode(new Error('To many streams open'), 'ERR_TOO_MANY_STREAMS')
}

const { id, name, type, registry } = options

log('new %s stream %s %s', type, id, name)
Expand Down
57 changes: 57 additions & 0 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */

import { expect } from 'aegir/chai'
import { Mplex } from '../src/index.js'
import { Components } from '@libp2p/interfaces/components'
import type { NewStreamMessage } from '../src/message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { encode } from '../src/encode.js'
import all from 'it-all'

describe('mplex', () => {
it('should restrict number of initiator streams per connection', async () => {
const maxStreamsPerConnection = 10
const factory = new Mplex({
maxStreamsPerConnection
})
const components = new Components()
const muxer = factory.createStreamMuxer(components)

// max out the streams for this connection
for (let i = 0; i < maxStreamsPerConnection; i++) {
muxer.newStream()
}

// open one more
expect(() => muxer.newStream()).to.throw().with.property('code', 'ERR_TOO_MANY_STREAMS')
})

it('should restrict number of recipient streams per connection', async () => {
const maxStreamsPerConnection = 10
const factory = new Mplex({
maxStreamsPerConnection
})
const components = new Components()
const muxer = factory.createStreamMuxer(components)

// max out the streams for this connection
for (let i = 0; i < maxStreamsPerConnection; i++) {
muxer.newStream()
}

// simulate a new incoming stream
const source: NewStreamMessage[] = [{
id: 17,
type: 0,
data: uint8ArrayFromString('17')
}]

const data = uint8ArrayConcat(await all(encode(source)))

await muxer.sink([data])

await expect(all(muxer.source)).to.eventually.be.rejected.with.property('code', 'ERR_TOO_MANY_STREAMS')
})
})