Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix!: upgrade to interface-stream-muxer 2.0.0 (#186)
Browse files Browse the repository at this point in the history
Adds the close() function to the StreamMuxer interface.

Fixes #185

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
paralin and achingbrain authored Jun 28, 2022
1 parent 2578519 commit f11f2ce
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
],
"exports": {
".": {
"types": "./src/index.d.ts",
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
}
},
Expand Down Expand Up @@ -143,9 +143,9 @@
"dependencies": {
"@libp2p/components": "^2.0.0",
"@libp2p/interface-connection": "^2.0.0",
"@libp2p/interface-stream-muxer": "^1.0.1",
"@libp2p/interface-stream-muxer": "^2.0.0",
"@libp2p/logger": "^2.0.0",
"@libp2p/tracked-map": "^1.0.5",
"@libp2p/tracked-map": "^2.0.0",
"abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0",
"err-code": "^3.0.1",
Expand All @@ -157,7 +157,7 @@
"varint": "^6.0.0"
},
"devDependencies": {
"@libp2p/interface-stream-muxer-compliance-tests": "^2.0.0",
"@libp2p/interface-stream-muxer-compliance-tests": "^3.0.1",
"@types/varint": "^6.0.0",
"aegir": "^37.2.0",
"cborg": "^1.8.1",
Expand All @@ -168,6 +168,7 @@
"it-foreach": "^0.1.1",
"it-map": "^1.0.6",
"p-defer": "^4.0.0",
"random-int": "^3.0.0"
"random-int": "^3.0.0",
"typescript": "^4.7.4"
}
}
50 changes: 30 additions & 20 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Stream } from '@libp2p/interface-connection'
import type { MplexInit } from './index.js'
import anySignal from 'any-signal'

const log = logger('libp2p:mplex')

Expand Down Expand Up @@ -55,6 +56,7 @@ export class MplexStreamMuxer implements StreamMuxer {
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _init: MplexStreamMuxerInit
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }
private readonly closeController: AbortController

constructor (components: Components, init?: MplexStreamMuxerInit) {
init = init ?? {}
Expand Down Expand Up @@ -83,12 +85,15 @@ export class MplexStreamMuxer implements StreamMuxer {
const source = this._createSource()
this._source = source
this.source = source
}

init (components: Components) {

/**
* Close controller
*/
this.closeController = new AbortController()
}

init (components: Components) {}

/**
* Returns a Map of streams and their ids
*/
Expand All @@ -109,12 +114,29 @@ export class MplexStreamMuxer implements StreamMuxer {
* provided, the id of the stream will be used.
*/
newStream (name?: string): Stream {
if (this.closeController.signal.aborted) {
throw new Error('Muxer already closed')
}
const id = this._streamId++
name = name == null ? id.toString() : name.toString()
const registry = this._streams.initiators
return this._newStream({ id, name, type: 'initiator', registry })
}

/**
* Close or abort all tracked streams and stop the muxer
*/
close (err?: Error | undefined): void {
if (this.closeController.signal.aborted) return

if (err != null) {
this.streams.forEach(s => s.abort(err))
} else {
this.streams.forEach(s => s.close())
}
this.closeController.abort()
}

/**
* Called whenever an inbound stream is created
*/
Expand Down Expand Up @@ -177,9 +199,12 @@ export class MplexStreamMuxer implements StreamMuxer {
*/
_createSink () {
const sink: Sink<Uint8Array> = async source => {
// see: https://github.com/jacobheun/any-signal/pull/18
const abortSignals = [this.closeController.signal]
if (this._init.signal != null) {
source = abortableSource(source, this._init.signal)
abortSignals.push(this._init.signal)
}
source = abortableSource(source, anySignal(abortSignals))

try {
await pipe(
Expand Down Expand Up @@ -209,22 +234,7 @@ export class MplexStreamMuxer implements StreamMuxer {
*/
_createSource () {
const onEnd = (err?: Error) => {
const { initiators, receivers } = this._streams
// Abort all the things!
for (const s of initiators.values()) {
if (err != null) {
s.abort(err)
} else {
s.close()
}
}
for (const s of receivers.values()) {
if (err != null) {
s.abort(err)
} else {
s.close()
}
}
this.close(err)
}
const source = pushableV<Message>({
objectMode: true,
Expand Down

0 comments on commit f11f2ce

Please sign in to comment.