diff --git a/package.json b/package.json index a70b96c0c7..06e2f10769 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/libp2p", - "version": "0.42.2-laconic-0.1.1", + "version": "0.42.2-laconic-0.1.2", "description": "JavaScript implementation of libp2p, a modular peer to peer network stack", "license": "Apache-2.0 OR MIT", "homepage": "https://github.com/libp2p/js-libp2p#readme", @@ -123,6 +123,7 @@ "@libp2p/peer-store": "^6.0.0", "@libp2p/tracked-map": "^3.0.0", "@libp2p/utils": "^3.0.2", + "@libp2p/webrtc-peer": "^2.0.2", "@multiformats/mafmt": "^11.0.2", "@multiformats/multiaddr": "^11.0.0", "abortable-iterator": "^4.0.2", @@ -143,6 +144,7 @@ "it-merge": "^2.0.0", "it-pair": "^2.0.2", "it-pipe": "^2.0.3", + "it-pushable": "^3.1.2", "it-sort": "^2.0.0", "it-stream-types": "^1.0.4", "merge-options": "^3.0.4", @@ -173,7 +175,7 @@ "@libp2p/interface-compliance-tests": "^3.0.2", "@libp2p/interface-connection-compliance-tests": "^2.0.3", "@libp2p/interface-connection-encrypter-compliance-tests": "^4.0.0", - "@libp2p/interface-mocks": "^9.0.0", + "@libp2p/interface-mocks": "9.1.3", "@libp2p/interop": "^4.0.0", "@libp2p/kad-dht": "^7.0.0", "@libp2p/mdns": "^6.0.0", @@ -192,7 +194,6 @@ "delay": "^5.0.0", "execa": "^6.1.0", "go-libp2p": "^0.0.6", - "it-pushable": "^3.0.0", "it-to-buffer": "^3.0.0", "npm-run-all": "^4.1.5", "p-defer": "^4.0.0", diff --git a/src/config.ts b/src/config.ts index e856061c99..58c4f6e1c5 100644 --- a/src/config.ts +++ b/src/config.ts @@ -66,6 +66,14 @@ const DefaultConfig: Partial = { maxListeners: 2 } }, + webRTCSignal: { + enabled: false, + isSignallingNode: false, + autoSignal: { + enabled: false, + relayPeerId: '' + } + }, identify: { protocolPrefix: 'ipfs', host: { diff --git a/src/index.ts b/src/index.ts index cd0ee8992a..87cc396fb5 100644 --- a/src/index.ts +++ b/src/index.ts @@ -42,6 +42,7 @@ import type { NatManagerInit } from './nat-manager.js' import type { AddressManagerInit } from './address-manager/index.js' import type { PeerRoutingInit } from './peer-routing.js' import type { ConnectionManagerInit } from './connection-manager/index.js' +import type { WebRTCSignalConfig } from './webrtc-signal/index.js' /** * For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md). @@ -105,6 +106,8 @@ export interface Libp2pInit { */ relay: RelayConfig + webRTCSignal: WebRTCSignalConfig + /** * libp2p identify protocol options */ diff --git a/src/libp2p.ts b/src/libp2p.ts index 00cdb23647..0c2d84dba1 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -50,6 +50,8 @@ import { PeerSet } from '@libp2p/peer-collections' import { DefaultDialer } from './connection-manager/dialer/index.js' import { peerIdFromString } from '@libp2p/peer-id' import type { Datastore } from 'interface-datastore' +import { WebRTCSignal } from './webrtc-signal/transport.js' +import { AutoSignal } from './webrtc-signal/index.js' const log = logger('libp2p') @@ -233,6 +235,14 @@ export class Libp2pNode extends EventEmitter implements Libp2p { })) } + if (init.webRTCSignal.enabled) { + this.components.transportManager.add(this.configureComponent(new WebRTCSignal(this.components, init.webRTCSignal))) + + if (!init.webRTCSignal.isSignallingNode && init.webRTCSignal.autoSignal?.enabled) { + this.configureComponent(new AutoSignal(this.components, init.webRTCSignal.autoSignal)) + } + } + this.fetchService = this.configureComponent(new FetchService(this.components, { ...init.fetch })) @@ -282,7 +292,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { // start any startables await Promise.all( - this.services.map(service => service.start()) + this.services.map(async service => await service.start()) ) await Promise.all( @@ -322,7 +332,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { ) await Promise.all( - this.services.map(service => service.stop()) + this.services.map(async service => await service.stop()) ) await Promise.all( diff --git a/src/webrtc-signal/constants.ts b/src/webrtc-signal/constants.ts new file mode 100644 index 0000000000..7fb2251cae --- /dev/null +++ b/src/webrtc-signal/constants.ts @@ -0,0 +1,9 @@ +// Time to wait for a connection to close gracefully before destroying it manually +export const CLOSE_TIMEOUT = 2000 + +// Use a supported protocol id in multiaddr to listen through signalling stream +// Need to use one of the supported protocol names (list: https://github.com/multiformats/multiaddr/blob/master/protocols.csv) for the multiaddr to be valid +export const P2P_WEBRTC_STAR_ID = 'p2p-webrtc-star' + +// Pubsub topic over which signalling nodes forward the signalling messages if not connected to the destination +export const WEBRTC_SIGNAL_TOPIC = 'webrtc-signal' diff --git a/src/webrtc-signal/index.ts b/src/webrtc-signal/index.ts new file mode 100644 index 0000000000..e2b69258cf --- /dev/null +++ b/src/webrtc-signal/index.ts @@ -0,0 +1,133 @@ +import { logger } from '@libp2p/logger' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerStore, PeerProtocolsChangeData } from '@libp2p/interface-peer-store' +import type { Connection } from '@libp2p/interface-connection' +import type { ConnectionManager } from '@libp2p/interface-connection-manager' +import type { TransportManager } from '@libp2p/interface-transport' + +import { WEBRTC_SIGNAL_CODEC } from './multicodec.js' +import { P2P_WEBRTC_STAR_ID } from './constants.js' + +const log = logger('libp2p:webrtc-signal:auto-signal') + +export interface WebRTCSignalConfig { + enabled: boolean + isSignallingNode: boolean + autoSignal: AutoSignalConfig +} + +export interface AutoSignalConfig { + enabled: boolean + relayPeerId: string +} + +export interface SignalComponents { + peerStore: PeerStore + connectionManager: ConnectionManager + transportManager: TransportManager +} + +export class AutoSignal { + private readonly components: SignalComponents + private readonly relayPeerId: string + private isListening: boolean = false + // TODO Done in circuit-relay implementation, required here? + // private readonly onError: (error: Error, msg?: string) => void + + constructor (components: SignalComponents, init: AutoSignalConfig) { + this.components = components + this.relayPeerId = init.relayPeerId + + this._onProtocolChange = this._onProtocolChange.bind(this) + this._onPeerConnected = this._onPeerConnected.bind(this) + this._onPeerDisconnected = this._onPeerDisconnected.bind(this) + + this.components.peerStore.addEventListener('change:protocols', (evt) => { + void this._onProtocolChange(evt).catch(err => { + log.error(err) + }) + }) + + this.components.connectionManager.addEventListener('peer:connect', (evt) => { + void this._onPeerConnected(evt).catch(err => { + log.error(err) + }) + }) + + this.components.connectionManager.addEventListener('peer:disconnect', (evt) => this._onPeerDisconnected(evt)) + } + + async _onProtocolChange (evt: CustomEvent) { + const { + peerId, + protocols + } = evt.detail + + await this._handleProtocols(peerId, protocols) + } + + async _onPeerConnected (evt: CustomEvent) { + const connection = evt.detail + const peerId = connection.remotePeer + const protocols = await this.components.peerStore.protoBook.get(peerId) + + // Handle protocols on peer connection as change:protocols event is not triggered after reconnection between peers. + await this._handleProtocols(peerId, protocols) + } + + _onPeerDisconnected (evt: CustomEvent) { + const connection = evt.detail + + if (connection.remotePeer.toString() === this.relayPeerId.toString()) { + this.isListening = false + } + } + + async _handleProtocols (peerId: PeerId, protocols: string[]) { + // Ignore if we are already listening or it's not the primary relay node + if (this.isListening || peerId.toString() !== this.relayPeerId) { + return + } + + // Check if it has the protocol + const hasProtocol = protocols.find(protocol => protocol === WEBRTC_SIGNAL_CODEC) + + // Ignore if protocol is not supported + if (hasProtocol == null) { + return + } + + // If required protocol is supported, start the listener + const connections = this.components.connectionManager.getConnections(peerId) + if (connections.length === 0) { + return + } + + const connection = connections[0] + + // TODO Done in circuit-relay implementation, required here? + // await this.components.peerStore.metadataBook.setValue(peerId, HOP_METADATA_KEY, uint8ArrayFromString(HOP_METADATA_VALUE)) + + await this._addListener(connection) + } + + /** + * Attempt to listen on the given connection with relay node + */ + async _addListener (connection: Connection): Promise { + try { + const remoteAddr = connection.remoteAddr + + // Attempt to listen on relay + const multiaddr = remoteAddr.encapsulate(`/${P2P_WEBRTC_STAR_ID}`) + + // Announce multiaddr will update on listen success by TransportManager event being triggered + await this.components.transportManager.listen([multiaddr]) + this.isListening = true + } catch (err: any) { + log.error('error listening on signalling address', err) + this.isListening = false + throw err + } + } +} diff --git a/src/webrtc-signal/listener.ts b/src/webrtc-signal/listener.ts new file mode 100644 index 0000000000..9b8efb6549 --- /dev/null +++ b/src/webrtc-signal/listener.ts @@ -0,0 +1,215 @@ +import map from 'it-map' +import { pipe } from 'it-pipe' +import type { Pushable } from 'it-pushable' +import * as lp from 'it-length-prefixed' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' + +import { logger } from '@libp2p/logger' +import { multiaddr, Multiaddr } from '@multiformats/multiaddr' +import { peerIdFromString } from '@libp2p/peer-id' +import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events' +import { Signal, WebRTCReceiver } from '@libp2p/webrtc-peer' +import type { Connection, Stream } from '@libp2p/interface-connection' +import type { ConnectionManager } from '@libp2p/interface-connection-manager' +import type { ConnectionHandler, Listener, Upgrader } from '@libp2p/interface-transport' + +import { WEBRTC_SIGNAL_CODEC } from './multicodec.js' +import { SignallingMessage, Type } from './signal-message.js' +import { toMultiaddrConnection } from './socket-to-conn.js' + +const log = logger('libp2p:webrtc-signal') + +export interface ListenerOptions { + handler?: ConnectionHandler + upgrader: Upgrader + connectionManager: ConnectionManager +} + +export function createListener (options: ListenerOptions, peerInputStream: Pushable, dialResponseStream: Pushable): Listener { + let listeningAddr: Multiaddr | undefined + + async function pipePeerInputStream (signallingStream: Stream): Promise { + // TODO Test if effective / required + // Empty out peerInputStream first + while (peerInputStream.readableLength !== 0) { + await peerInputStream.next() + } + + // Send dial requests / responses to the relay node over the signalling stream + void pipe( + // Read from stream (the source) + peerInputStream, + // Turn objects into buffers + (source) => map(source, (value) => { + return uint8ArrayFromString(JSON.stringify(value)) + }), + // Encode with length prefix (so receiving side knows how much data is coming) + lp.encode(), + // Write to the stream (the sink) + signallingStream.sink + ) + } + + async function handleSignallingMessages (signallingStream: Stream): Promise { + // Handle incoming messages from the signalling stream + void pipe( + // Read from the stream (the source) + signallingStream.source, + // Decode length-prefixed data + lp.decode(), + // Turn buffers into objects + (source) => map(source, (buf) => { + return JSON.parse(uint8ArrayToString(buf.subarray())) + }), + // Sink function + async (source) => { + // For each chunk of data + for await (const msg of source) { + switch ((msg as SignallingMessage).type) { + case Type.REQUEST: + processRequest(msg) + break + case Type.RESPONSE: + dialResponseStream.push(msg) + break + default: + log('unknown message', msg) + break + } + } + } + ) + } + + function processRequest (request: SignallingMessage) { + const incSignal: Signal = JSON.parse(request.signal) + + if (incSignal.type !== 'offer') { + // offers contain candidates so only respond to the offer + return + } + + const channel = new WebRTCReceiver() + + channel.addEventListener('signal', (evt) => { + const signal = evt.detail + const signalStr = JSON.stringify(signal) + + // Send response signal + const response: SignallingMessage = { + type: Type.RESPONSE, + src: request.dst, + dst: request.src, + signal: signalStr + } + + peerInputStream.push(response) + }) + + channel.addEventListener('error', (evt) => { + const err = evt.detail + + log.error('incoming connection errored with', err) + void channel.close().catch(err => { + log.error(err) + }) + }) + + channel.addEventListener('ready', () => { + void (async () => { + if (listeningAddr === undefined) { + const msg = 'listening address not set' + throw new Error(msg) + } + + const maConn = toMultiaddrConnection(channel, { + // Form the multiaddr for this peer by appending it's peer id to the listening multiaddr + remoteAddr: multiaddr(`${listeningAddr.toString()}/p2p/${request.dst}`) + }) + log('new inbound connection %s', maConn.remoteAddr) + + const connection = await options.upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) + + // TODO: Done in webrtc-direct, required here? + // channel.addEventListener('close', untrackConn, { + // once: true + // }) + + if (options.handler != null) { + options.handler(connection) + } + + listener.dispatchEvent(new CustomEvent('connection', { detail: connection })) + })() + }) + + channel.handleSignal(incSignal) + } + + async function listen (addr: Multiaddr): Promise { + const relayMultiaddrString = addr.toString().split('/p2p-circuit').find(a => a !== '') + const relayMultiaddr = multiaddr(relayMultiaddrString) + const relayPeerIdString = relayMultiaddr.getPeerId() + + if (relayPeerIdString == null) { + throw new Error('Could not determine primary relay peer from multiaddr') + } + + const relayPeerId = peerIdFromString(relayPeerIdString) + + const connections = options.connectionManager.getConnections(relayPeerId) + if (connections.length === 0) { + throw new Error('Connection with primary relay node not found') + } + + const connection = connections[0] + + // Open a signalling stream to the relay node + const signallingStream = await connection.newStream(WEBRTC_SIGNAL_CODEC) + + // Pipe messages from peerInputStream to signallingStream + await pipePeerInputStream(signallingStream) + + // Handle messages from the signalling stream + await handleSignallingMessages(signallingStream) + + // Stop the listener when the primary relay node disconnects + options.connectionManager.addEventListener('peer:disconnect', (evt) => { + const { detail: connection } = evt + + // Check if it's the primary relay node + if (connection.remotePeer.toString() === relayPeerIdString) { + // Announce listen addresses change + void (async () => { + await listener.close() + })() + } + }, { once: true }) + + listeningAddr = addr + listener.dispatchEvent(new CustomEvent('listening')) + } + + function getAddrs (): Multiaddr[] { + if (listeningAddr != null) { + return [listeningAddr] + } + + return [] + } + + async function close () { + listeningAddr = undefined + listener.dispatchEvent(new CustomEvent('close')) + } + + const listener: Listener = Object.assign(new EventEmitter(), { + close, + listen, + getAddrs + }) + + return listener +} diff --git a/src/webrtc-signal/multicodec.ts b/src/webrtc-signal/multicodec.ts new file mode 100644 index 0000000000..e933e59a9f --- /dev/null +++ b/src/webrtc-signal/multicodec.ts @@ -0,0 +1 @@ +export const WEBRTC_SIGNAL_CODEC = '/libp2p/webrtc-signal/0.1.0' diff --git a/src/webrtc-signal/signal-message.ts b/src/webrtc-signal/signal-message.ts new file mode 100644 index 0000000000..e48faf0cf2 --- /dev/null +++ b/src/webrtc-signal/signal-message.ts @@ -0,0 +1,13 @@ +// REQUEST is made on dial by a peer to another peer listening through a signalling stream +// RESPONSE is made by a peer to another peer on a REQUEST to establish a webrtc connection +export enum Type { + REQUEST = 'REQUEST', + RESPONSE = 'RESPONSE' +} + +export interface SignallingMessage { + type: Type + src: string + dst: string + signal: string +} diff --git a/src/webrtc-signal/socket-to-conn.ts b/src/webrtc-signal/socket-to-conn.ts new file mode 100644 index 0000000000..6c0a747af9 --- /dev/null +++ b/src/webrtc-signal/socket-to-conn.ts @@ -0,0 +1,89 @@ +import { abortableSource } from 'abortable-iterator' + +import { logger } from '@libp2p/logger' +import type { AbortOptions } from '@libp2p/interfaces' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { MultiaddrConnection } from '@libp2p/interface-connection' +import type { WebRTCPeer } from '@libp2p/webrtc-peer' + +import { CLOSE_TIMEOUT } from './constants.js' + +const log = logger('libp2p:webrtc-signal:socket') + +export interface ToMultiaddrConnectionOptions extends AbortOptions { + remoteAddr: Multiaddr +} + +// Ref: https://github.com/libp2p/js-libp2p-webrtc-direct/blob/master/src/socket-to-conn.ts +export function toMultiaddrConnection (socket: WebRTCPeer, options: ToMultiaddrConnectionOptions): MultiaddrConnection { + const { sink, source } = socket + + const maConn: MultiaddrConnection = { + remoteAddr: options.remoteAddr, + + async sink (source) { + if (options.signal != null) { + source = abortableSource(source, options.signal) + } + + try { + await sink(source) + } catch (err: any) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log.error(err) + } + } + }, + + source: (options.signal != null) ? abortableSource(source, options.signal) : source, + + timeline: { open: Date.now() }, + + async close () { + if (socket.closed) { + return + } + + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + if (maConn.remoteAddr != null) { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + } + + if (!socket.closed) { + socket.close().catch(err => { + log.error('could not close socket', err) + }) + } + }, CLOSE_TIMEOUT) + + try { + await socket.close() + } finally { + clearTimeout(timeout) + } + } + } + + socket.addEventListener('close', () => { + // In instances where `close` was not explicitly called, + // such as an iterable stream ending, ensure we have set the close + // timeline + if (maConn.timeline.close == null) { + maConn.timeline.close = Date.now() + } + }, { + once: true + }) + + return maConn +} diff --git a/src/webrtc-signal/transport.ts b/src/webrtc-signal/transport.ts new file mode 100644 index 0000000000..53fed5f3b8 --- /dev/null +++ b/src/webrtc-signal/transport.ts @@ -0,0 +1,367 @@ +import map from 'it-map' +import { pipe } from 'it-pipe' +import { Pushable, pushable } from 'it-pushable' +import * as lp from 'it-length-prefixed' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { AbortError } from 'abortable-iterator' + +import { logger } from '@libp2p/logger' +import { multiaddr } from '@multiformats/multiaddr' +import { symbol } from '@libp2p/interface-transport' +import { Signal, WebRTCInitiator } from '@libp2p/webrtc-peer' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { Multiaddr } from '@multiformats/multiaddr' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' +import type { Startable } from '@libp2p/interfaces/startable' +import type { AbortOptions } from '@libp2p/interfaces' +import type { Connection, Stream } from '@libp2p/interface-connection' +import type { ConnectionManager } from '@libp2p/interface-connection-manager' +import type { PubSub, Message } from '@libp2p/interface-pubsub' +import type { CreateListenerOptions, Listener, Transport, Upgrader } from '@libp2p/interface-transport' + +import { P2P_WEBRTC_STAR_ID, WEBRTC_SIGNAL_TOPIC } from './constants.js' +import { WEBRTC_SIGNAL_CODEC } from './multicodec.js' +import type { WebRTCSignalConfig } from './index.js' +import { createListener } from './listener.js' +import { SignallingMessage, Type } from './signal-message.js' +import { toMultiaddrConnection } from './socket-to-conn.js' +import { DialResponseListener } from './utils.js' +import { CIRCUIT_PROTO_CODE } from '../circuit/constants.js' + +const log = logger('libp2p:webrtc-signal') + +export interface WebRTCSignalComponents { + peerId: PeerId + registrar: Registrar + connectionManager: ConnectionManager + upgrader: Upgrader + pubsub?: PubSub +} + +export class WebRTCSignal implements Transport, Startable { + // Startable service implmentation is concerned with relay nodes + // Transport implmentation is concerned with peer nodes + + private readonly components: WebRTCSignalComponents + private readonly init: WebRTCSignalConfig + + private started: boolean + private readonly peerSignallingInputStreams: Map> = new Map() + + private readonly peerInputStream: Pushable + private readonly dialResponseStream: Pushable + private readonly dialResponseListener: DialResponseListener + + constructor (components: WebRTCSignalComponents, init: WebRTCSignalConfig) { + this.components = components + this.init = init + this.started = false + + this.peerInputStream = pushable({ objectMode: true }) + this.dialResponseStream = pushable({ objectMode: true }) + this.dialResponseListener = new DialResponseListener(this.dialResponseStream) + void this.dialResponseListener.listen() + } + + isStarted () { + return this.started + } + + async start (): Promise { + if (this.started) { + return + } + + this.started = true + + // Handle incoming protocol stream + await this.components.registrar.handle(WEBRTC_SIGNAL_CODEC, (data) => { + void this._onProtocol(data).catch(err => { + log.error(err) + }) + }).catch(err => { + log.error(err) + }) + + // Handle signalling topic if it's a signalling node + if (this.init.isSignallingNode) { + await this._handleWebRTCSignalTopic() + } + } + + async stop () { + await this.components.registrar.unhandle(WEBRTC_SIGNAL_CODEC) + } + + get [symbol] (): true { + return true + } + + get [Symbol.toStringTag] () { + return 'libp2p/webrtc-signal-v1' + } + + async _onProtocol (data: IncomingStreamData) { + const { connection, stream } = data + + await this._handlePeerSignallingStream(connection.remotePeer.toString(), stream) + } + + async _handleWebRTCSignalTopic (): Promise { + const pubsub = this.components.pubsub + if (pubsub === undefined) { + return + } + + log('Subscribing peer to the signalling topic') + + pubsub.subscribe(WEBRTC_SIGNAL_TOPIC) + pubsub.addEventListener('message', (evt) => { + this._handlePubSubMessage(evt.detail) + }) + } + + _handlePubSubMessage (msg: Message): void { + if (msg.topic !== WEBRTC_SIGNAL_TOPIC) { + return + } + + // Forward the signalling message to the destination if connected to it + // Ignore otherwise + const signallingMsg: SignallingMessage = JSON.parse(uint8ArrayToString(msg.data)) + const destStream = this.peerSignallingInputStreams.get(signallingMsg.dst) + if (destStream !== undefined) { + destStream.push(signallingMsg) + } + } + + async dial (ma: Multiaddr, options: AbortOptions = {}): Promise { + // Extract the relay and destination peer ids from ma + const addrs = ma.toString().split(`/${P2P_WEBRTC_STAR_ID}`) + const relayAddr = multiaddr(addrs[0]) + const destinationAddr = multiaddr(addrs[addrs.length - 1]) + const relayId = relayAddr.getPeerId() + const destinationId = destinationAddr.getPeerId() + + if (relayId == null || destinationId == null) { + const errMsg = 'WebRTC signal dial failed as address did not have peer id' + log.error(errMsg) + throw new Error(errMsg) + } + + try { + const socket = await this._connect(destinationId, options) + + const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + + const conn = await this.components.upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + + return conn + } catch (err) { + log.error('WebRTC signal dial failed', err) + throw err + } + } + + createListener (options: CreateListenerOptions): Listener { + return createListener( + { connectionManager: this.components.connectionManager, upgrader: this.components.upgrader, handler: options.handler }, + this.peerInputStream, + this.dialResponseStream + ) + } + + filter (multiaddrs: Multiaddr[]): Multiaddr[] { + // A custom filter for signalling addresses + return multiaddrs.filter((ma) => { + if (ma.protoCodes().includes(CIRCUIT_PROTO_CODE)) { + return false + } + + return ma.protoNames().includes(P2P_WEBRTC_STAR_ID) + }) + } + + async _handlePeerSignallingStream (peerId: string, signallingStream: Stream): Promise { + const inputStream = pushable({ objectMode: true }) + + // Send messages from inputStream to signallling stream + void pipe( + // Read from stream (the source) + inputStream, + // Turn objects into buffers + (source) => map(source, (value) => { + return uint8ArrayFromString(JSON.stringify(value)) + }), + // Encode with length prefix (so receiving side knows how much data is coming) + lp.encode(), + // Write to the stream (the sink) + signallingStream.sink + ) + + // Track input stream for this peer + // TODO Untrack on disconnect, use components.connectionManager + this.peerSignallingInputStreams.set(peerId, inputStream) + + void pipe( + // Read from the stream (the source) + signallingStream.source, + // Decode length-prefixed data + lp.decode(), + // Turn buffers into objects + (source) => map(source, (buf) => { + return JSON.parse(uint8ArrayToString(buf.subarray())) + }), + // Sink function + async (source) => { + // For each chunk of data + for await (const msg of source) { + // Forward the signalling message to the destination + const destStream = this.peerSignallingInputStreams.get(msg.dst) + if (destStream !== undefined) { + destStream.push(msg) + } else { + log('outgoing stream not found for dest', msg.dst) + + const pubsub = this.components.pubsub + if (pubsub !== undefined) { + log('forwarding msg over signalling topic') + await pubsub.publish(WEBRTC_SIGNAL_TOPIC, uint8ArrayFromString(JSON.stringify(msg))) + } + } + } + } + ) + } + + async _connect (dstPeerId: string, options: AbortOptions) { + const peerId = this.components.peerId.toString() + + if (options.signal?.aborted === true) { + throw new AbortError() + } + + // TODO Done in webrtc-direct, required here? + // const channelOptions = { + // initiator: true, + // trickle: false, + // ...this.initiatorOptions + // } + + return await new Promise((resolve, reject) => { + let connected: boolean + log('Dialing peer %s', dstPeerId) + + const channel = new WebRTCInitiator() + + const onError = (evt: CustomEvent) => { + const err = evt.detail + + if (!connected) { + const msg = `connection error ${dstPeerId}: ${err.message}` + + log.error(msg) + err.message = msg + done(err) + } + } + + const onReady = () => { + connected = true + + log('connection opened %s', dstPeerId) + done() + } + + const onAbort = () => { + log.error('connection aborted %s', dstPeerId) + void channel.close().finally(() => { + done(new AbortError()) + }) + } + + const done = (err?: Error) => { + channel.removeEventListener('error', onError) + channel.removeEventListener('ready', onReady) + options.signal?.removeEventListener('abort', onAbort) + + if (err != null) { + reject(err) + } else { + resolve(channel) + } + } + + channel.addEventListener('error', onError, { + once: true + }) + channel.addEventListener('ready', onReady, { + once: true + }) + channel.addEventListener('close', () => { + channel.removeEventListener('error', onError) + }) + options.signal?.addEventListener('abort', onAbort) + + const onSignal = async (signal: Signal) => { + if (signal.type !== 'offer') { + // skip candidates, just send the offer as it includes the candidates + return + } + + const signalStr = JSON.stringify(signal) + + try { + // Create a connection request with signal string and send over signalling stream + const request: SignallingMessage = { + type: Type.REQUEST, + src: peerId, + dst: dstPeerId, + signal: signalStr + } + + // Wait for response message over the signalling stream + const responseSignalJson = await new Promise((resolve, reject) => { + const onResponse = (evt: CustomEvent) => { + try { + const msg = evt.detail + + if ( + msg.type === Type.RESPONSE && + msg.src === dstPeerId && + msg.dst === peerId + ) { + // Remove this handler after receiving the response + this.dialResponseListener.removeEventListener('response', onResponse) + resolve(msg.signal) + } + } catch (err) { + reject(err) + } + } + this.dialResponseListener.addEventListener('response', onResponse) + + this.peerInputStream.push(request) + }) + + const responseSignal = JSON.parse(responseSignalJson) + channel.handleSignal(responseSignal) + } catch (err: any) { + await channel.close(err) + reject(err) + } + } + + channel.addEventListener('signal', (evt) => { + const signal = evt.detail + + void onSignal(signal).catch(async err => { + await channel.close(err) + }) + }) + }) + } +} diff --git a/src/webrtc-signal/utils.ts b/src/webrtc-signal/utils.ts new file mode 100644 index 0000000000..4e501a0560 --- /dev/null +++ b/src/webrtc-signal/utils.ts @@ -0,0 +1,37 @@ +import type { Pushable } from 'it-pushable' + +import { logger } from '@libp2p/logger' +import { EventEmitter, CustomEvent } from '@libp2p/interfaces/events' + +import type { SignallingMessage } from './signal-message.js' + +const log = logger('libp2p:webrtc-signal') + +export interface DialResponseListenerEvents { + 'response': CustomEvent +} + +export class DialResponseListener extends EventEmitter { + stream: Pushable + + constructor (stream: Pushable) { + super() + this.stream = stream + } + + async listen (): Promise { + while (true) { + try { + const { done, value } = await this.stream.next() + if (done !== undefined && done) { + break + } + + this.dispatchEvent(new CustomEvent('response', { detail: value })) + } catch (err) { + log.error(err) + break + } + } + } +} diff --git a/test/connection-manager/index.node.ts b/test/connection-manager/index.node.ts index 8de955eeb1..f36413cdd8 100644 --- a/test/connection-manager/index.node.ts +++ b/test/connection-manager/index.node.ts @@ -189,7 +189,7 @@ describe('libp2p.connections', () => { }) afterEach(async () => { - await Promise.all(nodes.map((node) => node.stop())) + await Promise.all(nodes.map(async (node) => await node.stop())) if (libp2p != null) { await libp2p.stop() diff --git a/test/content-routing/content-routing.node.ts b/test/content-routing/content-routing.node.ts index 61cda7c182..229c05c970 100644 --- a/test/content-routing/content-routing.node.ts +++ b/test/content-routing/content-routing.node.ts @@ -27,7 +27,7 @@ describe('content-routing', () => { }) }) - after(() => node.stop()) + after(async () => await node.stop()) it('.findProviders should return an error', async () => { try { diff --git a/tsconfig.json b/tsconfig.json index fe4fd056b1..ca14e6edfc 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,7 +1,8 @@ { "extends": "aegir/src/config/tsconfig.aegir.json", "compilerOptions": { - "outDir": "dist" + "outDir": "dist", + "esModuleInterop": true }, "include": [ "src",