Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fix: use abstract stream class from muxer interface module (#165)
Browse files Browse the repository at this point in the history
Refactors the stream class in this module to use the abstract superclass from `@libp2p/interface-stream-muxer` as it handles all the various scenarios for closing streams which this module does not.

Fixes #164
  • Loading branch information
achingbrain authored May 17, 2023
1 parent 0e0cc8c commit 32f68de
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 552 deletions.
51 changes: 51 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,59 @@
import { createLibp2p } from 'libp2p'
import { circuitRelayServer } from 'libp2p/circuit-relay'
import { identifyService } from 'libp2p/identify'
import { webSockets } from '@libp2p/websockets'
import { noise } from '@chainsafe/libp2p-noise'
import { yamux } from '@chainsafe/libp2p-yamux'

export default {
build: {
config: {
platform: 'node'
},
bundlesizeMax: '117KB'
},
test: {
before: async () => {
// start a relay node for use in the tests
const relay = await createLibp2p({
addresses: {
listen: [
'/ip4/127.0.0.1/tcp/0/ws'
]
},
transports: [
webSockets()
],
connectionEncryption: [
noise()
],
streamMuxers: [
yamux()
],
services: {
relay: circuitRelayServer({
reservations: {
maxReservations: Infinity
}
}),
identify: identifyService()
},
connectionManager: {
minConnections: 0
}
})

const multiaddrs = relay.getMultiaddrs().map(ma => ma.toString())

return {
relay,
env: {
RELAY_MULTIADDR: multiaddrs[0]
}
}
},
after: async (_, before) => {
await before.relay.stop()
}
}
}
13 changes: 10 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
"@libp2p/interface-metrics": "^4.0.8",
"@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-registrar": "^2.0.12",
"@libp2p/interface-stream-muxer": "^4.0.1",
"@libp2p/interface-stream-muxer": "^4.1.2",
"@libp2p/interface-transport": "^4.0.3",
"@libp2p/interfaces": "^3.3.2",
"@libp2p/logger": "^2.0.7",
Expand All @@ -150,7 +150,6 @@
"abortable-iterator": "^5.0.1",
"detect-browser": "^5.3.0",
"it-length-prefixed": "^9.0.1",
"it-merge": "^3.0.0",
"it-pb-stream": "^4.0.1",
"it-pipe": "^3.0.1",
"it-pushable": "^3.1.3",
Expand All @@ -164,11 +163,19 @@
"uint8arrays": "^4.0.3"
},
"devDependencies": {
"@chainsafe/libp2p-yamux": "^4.0.1",
"@libp2p/interface-libp2p": "^3.1.0",
"@libp2p/interface-mocks": "^12.0.1",
"@libp2p/peer-id-factory": "^2.0.3",
"@libp2p/websockets": "^6.0.1",
"@types/sinon": "^10.0.14",
"aegir": "^39.0.6",
"aegir": "^39.0.7",
"delay": "^5.0.0",
"it-all": "^3.0.2",
"it-length": "^3.0.2",
"it-map": "^3.0.3",
"it-pair": "^2.0.6",
"libp2p": "^0.45.0",
"protons": "^7.0.2",
"sinon": "^15.0.4",
"sinon-ts": "^1.0.0"
Expand Down
135 changes: 69 additions & 66 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { type DataChannelOpts, WebRTCStream } from './stream.js'
import { createStream } from './stream.js'
import { nopSink, nopSource } from './util.js'
import type { DataChannelOpts } from './stream.js'
import type { Stream } from '@libp2p/interface-connection'
import type { CounterGroup } from '@libp2p/interface-metrics'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import type { Source, Sink } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const PROTOCOL = '/webrtc'

export interface DataChannelMuxerFactoryInit {
/**
* WebRTC Peer Connection
Expand All @@ -21,50 +24,73 @@ export interface DataChannelMuxerFactoryInit {
* Data channel options
*/
dataChannelOptions?: Partial<DataChannelOpts>

/**
* The protocol to use
*/
protocol?: string
}

export class DataChannelMuxerFactory implements StreamMuxerFactory {
public readonly protocol: string

/**
* WebRTC Peer Connection
*/
private streamBuffer: WebRTCStream[] = []
private readonly peerConnection: RTCPeerConnection
private streamBuffer: Stream[] = []
private readonly metrics?: CounterGroup
private readonly dataChannelOptions?: Partial<DataChannelOpts>

constructor (init: DataChannelMuxerFactoryInit) {
this.peerConnection = init.peerConnection
this.metrics = init.metrics
this.protocol = init.protocol ?? PROTOCOL
this.dataChannelOptions = init.dataChannelOptions

constructor (readonly init: DataChannelMuxerFactoryInit, readonly protocol = '/webrtc') {
// store any datachannels opened before upgrade has been completed
this.init.peerConnection.ondatachannel = ({ channel }) => {
const stream = new WebRTCStream({
this.peerConnection.ondatachannel = ({ channel }) => {
const stream = createStream({
channel,
stat: {
direction: 'inbound',
timeline: { open: 0 }
},
direction: 'inbound',
dataChannelOptions: init.dataChannelOptions,
closeCb: (_stream) => {
this.streamBuffer = this.streamBuffer.filter(s => !_stream.eq(s))
onEnd: () => {
this.streamBuffer = this.streamBuffer.filter(s => s.id !== stream.id)
}
})
this.streamBuffer.push(stream)
}
}

createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer {
return new DataChannelMuxer(this.init, this.streamBuffer, this.protocol, init)
createStreamMuxer (init?: StreamMuxerInit): StreamMuxer {
return new DataChannelMuxer({
...init,
peerConnection: this.peerConnection,
dataChannelOptions: this.dataChannelOptions,
metrics: this.metrics,
streams: this.streamBuffer,
protocol: this.protocol
})
}
}

export interface DataChannelMuxerInit extends DataChannelMuxerFactoryInit, StreamMuxerInit {
streams: Stream[]
}

/**
* A libp2p data channel stream muxer
*/
export class DataChannelMuxer implements StreamMuxer {
/**
* Array of streams in the data channel
*/
streams: Stream[] = []
public streams: Stream[]
public protocol: string

/**
* Initialized stream muxer
*/
init?: StreamMuxerInit
private readonly peerConnection: RTCPeerConnection
private readonly dataChannelOptions?: DataChannelOpts
private readonly metrics?: CounterGroup

/**
* Close or abort all tracked streams and stop the muxer
Expand All @@ -81,45 +107,37 @@ export class DataChannelMuxer implements StreamMuxer {
*/
sink: Sink<Source<Uint8Array | Uint8ArrayList>, Promise<void>> = nopSink

constructor (readonly dataChannelMuxer: DataChannelMuxerFactoryInit, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit) {
/**
* Initialized stream muxer
*/
this.init = init
constructor (readonly init: DataChannelMuxerInit) {
this.streams = init.streams
this.peerConnection = init.peerConnection
this.protocol = init.protocol ?? PROTOCOL
this.metrics = init.metrics

/**
* Fired when a data channel has been added to the connection has been
* added by the remote peer.
*
* {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event}
*/
this.dataChannelMuxer.peerConnection.ondatachannel = ({ channel }) => {
const stream = new WebRTCStream({
this.peerConnection.ondatachannel = ({ channel }) => {
const stream = createStream({
channel,
stat: {
direction: 'inbound',
timeline: {
open: 0
}
},
dataChannelOptions: dataChannelMuxer.dataChannelOptions,
closeCb: this.wrapStreamEnd(init?.onIncomingStream)
direction: 'inbound',
dataChannelOptions: this.dataChannelOptions,
onEnd: () => {
this.streams = this.streams.filter(s => s.id !== stream.id)
this.metrics?.increment({ stream_end: true })
init?.onStreamEnd?.(stream)
}
})

this.streams.push(stream)
if ((init?.onIncomingStream) != null) {
this.dataChannelMuxer.metrics?.increment({ incoming_stream: true })
this.metrics?.increment({ incoming_stream: true })
init.onIncomingStream(stream)
}
}

// wrap open streams with the onStreamEnd callback
this.streams = streams
.filter(stream => stream.stat.timeline.close == null)
.map(stream => {
(stream as WebRTCStream).closeCb = this.wrapStreamEnd(init?.onStreamEnd)
return stream
})
const onIncomingStream = init?.onIncomingStream
if (onIncomingStream != null) {
this.streams.forEach(s => { onIncomingStream(s) })
Expand All @@ -128,35 +146,20 @@ export class DataChannelMuxer implements StreamMuxer {

newStream (): Stream {
// The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label
const channel = this.dataChannelMuxer.peerConnection.createDataChannel('')
const closeCb = (stream: Stream): void => {
this.dataChannelMuxer.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
}
const stream = new WebRTCStream({
const channel = this.peerConnection.createDataChannel('')
const stream = createStream({
channel,
stat: {
direction: 'outbound',
timeline: {
open: 0
}
},
dataChannelOptions: this.dataChannelMuxer.dataChannelOptions,
closeCb: this.wrapStreamEnd(closeCb)
direction: 'outbound',
dataChannelOptions: this.dataChannelOptions,
onEnd: () => {
this.streams = this.streams.filter(s => s.id !== stream.id)
this.metrics?.increment({ stream_end: true })
this.init?.onStreamEnd?.(stream)
}
})
this.streams.push(stream)
this.dataChannelMuxer.metrics?.increment({ outgoing_stream: true })
this.metrics?.increment({ outgoing_stream: true })

return stream
}

private wrapStreamEnd (onStreamEnd?: (s: Stream) => void): (stream: Stream) => void {
const self = this
return (_stream) => {
self.streams = self.streams.filter(s => !(_stream instanceof WebRTCStream && (_stream).eq(s)))
if (onStreamEnd != null) {
onStreamEnd(_stream)
}
}
}
}
5 changes: 3 additions & 2 deletions src/private-to-public/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import { fromString as uint8arrayFromString } from 'uint8arrays/from-string'
import { dataChannelError, inappropriateMultiaddr, unimplemented, invalidArgument } from '../error.js'
import { WebRTCMultiaddrConnection } from '../maconn.js'
import { DataChannelMuxerFactory } from '../muxer.js'
import { type DataChannelOpts, WebRTCStream } from '../stream.js'
import { createStream } from '../stream.js'
import { isFirefox } from '../util.js'
import * as sdp from './sdp.js'
import { genUfrag } from './util.js'
import type { WebRTCDialOptions } from './options.js'
import type { DataChannelOpts } from '../stream.js'
import type { Connection } from '@libp2p/interface-connection'
import type { CounterGroup, Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
Expand Down Expand Up @@ -190,7 +191,7 @@ export class WebRTCDirectTransport implements Transport {
// we pass in undefined for these parameters.
const noise = Noise({ prologueBytes: fingerprintsPrologue })()

const wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'inbound', timeline: { open: 1 } }, dataChannelOptions: this.init.dataChannel })
const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel })
const wrappedDuplex = {
...wrappedChannel,
sink: wrappedChannel.sink.bind(wrappedChannel),
Expand Down
Loading

0 comments on commit 32f68de

Please sign in to comment.