diff --git a/README.md b/README.md index 49897fc..8d1ecac 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,9 @@ npm install @libp2p/mplex import { Mplex } from '@libp2p/mplex' import { pipe } from 'it-pipe' -const muxer = new Mplex({ +const factory = new Mplex() + +const muxer = factory.createStreamMuxer(components, { onStream: stream => { // Receive a duplex stream from the remote // ...receive data from the remote and optionally send data back }, @@ -46,7 +48,16 @@ pipe([1, 2, 3], stream) ## API -### `const muxer = new Mplex([options])` +### `const factory = new Mplex([options])` + +Creates a factory that can be used to create new muxers. + +`options` is an optional `Object` that may have the following properties: + +* `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB) +* `maxStreamsPerConnection` - a number that defines how many streams are allowed per connection (default: 1024) + +### `const muxer = factory.createStreamMuxer(components, [options])` Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications. diff --git a/src/index.ts b/src/index.ts index 8e2659b..d9fc2b7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,14 +2,33 @@ import type { Components } from '@libp2p/interfaces/components' import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' import { MplexStreamMuxer } from './mplex.js' -export interface MplexInit extends StreamMuxerInit { +export interface MplexInit { + /** + * The maximum size of message that can be sent in one go in bytes. + * Messages larger than this will be split into multiple smaller + * messages. + */ maxMsgSize?: number + + /** + * The maximum number of multiplexed streams that can be open at any + * one time. An attempt to open more than this will throw. + */ + maxStreamsPerConnection?: number } export class Mplex implements StreamMuxerFactory { public protocol = '/mplex/6.7.0' + private readonly init: MplexInit + + constructor (init: MplexInit = {}) { + this.init = init + } - createStreamMuxer (components: Components, init?: MplexInit) { - return new MplexStreamMuxer(components, init) + createStreamMuxer (components: Components, init: StreamMuxerInit = {}) { + return new MplexStreamMuxer(components, { + ...init, + ...this.init + }) } } diff --git a/src/mplex.ts b/src/mplex.ts index 3eefb5d..43ad7bd 100644 --- a/src/mplex.ts +++ b/src/mplex.ts @@ -14,9 +14,12 @@ import type { Components } from '@libp2p/interfaces/components' import type { Sink } from 'it-stream-types' import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer' import type { Stream } from '@libp2p/interfaces/connection' +import type { MplexInit } from './index.js' const log = logger('libp2p:mplex') +const MAX_STREAMS_PER_CONNECTION = 1024 + function printMessage (msg: Message) { const output: any = { ...msg, @@ -38,9 +41,7 @@ export interface MplexStream extends Stream { source: Pushable } -export interface MplexInit extends StreamMuxerInit { - maxMsgSize?: number -} +interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {} export class MplexStreamMuxer implements StreamMuxer { public protocol = '/mplex/6.7.0' @@ -50,10 +51,10 @@ export class MplexStreamMuxer implements StreamMuxer { private _streamId: number private readonly _streams: { initiators: Map, receivers: Map } - private readonly _init: MplexInit + private readonly _init: MplexStreamMuxerInit private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void } - constructor (components: Components, init?: MplexInit) { + constructor (components: Components, init?: MplexStreamMuxerInit) { init = init ?? {} this._streamId = 0 @@ -122,6 +123,12 @@ export class MplexStreamMuxer implements StreamMuxer { } _newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map }) { + const maxStreams = this._init.maxStreamsPerConnection ?? MAX_STREAMS_PER_CONNECTION + + if ((this._streams.initiators.size + this._streams.receivers.size) === maxStreams) { + throw errCode(new Error('To many streams open'), 'ERR_TOO_MANY_STREAMS') + } + const { id, name, type, registry } = options log('new %s stream %s %s', type, id, name) diff --git a/test/mplex.spec.ts b/test/mplex.spec.ts new file mode 100644 index 0000000..7ce3251 --- /dev/null +++ b/test/mplex.spec.ts @@ -0,0 +1,57 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ + +import { expect } from 'aegir/chai' +import { Mplex } from '../src/index.js' +import { Components } from '@libp2p/interfaces/components' +import type { NewStreamMessage } from '../src/message-types.js' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { concat as uint8ArrayConcat } from 'uint8arrays/concat' +import { encode } from '../src/encode.js' +import all from 'it-all' + +describe('mplex', () => { + it('should restrict number of initiator streams per connection', async () => { + const maxStreamsPerConnection = 10 + const factory = new Mplex({ + maxStreamsPerConnection + }) + const components = new Components() + const muxer = factory.createStreamMuxer(components) + + // max out the streams for this connection + for (let i = 0; i < maxStreamsPerConnection; i++) { + muxer.newStream() + } + + // open one more + expect(() => muxer.newStream()).to.throw().with.property('code', 'ERR_TOO_MANY_STREAMS') + }) + + it('should restrict number of recipient streams per connection', async () => { + const maxStreamsPerConnection = 10 + const factory = new Mplex({ + maxStreamsPerConnection + }) + const components = new Components() + const muxer = factory.createStreamMuxer(components) + + // max out the streams for this connection + for (let i = 0; i < maxStreamsPerConnection; i++) { + muxer.newStream() + } + + // simulate a new incoming stream + const source: NewStreamMessage[] = [{ + id: 17, + type: 0, + data: uint8ArrayFromString('17') + }] + + const data = uint8ArrayConcat(await all(encode(source))) + + await muxer.sink([data]) + + await expect(all(muxer.source)).to.eventually.be.rejected.with.property('code', 'ERR_TOO_MANY_STREAMS') + }) +})