Skip to content

Commit

Permalink
chore: use libjuice for udp muxing
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Jun 14, 2024
1 parent b572498 commit 03044e0
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 90 deletions.
4 changes: 3 additions & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
"@peculiar/x509": "^1.11.0",
"any-signal": "^4.1.1",
"detect-browser": "^5.3.0",
"get-port": "^7.1.0",
"it-length-prefixed": "^9.0.4",
"it-protobuf-stream": "^1.1.3",
"it-pushable": "^3.2.3",
Expand Down Expand Up @@ -108,7 +109,8 @@
"browser": {
"./dist/src/webrtc/index.js": "./dist/src/webrtc/index.browser.js",
"./dist/src/private-to-public/listener.js": "./dist/src/private-to-public/listener.browser.js",
"./dist/src/private-to-public/utils/get-rtcpeerconnection.js": "./dist/src/private-to-public/utils/get-rtcpeerconnection.browser.js"
"./dist/src/private-to-public/utils/get-rtcpeerconnection.js": "./dist/src/private-to-public/utils/get-rtcpeerconnection.browser.js",
"node:net": false
},
"react-native": {
"./dist/src/webrtc/index.js": "./dist/src/webrtc/index.react-native.js"
Expand Down
117 changes: 40 additions & 77 deletions packages/transport-webrtc/src/private-to-public/listener.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import { createSocket } from 'node:dgram'

import { networkInterfaces } from 'node:os'
import { isIPv4, isIPv6 } from '@chainsafe/is-ip'
import { TypedEventEmitter } from '@libp2p/interface'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { IP4 } from '@multiformats/multiaddr-matcher'
import getPort from 'get-port'
import { sha256 } from 'multiformats/hashes/sha2'
import { pEvent } from 'p-event'
import pWaitFor from 'p-wait-for'
// @ts-expect-error no types
import stun from 'stun'
import { UFRAG_PREFIX } from './constants.js'
import { connect } from './utils/connect.js'
import { generateTransportCertificate } from './utils/generate-certificates.js'
import { type DirectRTCPeerConnection, createDialerRTCPeerConnection } from './utils/get-rtcpeerconnection.js'
import { createDialerRTCPeerConnection } from './utils/get-rtcpeerconnection.js'
import { stunListener } from './utils/stun-listener.js'
import type { DirectRTCPeerConnection } from './utils/get-rtcpeerconnection.js'
import type { StunServer } from './utils/stun-listener.js'
import type { DataChannelOptions, TransportCertificate } from '../index.js'
import type { PeerId, ListenerEvents, Listener, Upgrader, ComponentLogger, Logger, CounterGroup, Metrics, ConnectionHandler } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Socket, RemoteInfo } from 'node:dgram'
import type { AddressInfo } from 'node:net'

/**
* The time to wait, in milliseconds, for the data channel handshake to complete
Expand All @@ -37,6 +35,7 @@ export interface WebRTCDirectListenerInit {
maxInboundStreams?: number
dataChannel?: DataChannelOptions
rtcConfiguration?: RTCConfiguration
useLibjuice?: boolean
}

export interface WebRTCListenerMetrics {
Expand All @@ -48,7 +47,7 @@ const IP4_PROTOCOL = protocols('ip4')
const IP6_PROTOCOL = protocols('ip6')

export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> implements Listener {
private socket?: Socket
private server?: StunServer
private readonly multiaddrs: Multiaddr[]
private certificate?: TransportCertificate
private readonly connections: Map<string, DirectRTCPeerConnection>
Expand Down Expand Up @@ -79,9 +78,7 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl

async listen (ma: Multiaddr): Promise<void> {
const parts = ma.stringTuples()

const ipVersion = IP4.matches(ma) ? 4 : 6

const host = parts
.filter(([code]) => code === IP4_PROTOCOL.code)
.pop()?.[1] ?? parts
Expand All @@ -91,28 +88,29 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
if (host == null) {
throw new Error('IP4/6 host must be specified in webrtc-direct mulitaddr')
}

const port = parseInt(parts
let port = parseInt(parts
.filter(([code, value]) => code === UDP_PROTOCOL.code)
.pop()?.[1] ?? '')

if (isNaN(port)) {
throw new Error('UDP port must be specified in webrtc-direct mulitaddr')
}

this.socket = createSocket({
type: `udp${ipVersion}`,
reuseAddr: true
})

try {
this.socket.bind(port, host)
await pEvent(this.socket, 'listening')
} catch (err) {
this.socket.close()
throw err
if (port === 0 && this.init.useLibjuice !== false) {
// libjuice doesn't map 0 to a random free port so we have to do it
// ourselves
port = await getPort()
}

this.server = await stunListener(host, port, ipVersion, this.log, (ufrag, pwd, remoteHost, remotePort) => {
this.incomingConnection(ufrag, pwd, remoteHost, remotePort)
.catch(err => {
this.log.error('error processing incoming STUN request', err)
})
}, {
useLibjuice: this.init.useLibjuice
})

let certificate = this.certificate

if (certificate == null) {
Expand All @@ -126,44 +124,17 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
})
}

const address = this.socket.address()
const address = this.server.address()

getNetworkAddresses(address, ipVersion).forEach((ma) => {
getNetworkAddresses(address.address, address.port, ipVersion).forEach((ma) => {
this.multiaddrs.push(multiaddr(`${ma}/webrtc-direct/certhash/${certificate.certhash}`))
})

this.socket.on('message', (msg, rinfo) => {
try {
this.log('incoming STUN packet from %o', rinfo)
const response = stun.decode(msg)
// TODO: this needs to be rate limited keyed by the remote host to
// prevent a DOS attack
this.incomingConnection(response, rinfo, certificate).catch(err => {
this.log.error('could not process incoming STUN data from %o', rinfo, err)
})
} catch (err) {
this.log.error('could not process incoming STUN data from %o', rinfo, err)
}
})

this.socket.on('close', () => {
this.safeDispatchEvent('close')
})

this.safeDispatchEvent('listening')
}

private async incomingConnection (stunMessage: any, rinfo: RemoteInfo, certificate: TransportCertificate): Promise<void> {
const usernameAttribute = stunMessage.getAttribute(stun.constants.STUN_ATTR_USERNAME)
const username: string | undefined = usernameAttribute?.value?.toString()

if (username == null || !username.startsWith(UFRAG_PREFIX)) {
this.log.trace('ufrag missing from incoming STUN message from %s:%s', rinfo.address, rinfo.port)
return
}

const ufrag = username.split(':')[0]
const key = `${rinfo.address}:${rinfo.port}:${ufrag}`
private async incomingConnection (ufrag: string, pwd: string, remoteHost: string, remotePort: number): Promise<void> {
const key = `${remoteHost}:${remotePort}:${ufrag}`
let peerConnection = this.connections.get(key)

if (peerConnection != null) {
Expand Down Expand Up @@ -191,14 +162,14 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
})

try {
const conn = await connect(peerConnection, ufrag, {
const conn = await connect(peerConnection, ufrag, pwd, {
role: 'initiator',
log: this.log,
logger: this.components.logger,
metrics: this.components.metrics,
events: this.metrics?.listenerEvents,
signal: AbortSignal.timeout(HANDSHAKE_TIMEOUT_MS),
remoteAddr: multiaddr(`/${rinfo.family === 'IPv4' ? 'ip4' : 'ip6'}/${rinfo.address}/udp/${rinfo.port}`),
remoteAddr: multiaddr(`/ip${isIPv4(remoteHost) ? 4 : 6}/${remoteHost}/udp/${remotePort}`),
hashCode: sha256.code,
dataChannel: this.init.dataChannel,
upgrader: this.init.upgrader,
Expand All @@ -222,28 +193,20 @@ export class WebRTCDirectListener extends TypedEventEmitter<ListenerEvents> impl
connection.close()
}

await Promise.all([
new Promise<void>((resolve) => {
if (this.socket == null) {
resolve()
return
}
await this.server?.close()

this.socket.close(() => {
resolve()
})
}),
// RTCPeerConnections will be removed from the connections map when their
// connection state changes to 'closed'/'disconnected'/'failed
pWaitFor(() => {
return this.connections.size === 0
})
])
// RTCPeerConnections will be removed from the connections map when their
// connection state changes to 'closed'/'disconnected'/'failed
await pWaitFor(() => {
return this.connections.size === 0
})

this.safeDispatchEvent('close')
}
}

function getNetworkAddresses (host: AddressInfo, version: 4 | 6): string[] {
if (host.address === '0.0.0.0' || host.address === '::1') {
function getNetworkAddresses (host: string, port: number, version: 4 | 6): string[] {
if (host === '0.0.0.0' || host === '::1') {
// return all ip4 interfaces
return Object.entries(networkInterfaces())
.flatMap(([_, addresses]) => addresses)
Expand All @@ -263,10 +226,10 @@ function getNetworkAddresses (host: AddressInfo, version: 4 | 6): string[] {

return false
})
.map(address => `/ip${version}/${address}/udp/${host.port}`)
.map(address => `/ip${version}/${address}/udp/${port}`)
}

return [
`/ip${version}/${host.address}/udp/${host.port}`
`/ip${version}/${host}/udp/${port}`
]
}
3 changes: 2 additions & 1 deletion packages/transport-webrtc/src/private-to-public/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export interface WebRTCTransportDirectInit {
rtcConfiguration?: RTCConfiguration
dataChannel?: DataChannelOptions
certificates?: TransportCertificate[]
useLibjuice?: boolean
}

export class WebRTCDirectTransport implements Transport {
Expand Down Expand Up @@ -133,7 +134,7 @@ export class WebRTCDirectTransport implements Transport {
const peerConnection = await createDialerRTCPeerConnection('NodeA', ufrag, this.init.rtcConfiguration)

try {
return await raceSignal(connect(peerConnection, ufrag, {
return await raceSignal(connect(peerConnection, ufrag, ufrag, {
role: 'responder',
log: this.log,
logger: this.components.logger,
Expand Down
16 changes: 11 additions & 5 deletions packages/transport-webrtc/src/private-to-public/utils/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface ConnectOptions {

const CONNECTION_STATE_CHANGE_EVENT = isFirefox ? 'iceconnectionstatechange' : 'connectionstatechange'

export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: string, options: ConnectOptions): Promise<Connection> {
export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: string, pwd: string, options: ConnectOptions): Promise<Connection> {
// create data channel for running the noise handshake. Once the data
// channel is opened, the remote will initiate the noise handshake. This
// is used to confirm the identity of the peer.
Expand All @@ -47,15 +47,21 @@ export async function connect (peerConnection: DirectRTCPeerConnection, ufrag: s
options.log.trace('setting local description')
await peerConnection.setLocalDescription(mungedOfferSdp)

if (options.role === 'initiator') {
options.log.trace('server offer', mungedOfferSdp.sdp)
} else {
options.log.trace('client offer', mungedOfferSdp.sdp)
}

// construct answer sdp from multiaddr and ufrag
let answerSdp: RTCSessionDescriptionInit

if (options.role === 'initiator') {
options.log.trace('deriving client offer')
answerSdp = sdp.clientOfferFromMultiaddr(options.remoteAddr, ufrag)
answerSdp = sdp.clientOfferFromMultiaddr(options.remoteAddr, ufrag, pwd)
options.log.trace('server derived client offer', answerSdp.sdp)
} else {
options.log.trace('deriving server offer')
answerSdp = sdp.serverOfferFromMultiAddr(options.remoteAddr, ufrag)
answerSdp = sdp.serverOfferFromMultiAddr(options.remoteAddr, ufrag, pwd)
options.log.trace('client derived server offer', answerSdp.sdp)
}

options.log.trace('setting remote description')
Expand Down
8 changes: 4 additions & 4 deletions packages/transport-webrtc/src/private-to-public/utils/sdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export function toSupportedHashFunction (name: multihashes.HashName): string {
/**
* Create an offer SDP message from a multiaddr
*/
export function clientOfferFromMultiaddr (ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit {
export function clientOfferFromMultiaddr (ma: Multiaddr, ufrag: string, pwd: string): RTCSessionDescriptionInit {
const { host, port } = ma.toOptions()
const ipVersion = ipv(ma)

Expand All @@ -130,7 +130,7 @@ a=sctp-port:5000
a=max-message-size:16384
a=setup:active
a=ice-ufrag:${ufrag}
a=ice-pwd:${ufrag}
a=ice-pwd:${pwd}
a=candidate:1467250027 1 UDP 1467250027 ${host} ${port} typ host
a=end-of-candidates
`
Expand All @@ -144,7 +144,7 @@ a=end-of-candidates
/**
* Create an answer SDP message from a multiaddr
*/
export function serverOfferFromMultiAddr (ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit {
export function serverOfferFromMultiAddr (ma: Multiaddr, ufrag: string, pwd: string): RTCSessionDescriptionInit {
const { host, port } = ma.toOptions()
const ipVersion = ipv(ma)
const [CERTFP] = ma2Fingerprint(ma)
Expand All @@ -158,7 +158,7 @@ m=application ${port} UDP/DTLS/SCTP webrtc-datachannel
a=mid:0
a=setup:passive
a=ice-ufrag:${ufrag}
a=ice-pwd:${ufrag}
a=ice-pwd:${pwd}
a=fingerprint:${CERTFP}
a=sctp-port:5000
a=max-message-size:16384
Expand Down
Loading

0 comments on commit 03044e0

Please sign in to comment.