From a1d2c225d54d397623962d1880f968c8a92bbf6d Mon Sep 17 00:00:00 2001
From: achingbrain
Date: Sat, 23 Sep 2023 12:09:06 -0400
Subject: [PATCH] feat: add mock stream pair

Adds a `streamPair` convenience function to the interface mocks that returns
two `Stream` objects where the duplex streams read/write to/from the other.
---
 .../src/mocks/connection.ts | 31 ++-
 .../src/mocks/index.ts | 2 +-
 packages/interface/src/stream-muxer/stream.ts | 10 +-
 packages/kad-dht/src/libp2p-routers.ts | 74 +++++++
 .../src/circuit-relay/transport/index.ts | 2 +-
 packages/libp2p/src/connection/index.ts | 1 +
 packages/libp2p/src/transport-manager.ts | 5 +
 packages/libp2p/src/upgrader.ts | 3 +-
 packages/multistream-select/src/handle.ts | 8 +-
 packages/multistream-select/src/select.ts | 10 +-
 packages/transport-webrtc/.aegir.js | 16 +-
 packages/transport-webrtc/package.json | 2 +-
 packages/transport-webrtc/src/index.ts | 34 +++
 packages/transport-webrtc/src/maconn.ts | 12 +-
 packages/transport-webrtc/src/muxer.ts | 101 +++++---
 .../transport-webrtc/src/pb/message.proto | 6 +
 packages/transport-webrtc/src/pb/message.ts | 6 +-
 .../src/private-to-private/handler.ts | 177 ----------------
 .../private-to-private/initiate-connection.ts | 200 ++++++++++++++++++
 .../src/private-to-private/listener.ts | 16 +-
 .../signaling-stream-handler.ts | 133 ++++++++++++
 .../src/private-to-private/transport.ts | 171 ++++++++++-----
 .../src/private-to-private/util.ts | 76 ++++---
 .../src/private-to-public/transport.ts | 8 +-
 packages/transport-webrtc/src/stream.ts | 162 +++++++++-----
 packages/transport-webrtc/src/util.ts | 83 ++++++++
 packages/transport-webrtc/test/basics.spec.ts | 100 +++++++--
 .../transport-webrtc/test/listener.spec.ts | 2 +
 .../test/peer.browser.spec.ts | 158 ++++++++++----
 .../test/stream.browser.spec.ts | 14 +-
 packages/transport-webrtc/test/stream.spec.ts | 7 +-
 .../src/socket-to-conn.ts | 1 +
 32 files changed, 1185 insertions(+), 446 deletions(-)
 create mode 100644 packages/kad-dht/src/libp2p-routers.ts
 delete mode 100644 packages/transport-webrtc/src/private-to-private/handler.ts
 create mode 100644 packages/transport-webrtc/src/private-to-private/initiate-connection.ts
 create mode 100644 packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts

diff --git a/packages/interface-compliance-tests/src/mocks/connection.ts b/packages/interface-compliance-tests/src/mocks/connection.ts
index 566de6d05c..8b1607d2ad 100644
--- a/packages/interface-compliance-tests/src/mocks/connection.ts
+++ b/packages/interface-compliance-tests/src/mocks/connection.ts
@@ -170,7 +170,13 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
   return connection
 }
 
-export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>): Stream {
+export interface StreamInit {
+  direction?: Direction
+  protocol?: string
+  id?: string
+}
+
+export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>, init: StreamInit = {}): Stream {
   return {
     ...stream,
     close: async () => {},
@@ -186,10 +192,31 @@ export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Sourc
     id: `stream-${Date.now()}`,
     status: 'open',
     readStatus: 'ready',
-    writeStatus: 'ready'
+    writeStatus: 'ready',
+    ...init
   }
 }
 
+export interface StreamPairInit {
+  duplex: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>
+  init?: StreamInit
+}
+
+export function streamPair (a: StreamPairInit, b: StreamPairInit, init: StreamInit = {}): [Stream, Stream] {
+  return [
+    mockStream(a.duplex, {
+      direction: 'outbound',
+      ...init,
+      ...(a.init ?? 
{}) + }), + mockStream(b.duplex, { + direction: 'inbound', + ...init, + ...(b.init ?? {}) + }) + ] +} + export interface Peer { peerId: PeerId registrar: Registrar diff --git a/packages/interface-compliance-tests/src/mocks/index.ts b/packages/interface-compliance-tests/src/mocks/index.ts index 8b78f1f7f3..ffdef08ac3 100644 --- a/packages/interface-compliance-tests/src/mocks/index.ts +++ b/packages/interface-compliance-tests/src/mocks/index.ts @@ -1,7 +1,7 @@ export { mockConnectionEncrypter } from './connection-encrypter.js' export { mockConnectionGater } from './connection-gater.js' export { mockConnectionManager, mockNetwork } from './connection-manager.js' -export { mockConnection, mockStream, connectionPair } from './connection.js' +export { mockConnection, mockStream, streamPair, connectionPair } from './connection.js' export { mockMultiaddrConnection, mockMultiaddrConnPair } from './multiaddr-connection.js' export { mockMuxer } from './muxer.js' export { mockRegistrar } from './registrar.js' diff --git a/packages/interface/src/stream-muxer/stream.ts b/packages/interface/src/stream-muxer/stream.ts index bac8c7f729..d3be37382a 100644 --- a/packages/interface/src/stream-muxer/stream.ts +++ b/packages/interface/src/stream-muxer/stream.ts @@ -174,10 +174,14 @@ export abstract class AbstractStream implements Stream { } this.log.trace('sink finished reading from source') - this.writeStatus = 'done' - this.log.trace('sink calling closeWrite') - await this.closeWrite(options) + if (this.writeStatus === 'writing') { + this.writeStatus = 'done' + + this.log.trace('sink calling closeWrite') + await this.closeWrite(options) + } + this.onSinkEnd() } catch (err: any) { this.log.trace('sink ended with error, calling abort with error', err) diff --git a/packages/kad-dht/src/libp2p-routers.ts b/packages/kad-dht/src/libp2p-routers.ts new file mode 100644 index 0000000000..c0bb33807a --- /dev/null +++ b/packages/kad-dht/src/libp2p-routers.ts @@ -0,0 +1,74 @@ +import { type ContentRouting } from '@libp2p/interface/content-routing' +import { CodeError } from '@libp2p/interface/errors' +import { type PeerRouting } from '@libp2p/interface/peer-routing' +import drain from 'it-drain' +import type { KadDHT, QueryOptions } from './index.js' +import type { PeerId } from '@libp2p/interface/peer-id' +import type { PeerInfo } from '@libp2p/interface/peer-info' +import type { CID } from 'multiformats/cid' + +/** + * Wrapper class to convert events into returned values + */ +export class DHTContentRouting implements ContentRouting { + private readonly dht: KadDHT + + constructor (dht: KadDHT) { + this.dht = dht + } + + async provide (cid: CID, options: QueryOptions = {}): Promise { + await drain(this.dht.provide(cid, options)) + } + + async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator { + for await (const event of this.dht.findProviders(cid, options)) { + if (event.name === 'PROVIDER') { + yield * event.providers + } + } + } + + async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise { + await drain(this.dht.put(key, value, options)) + } + + async get (key: Uint8Array, options?: QueryOptions): Promise { + for await (const event of this.dht.get(key, options)) { + if (event.name === 'VALUE') { + return event.value + } + } + + throw new CodeError('Not found', 'ERR_NOT_FOUND') + } +} + +/** + * Wrapper class to convert events into returned values + */ +export class DHTPeerRouting implements PeerRouting { + private readonly dht: KadDHT + + constructor (dht: KadDHT) { + 
this.dht = dht + } + + async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise { + for await (const event of this.dht.findPeer(peerId, options)) { + if (event.name === 'FINAL_PEER') { + return event.peer + } + } + + throw new CodeError('Not found', 'ERR_NOT_FOUND') + } + + async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable { + for await (const event of this.dht.getClosestPeers(key, options)) { + if (event.name === 'FINAL_PEER') { + yield event.peer + } + } + } +} diff --git a/packages/libp2p/src/circuit-relay/transport/index.ts b/packages/libp2p/src/circuit-relay/transport/index.ts index 139fb69d7c..483f9a0366 100644 --- a/packages/libp2p/src/circuit-relay/transport/index.ts +++ b/packages/libp2p/src/circuit-relay/transport/index.ts @@ -241,7 +241,7 @@ class CircuitRelayTransport implements Transport { disconnectOnFailure }) } catch (err: any) { - log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${relayPeer.toString()} failed`, err) + log.error('circuit relay dial to destination %p via relay %p failed', destinationPeer, relayPeer, err) if (stream != null) { stream.abort(err) diff --git a/packages/libp2p/src/connection/index.ts b/packages/libp2p/src/connection/index.ts index 8509b8d70a..70abb60fa8 100644 --- a/packages/libp2p/src/connection/index.ts +++ b/packages/libp2p/src/connection/index.ts @@ -147,6 +147,7 @@ export class ConnectionImpl implements Connection { } log('closing connection to %a', this.remoteAddr) + console.info('for why close connection to', this.remoteAddr.toString(), new Error('where').stack) this.status = 'closing' diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index 070503764a..9399b879fa 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -119,6 +119,11 @@ export class DefaultTransportManager implements TransportManager, Startable { throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE) } + console.info('---> dial', ma.toString(), 'with', [...this.transports.entries()].filter(([name, entry]) => { + console.info('wat', name, entry === transport) + return entry === transport + }).map(([ name ]) => name).pop()) + try { return await transport.dial(ma, { ...options, diff --git a/packages/libp2p/src/upgrader.ts b/packages/libp2p/src/upgrader.ts index 23c3cd0df1..e29421eaf1 100644 --- a/packages/libp2p/src/upgrader.ts +++ b/packages/libp2p/src/upgrader.ts @@ -478,7 +478,7 @@ export class DefaultUpgrader implements Upgrader { return muxedStream } catch (err: any) { - log.error('could not create new stream', err) + log.error('could not create new stream for protocols %s on connection with address %a', protocols, connection.remoteAddr, err) if (muxedStream.timeline.close == null) { muxedStream.abort(err) @@ -513,6 +513,7 @@ export class DefaultUpgrader implements Upgrader { } } catch (err: any) { log.error(err) + connection.abort(err) } finally { this.events.safeDispatchEvent('connection:close', { detail: connection diff --git a/packages/multistream-select/src/handle.ts b/packages/multistream-select/src/handle.ts index eaf8331f6b..73c52faf62 100644 --- a/packages/multistream-select/src/handle.ts +++ b/packages/multistream-select/src/handle.ts @@ -66,14 +66,14 @@ export async function handle (stream: any, protocols: string | string[], options log.trace('read "%s"', protocol) if (protocol === PROTOCOL_ID) { - log.trace('respond with "%s" for "%s"', 
PROTOCOL_ID, protocol) + log.trace('handle: %s:%s respond with "%s" for "%s"', stream.direction, stream.id, PROTOCOL_ID, protocol) multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID), options) continue } if (protocols.includes(protocol)) { multistream.write(writer, uint8ArrayFromString(protocol), options) - log.trace('respond with "%s" for "%s"', protocol, protocol) + log.trace('handle: %s:%s respond with "%s" for "%s"', stream.direction, stream.id, protocol, protocol) rest() return { stream: shakeStream, protocol } } @@ -82,11 +82,11 @@ export async function handle (stream: any, protocols: string | string[], options // \n\n\n multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p)))), options) // multistream.writeAll(writer, protocols.map(p => uint8ArrayFromString(p))) - log.trace('respond with "%s" for %s', protocols, protocol) + log.trace('handle: %s:%s respond with "%s" for %s', stream.direction, stream.id, protocols, protocol) continue } multistream.write(writer, uint8ArrayFromString('na'), options) - log('respond with "na" for "%s"', protocol) + log('handle: %s:%s respond with "na" for "%s"', stream.direction, stream.id, protocol) } } diff --git a/packages/multistream-select/src/select.ts b/packages/multistream-select/src/select.ts index e5145c46c5..e1eb1ce61c 100644 --- a/packages/multistream-select/src/select.ts +++ b/packages/multistream-select/src/select.ts @@ -68,18 +68,18 @@ export async function select (stream: any, protocols: string | string[], options throw new Error('At least one protocol must be specified') } - log.trace('select: write ["%s", "%s"]', PROTOCOL_ID, protocol) + log.trace('select: %s:%s write ["%s", "%s"]', stream.direction, stream.id, PROTOCOL_ID, protocol) const p1 = uint8ArrayFromString(PROTOCOL_ID) const p2 = uint8ArrayFromString(protocol) multistream.writeAll(writer, [p1, p2], options) let response = await multistream.readString(reader, options) - log.trace('select: read "%s"', response) + log.trace('select: %s:%s read "%s"', stream.direction, stream.id, response) // Read the protocol response if we got the protocolId in return if (response === PROTOCOL_ID) { response = await multistream.readString(reader, options) - log.trace('select: read "%s"', response) + log.trace('select: %s:%s read "%s"', stream.direction, stream.id, response) } // We're done @@ -90,10 +90,10 @@ export async function select (stream: any, protocols: string | string[], options // We haven't gotten a valid ack, try the other protocols for (const protocol of protocols) { - log.trace('select: write "%s"', protocol) + log.trace('select: %s:%s write "%s"', stream.direction, stream.id, protocol) multistream.write(writer, uint8ArrayFromString(protocol), options) const response = await multistream.readString(reader, options) - log.trace('select: read "%s" for "%s"', response, protocol) + log.trace('select: %s:%s read "%s" for "%s"', stream.direction, stream.id, response, protocol) if (response === protocol) { rest() // End our writer so others can start writing to stream diff --git a/packages/transport-webrtc/.aegir.js b/packages/transport-webrtc/.aegir.js index 491576df87..5a6f0a14c6 100644 --- a/packages/transport-webrtc/.aegir.js +++ b/packages/transport-webrtc/.aegir.js @@ -8,7 +8,6 @@ export default { before: async () => { const { createLibp2p } = await import('libp2p') const { circuitRelayServer } = await import('libp2p/circuit-relay') - const { identifyService } = await import('libp2p/identify') const { webSockets } = await 
import('@libp2p/websockets') const { noise } = await import('@chainsafe/libp2p-noise') const { yamux } = await import('@chainsafe/libp2p-yamux') @@ -34,11 +33,20 @@ export default { reservations: { maxReservations: Infinity } - }), - identify: identifyService() + }) }, connectionManager: { - minConnections: 0 + minConnections: 0, + inboundConnectionThreshold: Infinity + }, + connectionGater: { + denyDialMultiaddr: (ma) => { + if (ma.toOptions().family === 6) { + return true + } + + return false + } } }) diff --git a/packages/transport-webrtc/package.json b/packages/transport-webrtc/package.json index 46302127ae..f77df5d619 100644 --- a/packages/transport-webrtc/package.json +++ b/packages/transport-webrtc/package.json @@ -52,7 +52,6 @@ "@multiformats/mafmt": "^12.1.2", "@multiformats/multiaddr": "^12.1.5", "@multiformats/multiaddr-matcher": "^1.0.1", - "abortable-iterator": "^5.0.1", "detect-browser": "^5.3.0", "it-length-prefixed": "^9.0.1", "it-pipe": "^3.0.1", @@ -65,6 +64,7 @@ "node-datachannel": "^0.4.3", "p-defer": "^4.0.0", "p-event": "^6.0.0", + "p-timeout": "^6.1.2", "protons-runtime": "^5.0.0", "uint8arraylist": "^2.4.3", "uint8arrays": "^4.0.6" diff --git a/packages/transport-webrtc/src/index.ts b/packages/transport-webrtc/src/index.ts index 0245aefcc9..8ce992f839 100644 --- a/packages/transport-webrtc/src/index.ts +++ b/packages/transport-webrtc/src/index.ts @@ -3,6 +3,40 @@ import { WebRTCDirectTransport, type WebRTCTransportDirectInit, type WebRTCDirec import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js' import type { Transport } from '@libp2p/interface/transport' +export interface DataChannelOptions { + /** + * The maximum message size sendable over the channel + */ + maxMessageSize?: number + + /** + * If the channel's `bufferedAmount` grows over this amount in bytes, wait + * for it to drain before sending more data (default: 16MB) + */ + maxBufferedAmount?: number + + /** + * When `bufferedAmount` is above `maxBufferedAmount`, we pause sending until + * the `bufferedAmountLow` event fires - this controls how long we wait for + * that event in ms (default: 30s) + */ + bufferedAmountLowEventTimeout?: number + + /** + * When closing a stream, we wait for `bufferedAmount` to become 0 before + * closing the underlying RTCDataChannel - this controls how long we wait + * (default: 30s) + */ + drainTimeout?: number + + /** + * When closing a stream we first send a FIN flag to the remote and wait + * for a FIN_ACK reply before closing the underlying RTCDataChannel - this + * controls how long we wait for the acknowledgement (default: 5s) + */ + closeTimeout?: number +} + /** * @param {WebRTCTransportDirectInit} init - WebRTC direct transport configuration * @param init.dataChannel - DataChannel configurations diff --git a/packages/transport-webrtc/src/maconn.ts b/packages/transport-webrtc/src/maconn.ts index c32d88760c..12af978b6a 100644 --- a/packages/transport-webrtc/src/maconn.ts +++ b/packages/transport-webrtc/src/maconn.ts @@ -5,7 +5,7 @@ import type { CounterGroup } from '@libp2p/interface/metrics' import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr' import type { Source, Sink } from 'it-stream-types' -const log = logger('libp2p:webrtc:connection') +const log = logger('libp2p:webrtc:maconn') interface WebRTCMultiaddrConnectionInit { /** @@ -65,8 +65,16 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection { this.timeline = init.timeline this.peerConnection = init.peerConnection + let 
initialState = this.peerConnection.connectionState + this.peerConnection.onconnectionstatechange = () => { - if (this.peerConnection.connectionState === 'closed' || this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed') { + log.trace('peer connection state change', this.peerConnection.connectionState, 'initial state', initialState) + + if (this.peerConnection.connectionState === 'disconnected') { + // attempt to reconnect + this.peerConnection.restartIce() + } else if (this.peerConnection.connectionState === 'closed') { + // nothing else to do but close the connection this.timeline.close = Date.now() } } diff --git a/packages/transport-webrtc/src/muxer.ts b/packages/transport-webrtc/src/muxer.ts index 991d130230..172e170bc0 100644 --- a/packages/transport-webrtc/src/muxer.ts +++ b/packages/transport-webrtc/src/muxer.ts @@ -1,6 +1,7 @@ +import { logger } from '@libp2p/logger' import { createStream } from './stream.js' -import { nopSink, nopSource } from './util.js' -import type { DataChannelOpts } from './stream.js' +import { drainAndClose, nopSink, nopSource } from './util.js' +import type { DataChannelOptions } from './index.js' import type { Stream } from '@libp2p/interface/connection' import type { CounterGroup } from '@libp2p/interface/metrics' import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer' @@ -8,6 +9,8 @@ import type { AbortOptions } from '@multiformats/multiaddr' import type { Source, Sink } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' +const log = logger('libp2p:webrtc:muxer') + const PROTOCOL = '/webrtc' export interface DataChannelMuxerFactoryInit { @@ -17,19 +20,16 @@ export interface DataChannelMuxerFactoryInit { peerConnection: RTCPeerConnection /** - * Optional metrics for this data channel muxer + * The protocol to use */ - metrics?: CounterGroup + protocol?: string /** - * Data channel options + * Optional metrics for this data channel muxer */ - dataChannelOptions?: Partial + metrics?: CounterGroup - /** - * The protocol to use - */ - protocol?: string + dataChannelOptions?: DataChannelOptions } export class DataChannelMuxerFactory implements StreamMuxerFactory { @@ -41,23 +41,23 @@ export class DataChannelMuxerFactory implements StreamMuxerFactory { private readonly peerConnection: RTCPeerConnection private streamBuffer: Stream[] = [] private readonly metrics?: CounterGroup - private readonly dataChannelOptions?: Partial + private readonly dataChannelOptions?: DataChannelOptions constructor (init: DataChannelMuxerFactoryInit) { this.peerConnection = init.peerConnection this.metrics = init.metrics this.protocol = init.protocol ?? PROTOCOL - this.dataChannelOptions = init.dataChannelOptions + this.dataChannelOptions = init.dataChannelOptions ?? 
{} // store any datachannels opened before upgrade has been completed this.peerConnection.ondatachannel = ({ channel }) => { const stream = createStream({ channel, direction: 'inbound', - dataChannelOptions: init.dataChannelOptions, onEnd: () => { this.streamBuffer = this.streamBuffer.filter(s => s.id !== stream.id) - } + }, + ...this.dataChannelOptions }) this.streamBuffer.push(stream) } @@ -90,34 +90,15 @@ export class DataChannelMuxer implements StreamMuxer { public protocol: string private readonly peerConnection: RTCPeerConnection - private readonly dataChannelOptions?: DataChannelOpts + private readonly dataChannelOptions: DataChannelOptions private readonly metrics?: CounterGroup - /** - * Gracefully close all tracked streams and stop the muxer - */ - close: (options?: AbortOptions) => Promise = async () => { } - - /** - * Abort all tracked streams and stop the muxer - */ - abort: (err: Error) => void = () => { } - - /** - * The stream source, a no-op as the transport natively supports multiplexing - */ - source: AsyncGenerator = nopSource() - - /** - * The stream destination, a no-op as the transport natively supports multiplexing - */ - sink: Sink, Promise> = nopSink - constructor (readonly init: DataChannelMuxerInit) { this.streams = init.streams this.peerConnection = init.peerConnection this.protocol = init.protocol ?? PROTOCOL this.metrics = init.metrics + this.dataChannelOptions = init.dataChannelOptions ?? {} /** * Fired when a data channel has been added to the connection has been @@ -129,19 +110,19 @@ export class DataChannelMuxer implements StreamMuxer { const stream = createStream({ channel, direction: 'inbound', - dataChannelOptions: this.dataChannelOptions, onEnd: () => { + log.trace('stream %s %s %s onEnd', stream.direction, stream.id, stream.protocol) + drainAndClose(channel, `inbound ${stream.id} ${stream.protocol}`, this.dataChannelOptions.drainTimeout) this.streams = this.streams.filter(s => s.id !== stream.id) this.metrics?.increment({ stream_end: true }) init?.onStreamEnd?.(stream) - } + }, + ...this.dataChannelOptions }) this.streams.push(stream) - if ((init?.onIncomingStream) != null) { - this.metrics?.increment({ incoming_stream: true }) - init.onIncomingStream(stream) - } + this.metrics?.increment({ incoming_stream: true }) + init?.onIncomingStream?.(stream) } const onIncomingStream = init?.onIncomingStream @@ -150,19 +131,53 @@ export class DataChannelMuxer implements StreamMuxer { } } + /** + * Gracefully close all tracked streams and stop the muxer + */ + async close (options?: AbortOptions): Promise { + try { + await Promise.all( + this.streams.map(async stream => stream.close(options)) + ) + } catch (err: any) { + this.abort(err) + } + } + + /** + * Abort all tracked streams and stop the muxer + */ + abort (err: Error): void { + for (const stream of this.streams) { + stream.abort(err) + } + } + + /** + * The stream source, a no-op as the transport natively supports multiplexing + */ + source: AsyncGenerator = nopSource() + + /** + * The stream destination, a no-op as the transport natively supports multiplexing + */ + sink: Sink, Promise> = nopSink + newStream (): Stream { // The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label const channel = this.peerConnection.createDataChannel('') const stream = createStream({ channel, direction: 'outbound', - dataChannelOptions: this.dataChannelOptions, onEnd: () => { + log.trace('stream %s %s %s onEnd', stream.direction, stream.id, 
stream.protocol)
+        drainAndClose(channel, `outbound ${stream.id} ${stream.protocol}`, this.dataChannelOptions.drainTimeout)
         channel.close() // Stream initiator is responsible for closing the channel
         this.streams = this.streams.filter(s => s.id !== stream.id)
         this.metrics?.increment({ stream_end: true })
         this.init?.onStreamEnd?.(stream)
-      }
+      },
+      ...this.dataChannelOptions
     })
     this.streams.push(stream)
     this.metrics?.increment({ outgoing_stream: true })
diff --git a/packages/transport-webrtc/src/pb/message.proto b/packages/transport-webrtc/src/pb/message.proto
index 9301bd802b..7b30a4ac77 100644
--- a/packages/transport-webrtc/src/pb/message.proto
+++ b/packages/transport-webrtc/src/pb/message.proto
@@ -12,6 +12,12 @@ message Message {
     // The sender abruptly terminates the sending part of the stream. The
     // receiver can discard any data that it already received on that stream.
    RESET = 2;
+
+    // The sender previously received a FIN. When the receiver receives this
+    // it knows the sender has received its FIN. It should send a FIN_ACK_ACK
+    // back to the sender.
+    // Workaround for https://bugs.chromium.org/p/chromium/issues/detail?id=1484907
+    FIN_ACK = 3;
   }
 
   optional Flag flag = 1;
diff --git a/packages/transport-webrtc/src/pb/message.ts b/packages/transport-webrtc/src/pb/message.ts
index a74ca6dd06..f8abb7a4a9 100644
--- a/packages/transport-webrtc/src/pb/message.ts
+++ b/packages/transport-webrtc/src/pb/message.ts
@@ -17,13 +17,15 @@ export namespace Message {
   export enum Flag {
     FIN = 'FIN',
     STOP_SENDING = 'STOP_SENDING',
-    RESET = 'RESET'
+    RESET = 'RESET',
+    FIN_ACK = 'FIN_ACK'
   }
 
   enum __FlagValues {
     FIN = 0,
     STOP_SENDING = 1,
-    RESET = 2
+    RESET = 2,
+    FIN_ACK = 3
   }
 
   export namespace Flag {
diff --git a/packages/transport-webrtc/src/private-to-private/handler.ts b/packages/transport-webrtc/src/private-to-private/handler.ts
deleted file mode 100644
index 8564fc84d2..0000000000
--- a/packages/transport-webrtc/src/private-to-private/handler.ts
+++ /dev/null
@@ -1,177 +0,0 @@
-import { CodeError } from '@libp2p/interface/errors'
-import { logger } from '@libp2p/logger'
-import { abortableDuplex } from 'abortable-iterator'
-import { pbStream } from 'it-protobuf-stream'
-import pDefer, { type DeferredPromise } from 'p-defer'
-import { DataChannelMuxerFactory } from '../muxer.js'
-import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js'
-import { Message } from './pb/message.js'
-import { readCandidatesUntilConnected, resolveOnConnected } from './util.js'
-import type { DataChannelOpts } from '../stream.js'
-import type { Stream } from '@libp2p/interface/connection'
-import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
-import type { IncomingStreamData } from '@libp2p/interface-internal/registrar'
-
-const DEFAULT_TIMEOUT = 30 * 1000
-
-const log = logger('libp2p:webrtc:peer')
-
-export type IncomingStreamOpts = { rtcConfiguration?: RTCConfiguration, dataChannelOptions?: Partial<DataChannelOpts> } & IncomingStreamData
-
-export async function handleIncomingStream ({ rtcConfiguration, dataChannelOptions, stream: rawStream }: IncomingStreamOpts): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> {
-  const signal = AbortSignal.timeout(DEFAULT_TIMEOUT)
-  const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message)
-  const pc = new RTCPeerConnection(rtcConfiguration)
-
-  try {
-    const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions })
-    const connectedPromise: DeferredPromise<void> = pDefer()
- const answerSentPromise: DeferredPromise = pDefer() - - signal.onabort = () => { - connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT')) - } - // candidate callbacks - pc.onicecandidate = ({ candidate }) => { - answerSentPromise.promise.then( - async () => { - await stream.write({ - type: Message.Type.ICE_CANDIDATE, - data: (candidate != null) ? JSON.stringify(candidate.toJSON()) : '' - }) - }, - (err) => { - log.error('cannot set candidate since sending answer failed', err) - connectedPromise.reject(err) - } - ) - } - - resolveOnConnected(pc, connectedPromise) - - // read an SDP offer - const pbOffer = await stream.read() - if (pbOffer.type !== Message.Type.SDP_OFFER) { - throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) - } - const offer = new RTCSessionDescription({ - type: 'offer', - sdp: pbOffer.data - }) - - await pc.setRemoteDescription(offer).catch(err => { - log.error('could not execute setRemoteDescription', err) - throw new Error('Failed to set remoteDescription') - }) - - // create and write an SDP answer - const answer = await pc.createAnswer().catch(err => { - log.error('could not execute createAnswer', err) - answerSentPromise.reject(err) - throw new Error('Failed to create answer') - }) - // write the answer to the remote - await stream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }) - - await pc.setLocalDescription(answer).catch(err => { - log.error('could not execute setLocalDescription', err) - answerSentPromise.reject(err) - throw new Error('Failed to set localDescription') - }) - - answerSentPromise.resolve() - - // wait until candidates are connected - await readCandidatesUntilConnected(connectedPromise, pc, stream) - - const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') - - return { pc, muxerFactory, remoteAddress } - } catch (err) { - pc.close() - throw err - } -} - -export interface ConnectOptions { - stream: Stream - signal: AbortSignal - rtcConfiguration?: RTCConfiguration - dataChannelOptions?: Partial -} - -export async function initiateConnection ({ rtcConfiguration, dataChannelOptions, signal, stream: rawStream }: ConnectOptions): Promise<{ pc: RTCPeerConnection, muxerFactory: StreamMuxerFactory, remoteAddress: string }> { - const stream = pbStream(abortableDuplex(rawStream, signal)).pb(Message) - // setup peer connection - const pc = new RTCPeerConnection(rtcConfiguration) - - try { - const muxerFactory = new DataChannelMuxerFactory({ peerConnection: pc, dataChannelOptions }) - - const connectedPromise: DeferredPromise = pDefer() - resolveOnConnected(pc, connectedPromise) - - // reject the connectedPromise if the signal aborts - signal.onabort = connectedPromise.reject - // we create the channel so that the peerconnection has a component for which - // to collect candidates. The label is not relevant to connection initiation - // but can be useful for debugging - const channel = pc.createDataChannel('init') - // setup callback to write ICE candidates to the remote - // peer - pc.onicecandidate = ({ candidate }) => { - void stream.write({ - type: Message.Type.ICE_CANDIDATE, - data: (candidate != null) ? 
JSON.stringify(candidate.toJSON()) : '' - }) - .catch(err => { - log.error('error sending ICE candidate', err) - }) - } - - // create an offer - const offerSdp = await pc.createOffer() - // write the offer to the stream - await stream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }) - // set offer as local description - await pc.setLocalDescription(offerSdp).catch(err => { - log.error('could not execute setLocalDescription', err) - throw new Error('Failed to set localDescription') - }) - - // read answer - const answerMessage = await stream.read() - if (answerMessage.type !== Message.Type.SDP_ANSWER) { - throw new Error('remote should send an SDP answer') - } - - const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) - await pc.setRemoteDescription(answerSdp).catch(err => { - log.error('could not execute setRemoteDescription', err) - throw new Error('Failed to set remoteDescription') - }) - - await readCandidatesUntilConnected(connectedPromise, pc, stream) - channel.close() - - const remoteAddress = parseRemoteAddress(pc.currentRemoteDescription?.sdp ?? '') - - return { pc, muxerFactory, remoteAddress } - } catch (err) { - pc.close() - throw err - } -} - -function parseRemoteAddress (sdp: string): string { - // 'a=candidate:1746876089 1 udp 2113937151 0614fbad-b...ocal 54882 typ host generation 0 network-cost 999' - const candidateLine = sdp.split('\r\n').filter(line => line.startsWith('a=candidate')).pop() - const candidateParts = candidateLine?.split(' ') - - if (candidateLine == null || candidateParts == null || candidateParts.length < 5) { - log('could not parse remote address from', candidateLine) - return '/webrtc' - } - - return `/dnsaddr/${candidateParts[4]}/${candidateParts[2].toLowerCase()}/${candidateParts[5]}/webrtc` -} diff --git a/packages/transport-webrtc/src/private-to-private/initiate-connection.ts b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts new file mode 100644 index 0000000000..7e1e20e49d --- /dev/null +++ b/packages/transport-webrtc/src/private-to-private/initiate-connection.ts @@ -0,0 +1,200 @@ +import { logger } from '@libp2p/logger' +import { pbStream } from 'it-protobuf-stream' +import pDefer, { type DeferredPromise } from 'p-defer' +import { isNode, isElectronMain } from 'wherearewe' +import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' +import { Message } from './pb/message.js' +import { parseRemoteAddress, readCandidatesUntilConnected, resolveOnConnected } from './util.js' +import type { DataChannelOptions } from '../index.js' +import type { Connection } from '@libp2p/interface/connection' +import type { IncomingStreamData } from '@libp2p/interface-internal/registrar' +import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' +// import { peerIdFromString } from '@libp2p/peer-id' +import { CodeError } from '@libp2p/interface/errors' +import { SIGNALING_PROTO_ID, splitAddr, type WebRTCTransportMetrics } from './transport.js' +import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' +import type { TransportManager } from '@libp2p/interface-internal/transport-manager' + +const log = logger('libp2p:webrtc:initiate-connection') + +export interface IncomingStreamOpts extends IncomingStreamData { + rtcConfiguration?: RTCConfiguration + dataChannelOptions?: Partial + signal: AbortSignal +} + +export interface ConnectOptions { + peerConnection: RTCPeerConnection + multiaddr: Multiaddr + connectionManager: ConnectionManager + 
transportManager: TransportManager + dataChannelOptions?: Partial + signal?: AbortSignal + metrics?: WebRTCTransportMetrics +} + +export async function initiateConnection ({ peerConnection, signal, metrics, multiaddr: ma, connectionManager, transportManager }: ConnectOptions): Promise<{ remoteAddress: Multiaddr }> { + const { baseAddr, peerId } = splitAddr(ma) + + metrics?.dialerEvents.increment({ open: true }) + + log.trace('dialing base address: %a', baseAddr) + + const relayPeer = baseAddr.getPeerId() + + if (relayPeer == null) { + throw new CodeError('Relay peer was missing', 'ERR_INVALID_ADDRESS') + } + + //const connections = connectionManager.getConnections(peerIdFromString(relayPeer)) + let connection: Connection + let shouldCloseConnection = false + + //if (connections.length === 0) { + // use the transport manager to open a connection. Initiating a WebRTC + // connection takes place in the context of a dial - if we use the + // connection manager instead we can end up joining our own dial context + connection = await transportManager.dial(baseAddr, { + signal + }) + // this connection is unmanaged by the connection manager so we should + // close it when we are done + shouldCloseConnection = true + //} else { + // connection = connections[0] + //} + + try { + const stream = await connection.newStream(SIGNALING_PROTO_ID, { + signal, + runOnTransientConnection: true + }) + + const messageStream = pbStream(stream).pb(Message) + const connectedPromise: DeferredPromise = pDefer() + const sdpAbortedListener = () => { + connectedPromise.reject(new CodeError('SDP handshake aborted', 'ERR_SDP_HANDSHAKE_ABORTED')) + } + + try { + resolveOnConnected(peerConnection, connectedPromise) + + // reject the connectedPromise if the signal aborts + signal?.addEventListener('abort', sdpAbortedListener) + + // we create the channel so that the RTCPeerConnection has a component for + // which to collect candidates. 
The label is not relevant to connection + // initiation but can be useful for debugging + const channel = peerConnection.createDataChannel('init') + + // setup callback to write ICE candidates to the remote peer + peerConnection.onicecandidate = ({ candidate }) => { + let data = '' + + if (candidate != null) { + data = JSON.stringify(candidate.toJSON()) + log.trace('initiator send ICE candidate %s', data) + } + + log.trace('initiator sending ICE candidate %s', data) + + void messageStream.write({ + type: Message.Type.ICE_CANDIDATE, + data + }, { + signal + }) + .catch(err => { + log.error('error sending ICE candidate', err) + }) + } + peerConnection.onicecandidateerror = (event) => { + log('initiator ICE candidate error', event) + } + + // create an offer + const offerSdp = await peerConnection.createOffer() + + log.trace('initiator send SDP offer %s', offerSdp.sdp) + + // write the offer to the stream + await messageStream.write({ type: Message.Type.SDP_OFFER, data: offerSdp.sdp }, { + signal + }) + + // set offer as local description + await peerConnection.setLocalDescription(offerSdp).catch(err => { + log.error('could not execute setLocalDescription', err) + throw new Error('Failed to set localDescription') + }) + + // read answer + const answerMessage = await messageStream.read({ + signal + }) + + if (answerMessage.type !== Message.Type.SDP_ANSWER) { + throw new Error('remote should send an SDP answer') + } + + log.trace('initiator receive SDP answer %s', answerMessage.data) + + const answerSdp = new RTCSessionDescription({ type: 'answer', sdp: answerMessage.data }) + await peerConnection.setRemoteDescription(answerSdp).catch(err => { + log.error('could not execute setRemoteDescription', err) + throw new Error('Failed to set remoteDescription') + }) + + log.trace('initiator read candidates until connected') + + await readCandidatesUntilConnected(connectedPromise, peerConnection, messageStream, { + direction: 'initiator', + signal + }) + + // TODO: workaround for https://github.com/murat-dogan/node-datachannel/issues/196 + if (!isNode && !isElectronMain) { + log.trace('initiator connected, closing init channel') + + // close init channel as we are connected now + channel.close() + } + + const remoteAddress = parseRemoteAddress(peerConnection.currentRemoteDescription?.sdp ?? 
'') + + log.trace('initiator connected to remote address %s', remoteAddress) + + // close the signalling stream + await messageStream.unwrap().unwrap().close({ + signal + }) + + log.trace('initiator closed signalling stream') + + return { + remoteAddress: multiaddr(remoteAddress).encapsulate(`/p2p/${peerId.toString()}`) + } + } catch (err: any) { + peerConnection.close() + stream.abort(err) + throw err + } finally { + // remove event listeners + signal?.removeEventListener('abort', sdpAbortedListener) + peerConnection.onicecandidate = null + peerConnection.onicecandidateerror = null + } + } finally { + // if we had to open a connection to perform the SDP handshake + // close it because it's not tracked by the connection manager + if (shouldCloseConnection) { + try { + await connection.close({ + signal + }) + } catch (err: any) { + connection.abort(err) + } + } + } +} diff --git a/packages/transport-webrtc/src/private-to-private/listener.ts b/packages/transport-webrtc/src/private-to-private/listener.ts index 1dccac6e25..53a3d299c6 100644 --- a/packages/transport-webrtc/src/private-to-private/listener.ts +++ b/packages/transport-webrtc/src/private-to-private/listener.ts @@ -5,20 +5,27 @@ import type { ListenerEvents, Listener } from '@libp2p/interface/transport' import type { TransportManager } from '@libp2p/interface-internal/transport-manager' import type { Multiaddr } from '@multiformats/multiaddr' -export interface ListenerOptions { +export interface WebRTCPeerListenerComponents { peerId: PeerId transportManager: TransportManager } +export interface WebRTCPeerListenerInit { + shutdownController: AbortController +} + export class WebRTCPeerListener extends EventEmitter implements Listener { private readonly peerId: PeerId private readonly transportManager: TransportManager + private readonly shutdownController: AbortController - constructor (opts: ListenerOptions) { + constructor (components: WebRTCPeerListenerComponents, init: WebRTCPeerListenerInit) { super() - this.peerId = opts.peerId - this.transportManager = opts.transportManager + this.peerId = components.peerId + this.transportManager = components.transportManager + + this.shutdownController = init.shutdownController } async listen (): Promise { @@ -39,6 +46,7 @@ export class WebRTCPeerListener extends EventEmitter implements } async close (): Promise { + this.shutdownController.abort() this.safeDispatchEvent('close', {}) } } diff --git a/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts b/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts new file mode 100644 index 0000000000..106fbd292f --- /dev/null +++ b/packages/transport-webrtc/src/private-to-private/signaling-stream-handler.ts @@ -0,0 +1,133 @@ +import { CodeError } from '@libp2p/interface/errors' +import { logger } from '@libp2p/logger' +import { pbStream } from 'it-protobuf-stream' +import pDefer, { type DeferredPromise } from 'p-defer' +import { RTCPeerConnection, RTCSessionDescription } from '../webrtc/index.js' +import { Message } from './pb/message.js' +import { parseRemoteAddress, readCandidatesUntilConnected, resolveOnConnected } from './util.js' +import type { IncomingStreamData } from '@libp2p/interface-internal/registrar' + +const log = logger('libp2p:webrtc:signaling-stream-handler') + +export interface IncomingStreamOpts extends IncomingStreamData { + peerConnection: RTCPeerConnection + signal: AbortSignal +} + +export async function handleIncomingStream ({ peerConnection, stream, signal, connection }: 
IncomingStreamOpts): Promise<{ remoteAddress: string }> { + log.trace('new inbound signaling stream') + + const messageStream = pbStream(stream).pb(Message) + + try { + const connectedPromise: DeferredPromise = pDefer() + const answerSentPromise: DeferredPromise = pDefer() + + signal.onabort = () => { + connectedPromise.reject(new CodeError('Timed out while trying to connect', 'ERR_TIMEOUT')) + } + + peerConnection.onnegotiationneeded = () => { + console.info('recipient renegotiation needed!') + } + + // candidate callbacks + peerConnection.onicecandidate = ({ candidate }) => { + answerSentPromise.promise.then( + async () => { + let data = '' + + if (candidate != null) { + data = JSON.stringify(candidate.toJSON()) + log.trace('recipient send ICE candidate %s', data) + } + + await messageStream.write({ + type: Message.Type.ICE_CANDIDATE, + data + }, { + signal + }) + }, + (err) => { + log.error('cannot set candidate since sending answer failed', err) + connectedPromise.reject(err) + } + ) + } + + resolveOnConnected(peerConnection, connectedPromise) + + // read an SDP offer + const pbOffer = await messageStream.read({ + signal + }) + + if (pbOffer.type !== Message.Type.SDP_OFFER) { + throw new Error(`expected message type SDP_OFFER, received: ${pbOffer.type ?? 'undefined'} `) + } + + log.trace('recipient receive SDP offer %s', pbOffer.data) + + const offer = new RTCSessionDescription({ + type: 'offer', + sdp: pbOffer.data + }) + + await peerConnection.setRemoteDescription(offer).catch(err => { + log.error('could not execute setRemoteDescription', err) + throw new Error('Failed to set remoteDescription') + }) + + // create and write an SDP answer + const answer = await peerConnection.createAnswer().catch(err => { + log.error('could not execute createAnswer', err) + answerSentPromise.reject(err) + throw new Error('Failed to create answer') + }) + + log.trace('recipient send SDP answer %s', answer.sdp) + + // write the answer to the remote + await messageStream.write({ type: Message.Type.SDP_ANSWER, data: answer.sdp }, { + signal + }) + + await peerConnection.setLocalDescription(answer).catch(err => { + log.error('could not execute setLocalDescription', err) + answerSentPromise.reject(err) + throw new Error('Failed to set localDescription') + }) + + answerSentPromise.resolve() + + log.trace('recipient read candidates until connected') + + // wait until candidates are connected + await readCandidatesUntilConnected(connectedPromise, peerConnection, messageStream, { + direction: 'recipient', + signal + }) + + log.trace('recipient connected, closing signaling stream') + + await messageStream.unwrap().unwrap().close({ + signal + }) + } catch (err: any) { + if (peerConnection.connectionState !== 'connected') { + log.error('error while handling signaling stream from peer %a', connection.remoteAddr, err) + + peerConnection.close() + throw err + } else { + log('error while handling signaling stream from peer %a, ignoring as the RTCPeerConnection is already connected', connection.remoteAddr, err) + } + } + + const remoteAddress = parseRemoteAddress(peerConnection.currentRemoteDescription?.sdp ?? 
'') + + log.trace('recipient connected to remote address %s', remoteAddress) + + return { remoteAddress } +} diff --git a/packages/transport-webrtc/src/private-to-private/transport.ts b/packages/transport-webrtc/src/private-to-private/transport.ts index 9ab8de1f1e..ca16d771eb 100644 --- a/packages/transport-webrtc/src/private-to-private/transport.ts +++ b/packages/transport-webrtc/src/private-to-private/transport.ts @@ -2,30 +2,41 @@ import { CodeError } from '@libp2p/interface/errors' import { type CreateListenerOptions, type DialOptions, symbol, type Transport, type Listener, type Upgrader } from '@libp2p/interface/transport' import { logger } from '@libp2p/logger' import { peerIdFromString } from '@libp2p/peer-id' -import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' import { WebRTC } from '@multiformats/multiaddr-matcher' +import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' import { codes } from '../error.js' import { WebRTCMultiaddrConnection } from '../maconn.js' import { cleanup } from '../webrtc/index.js' -import { initiateConnection, handleIncomingStream } from './handler.js' +import { initiateConnection } from './initiate-connection.js' +import { handleIncomingStream } from './signaling-stream-handler.js' import { WebRTCPeerListener } from './listener.js' -import type { DataChannelOpts } from '../stream.js' +import { RTCPeerConnection } from '../webrtc/index.js' +import type { DataChannelOptions } from '../index.js' import type { Connection } from '@libp2p/interface/connection' import type { PeerId } from '@libp2p/interface/peer-id' import type { CounterGroup, Metrics } from '@libp2p/interface/src/metrics/index.js' import type { Startable } from '@libp2p/interface/startable' import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar' import type { TransportManager } from '@libp2p/interface-internal/transport-manager' +import type { ConnectionManager } from '@libp2p/interface-internal/src/connection-manager/index.js' +import { DataChannelMuxerFactory } from '../muxer.js' const log = logger('libp2p:webrtc:peer') const WEBRTC_TRANSPORT = '/webrtc' const CIRCUIT_RELAY_TRANSPORT = '/p2p-circuit' -const SIGNALING_PROTO_ID = '/webrtc-signaling/0.0.1' +export const SIGNALING_PROTO_ID = '/webrtc-signaling/0.0.1' +const INBOUND_CONNECTION_TIMEOUT = 30 * 1000 export interface WebRTCTransportInit { rtcConfiguration?: RTCConfiguration - dataChannel?: Partial + dataChannel?: DataChannelOptions + + /** + * Inbound connections must complete the upgrade within this many ms + * (default: 30s) + */ + inboundConnectionTimeout?: number } export interface WebRTCTransportComponents { @@ -33,6 +44,7 @@ export interface WebRTCTransportComponents { registrar: Registrar upgrader: Upgrader transportManager: TransportManager + connectionManager: ConnectionManager metrics?: Metrics } @@ -44,11 +56,14 @@ export interface WebRTCTransportMetrics { export class WebRTCTransport implements Transport, Startable { private _started = false private readonly metrics?: WebRTCTransportMetrics + private readonly shutdownController: AbortController constructor ( private readonly components: WebRTCTransportComponents, private readonly init: WebRTCTransportInit = {} ) { + this.shutdownController = new AbortController() + if (components.metrics != null) { this.metrics = { dialerEvents: components.metrics.registerCounterGroup('libp2p_webrtc_dialer_events_total', { @@ -83,7 +98,9 @@ export class WebRTCTransport implements Transport, Startable { } createListener 
(options: CreateListenerOptions): Listener { - return new WebRTCPeerListener(this.components) + return new WebRTCPeerListener(this.components, { + shutdownController: this.shutdownController + }) } readonly [Symbol.toStringTag] = '@libp2p/webrtc' @@ -102,84 +119,124 @@ export class WebRTCTransport implements Transport, Startable { * /p2p//p2p-circuit/webrtc/p2p/ */ async dial (ma: Multiaddr, options: DialOptions): Promise { - log.trace('dialing address: ', ma) - const { baseAddr, peerId } = splitAddr(ma) + log.trace('dialing address: %a', ma) - if (options.signal == null) { - const controller = new AbortController() - options.signal = controller.signal - } + const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration) + const muxerFactory = new DataChannelMuxerFactory({ + peerConnection, + dataChannelOptions: this.init.dataChannel + }) - this.metrics?.dialerEvents.increment({ open: true }) - const connection = await this.components.transportManager.dial(baseAddr, options) - const signalingStream = await connection.newStream(SIGNALING_PROTO_ID, { - ...options, - runOnTransientConnection: true + const { remoteAddress } = await initiateConnection({ + peerConnection, + multiaddr: ma, + dataChannelOptions: this.init.dataChannel, + signal: options.signal, + connectionManager: this.components.connectionManager, + transportManager: this.components.transportManager }) - try { - const { pc, muxerFactory, remoteAddress } = await initiateConnection({ - stream: signalingStream, - rtcConfiguration: this.init.rtcConfiguration, + const webRTCConn = new WebRTCMultiaddrConnection({ + peerConnection, + timeline: { open: Date.now() }, + remoteAddr: remoteAddress, + metrics: this.metrics?.dialerEvents + }) + + peerConnection.onnegotiationneeded = () => { + console.info('------> initiator renegotiating!') + + this.metrics?.dialerEvents.increment({ renegotiate: true }) + + let signal = options.signal + + if (signal != null && signal.aborted) { + signal = undefined + } + + initiateConnection({ + peerConnection, + multiaddr: ma, dataChannelOptions: this.init.dataChannel, - signal: options.signal + signal: options.signal, + connectionManager: this.components.connectionManager, + transportManager: this.components.transportManager + }) + .then(({ remoteAddress }) => { + webRTCConn.remoteAddr = multiaddr(remoteAddress) }) - - const result = await options.upgrader.upgradeOutbound( - new WebRTCMultiaddrConnection({ - peerConnection: pc, - timeline: { open: Date.now() }, - remoteAddr: multiaddr(remoteAddress).encapsulate(`/p2p/${peerId.toString()}`), - metrics: this.metrics?.dialerEvents - }), - { - skipProtection: true, - skipEncryption: true, - muxerFactory - } - ) - - // close the stream if SDP has been exchanged successfully - await signalingStream.close() - return result - } catch (err: any) { - this.metrics?.dialerEvents.increment({ error: true }) - // reset the stream in case of any error - signalingStream.abort(err) - throw err - } finally { - // Close the signaling connection - await connection.close() } + + const result = await options.upgrader.upgradeOutbound(webRTCConn, { + skipProtection: true, + skipEncryption: true, + muxerFactory + }) + + // close the connection on shut down + this._closeOnShutdown(peerConnection, webRTCConn) + + return result } async _onProtocol ({ connection, stream }: IncomingStreamData): Promise { + const signal = AbortSignal.timeout(this.init.inboundConnectionTimeout ?? 
INBOUND_CONNECTION_TIMEOUT) + const peerConnection = new RTCPeerConnection(this.init.rtcConfiguration) + const muxerFactory = new DataChannelMuxerFactory({ peerConnection, dataChannelOptions: this.init.dataChannel }) + + peerConnection.onnegotiationneeded = () => { + console.info('------> recipient needs renegotiating!') + } + try { - const { pc, muxerFactory, remoteAddress } = await handleIncomingStream({ - rtcConfiguration: this.init.rtcConfiguration, + const { remoteAddress } = await handleIncomingStream({ + peerConnection, connection, stream, - dataChannelOptions: this.init.dataChannel + signal }) - await this.components.upgrader.upgradeInbound(new WebRTCMultiaddrConnection({ - peerConnection: pc, + const webRTCConn = new WebRTCMultiaddrConnection({ + peerConnection, timeline: { open: (new Date()).getTime() }, remoteAddr: multiaddr(remoteAddress).encapsulate(`/p2p/${connection.remotePeer.toString()}`), metrics: this.metrics?.listenerEvents - }), { + }) + + // close the connection on shut down + this._closeOnShutdown(peerConnection, webRTCConn) + + await this.components.upgrader.upgradeInbound(webRTCConn, { skipEncryption: true, skipProtection: true, muxerFactory }) + + // close the stream if SDP messages have been exchanged successfully + await stream.close({ + signal + }) } catch (err: any) { stream.abort(err) throw err - } finally { - // Close the signaling connection - await connection.close() } } + + private _closeOnShutdown (pc: RTCPeerConnection, webRTCConn: WebRTCMultiaddrConnection): void { + // close the connection on shut down + const shutDownListener = (): void => { + webRTCConn.close() + .catch(err => { + log.error('could not close WebRTCMultiaddrConnection', err) + }) + } + + this.shutdownController.signal.addEventListener('abort', shutDownListener) + + pc.addEventListener('close', () => { + this.shutdownController.signal.removeEventListener('abort', shutDownListener) + }) + } } export function splitAddr (ma: Multiaddr): { baseAddr: Multiaddr, peerId: PeerId } { diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index 6d2b97898d..c9174333c0 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -3,47 +3,54 @@ import { isFirefox } from '../util.js' import { RTCIceCandidate } from '../webrtc/index.js' import { Message } from './pb/message.js' import type { DeferredPromise } from 'p-defer' - -interface MessageStream { - read: () => Promise - write: (d: Message) => void | Promise -} +import type { AbortOptions, MessageStream } from 'it-protobuf-stream' const log = logger('libp2p:webrtc:peer:util') -export const readCandidatesUntilConnected = async (connectedPromise: DeferredPromise, pc: RTCPeerConnection, stream: MessageStream): Promise => { +export interface ReadCandidatesOptions extends AbortOptions { + direction: string +} + +export const readCandidatesUntilConnected = async (connectedPromise: DeferredPromise, pc: RTCPeerConnection, stream: MessageStream, options: ReadCandidatesOptions): Promise => { while (true) { - const readResult = await Promise.race([connectedPromise.promise, stream.read()]) - // check if readResult is a message - if (readResult instanceof Object) { - const message = readResult - if (message.type !== Message.Type.ICE_CANDIDATE) { - throw new Error('expected only ice candidates') - } - // end of candidates has been signalled - if (message.data == null || message.data === '') { - 
log.trace('end-of-candidates received') - break - } - - log.trace('received new ICE candidate: %s', message.data) - try { - await pc.addIceCandidate(new RTCIceCandidate(JSON.parse(message.data))) - } catch (err) { - log.error('bad candidate received: ', err) - throw new Error('bad candidate received') - } - } else { + const readResult = await Promise.race([ + connectedPromise.promise, + stream.read(options) + ]) + + if (readResult == null) { // connected promise resolved break } + + const message = readResult + + if (message.type !== Message.Type.ICE_CANDIDATE) { + throw new Error('expected only ice candidates') + } + + // end of candidates has been signalled + if (message.data == null || message.data === '') { + log.trace('%s received end-of-candidates', options.direction) + break + } + + log.trace('%s received new ICE candidate: %s', options.direction, message.data) + + try { + await pc.addIceCandidate(new RTCIceCandidate(JSON.parse(message.data))) + } catch (err) { + log.error('%s bad candidate received:', options.direction, err) + throw new Error('bad candidate received') + } } + await connectedPromise.promise } export function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredPromise): void { pc[isFirefox ? 'oniceconnectionstatechange' : 'onconnectionstatechange'] = (_) => { - log.trace('receiver peerConnectionState state: ', pc.connectionState) + log.trace('receiver peerConnectionState state change: %s', pc.connectionState) switch (isFirefox ? pc.iceConnectionState : pc.connectionState) { case 'connected': promise.resolve() @@ -58,3 +65,16 @@ export function resolveOnConnected (pc: RTCPeerConnection, promise: DeferredProm } } } + +export function parseRemoteAddress (sdp: string): string { + // 'a=candidate:1746876089 1 udp 2113937151 0614fbad-b...ocal 54882 typ host generation 0 network-cost 999' + const candidateLine = sdp.split('\r\n').filter(line => line.startsWith('a=candidate')).pop() + const candidateParts = candidateLine?.split(' ') + + if (candidateLine == null || candidateParts == null || candidateParts.length < 5) { + log('could not parse remote address from', candidateLine) + return '/webrtc' + } + + return `/dnsaddr/${candidateParts[4]}/${candidateParts[2].toLowerCase()}/${candidateParts[5]}/webrtc` +} diff --git a/packages/transport-webrtc/src/private-to-public/transport.ts b/packages/transport-webrtc/src/private-to-public/transport.ts index 23bb5d1994..a82f84a2b5 100644 --- a/packages/transport-webrtc/src/private-to-public/transport.ts +++ b/packages/transport-webrtc/src/private-to-public/transport.ts @@ -16,7 +16,7 @@ import { RTCPeerConnection } from '../webrtc/index.js' import * as sdp from './sdp.js' import { genUfrag } from './util.js' import type { WebRTCDialOptions } from './options.js' -import type { DataChannelOpts } from '../stream.js' +import type { DataChannelOptions } from '../index.js' import type { Connection } from '@libp2p/interface/connection' import type { CounterGroup, Metrics } from '@libp2p/interface/metrics' import type { PeerId } from '@libp2p/interface/peer-id' @@ -56,7 +56,7 @@ export interface WebRTCMetrics { } export interface WebRTCTransportDirectInit { - dataChannel?: Partial + dataChannel?: DataChannelOptions } export class WebRTCDirectTransport implements Transport { @@ -81,7 +81,7 @@ export class WebRTCDirectTransport implements Transport { */ async dial (ma: Multiaddr, options: WebRTCDialOptions): Promise { const rawConn = await this._connect(ma, options) - log(`dialing address - ${ma.toString()}`) + log('dialing address: %a', 
ma) return rawConn } @@ -194,7 +194,7 @@ export class WebRTCDirectTransport implements Transport { // we pass in undefined for these parameters. const noise = Noise({ prologueBytes: fingerprintsPrologue })() - const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) + const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', ...(this.init.dataChannel ?? {}) }) const wrappedDuplex = { ...wrappedChannel, sink: wrappedChannel.sink.bind(wrappedChannel), diff --git a/packages/transport-webrtc/src/stream.ts b/packages/transport-webrtc/src/stream.ts index 1e9bb0c3c7..85b9d3aaf5 100644 --- a/packages/transport-webrtc/src/stream.ts +++ b/packages/transport-webrtc/src/stream.ts @@ -3,18 +3,18 @@ import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/strea import { logger } from '@libp2p/logger' import * as lengthPrefixed from 'it-length-prefixed' import { type Pushable, pushable } from 'it-pushable' +import pDefer from 'p-defer' import { pEvent, TimeoutError } from 'p-event' +import pTimeout from 'p-timeout' import { Uint8ArrayList } from 'uint8arraylist' import { Message } from './pb/message.js' +import { abortPromise } from './util.js' +import type { DataChannelOptions } from './index.js' +import type { AbortOptions } from '@libp2p/interface' import type { Direction } from '@libp2p/interface/connection' +import type { DeferredPromise } from 'p-defer' -export interface DataChannelOpts { - maxMessageSize: number - maxBufferedAmount: number - bufferedAmountLowEventTimeout: number -} - -export interface WebRTCStreamInit extends AbstractStreamInit { +export interface WebRTCStreamInit extends AbstractStreamInit, DataChannelOptions { /** * The network channel used for bidirectional peer-to-peer transfers of * arbitrary data @@ -22,38 +22,46 @@ export interface WebRTCStreamInit extends AbstractStreamInit { * {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel} */ channel: RTCDataChannel - - dataChannelOptions?: Partial - - maxDataSize: number } -// Max message size that can be sent to the DataChannel -export const MAX_MESSAGE_SIZE = 16 * 1024 - -// How much can be buffered to the DataChannel at once +/** + * How much can be buffered to the DataChannel at once + */ export const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 -// How long time we wait for the 'bufferedamountlow' event to be emitted +/** + * How long time we wait for the 'bufferedamountlow' event to be emitted + */ export const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000 -// protobuf field definition overhead +/** + * protobuf field definition overhead + */ export const PROTOBUF_OVERHEAD = 5 -// Length of varint, in bytes. +/** + * Length of varint, in bytes + */ export const VARINT_LENGTH = 2 +/** + * Max message size that can be sent to the DataChannel + */ +export const MAX_MESSAGE_SIZE = 16 * 1024 + +/** + * When closing streams we send a FIN then wait for the remote to + * reply with a FIN_ACK. If that does not happen within this timeout + * we close the stream anyway. + */ +export const FIN_ACK_TIMEOUT = 5000 + export class WebRTCStream extends AbstractStream { /** * The data channel used to send and receive data */ private readonly channel: RTCDataChannel - /** - * Data channel options - */ - private readonly dataChannelOptions: DataChannelOpts - /** * push data from the underlying datachannel to the length prefix decoder * and then the protobuf decoder. 
@@ -62,10 +70,21 @@ export class WebRTCStream extends AbstractStream { private messageQueue?: Uint8ArrayList + private readonly maxBufferedAmount: number + + private readonly bufferedAmountLowEventTimeout: number + /** * The maximum size of a message in bytes */ - private readonly maxDataSize: number + private readonly maxMessageSize: number + + /** + * When this promise is resolved, the remote has sent us a FIN flag + */ + private readonly receivedFinAck: DeferredPromise + + private readonly finAckTimeout: number constructor (init: WebRTCStreamInit) { super(init) @@ -74,12 +93,11 @@ export class WebRTCStream extends AbstractStream { this.channel.binaryType = 'arraybuffer' this.incomingData = pushable() this.messageQueue = new Uint8ArrayList() - this.dataChannelOptions = { - bufferedAmountLowEventTimeout: init.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT, - maxBufferedAmount: init.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT, - maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? init.maxDataSize - } - this.maxDataSize = init.maxDataSize + this.bufferedAmountLowEventTimeout = init.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT + this.maxBufferedAmount = init.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT + this.maxMessageSize = (init.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD - VARINT_LENGTH + this.receivedFinAck = pDefer() + this.finAckTimeout = init.closeTimeout ?? FIN_ACK_TIMEOUT // set up initial state switch (this.channel.readyState) { @@ -105,14 +123,18 @@ export class WebRTCStream extends AbstractStream { this.channel.onopen = (_evt) => { this.timeline.open = new Date().getTime() - if (this.messageQueue != null) { + if (this.messageQueue != null && this.messageQueue.byteLength > 0) { + this.log.trace('dataChannel opened, sending queued messages', this.messageQueue.byteLength, this.channel.readyState) + // send any queued messages this._sendMessage(this.messageQueue) .catch(err => { + this.log.error('error sending queued messages', err) this.abort(err) }) - this.messageQueue = undefined } + + this.messageQueue = undefined } this.channel.onclose = (_evt) => { @@ -126,8 +148,6 @@ export class WebRTCStream extends AbstractStream { this.abort(err) } - const self = this - this.channel.onmessage = async (event: MessageEvent) => { const { data } = event @@ -138,6 +158,8 @@ export class WebRTCStream extends AbstractStream { this.incomingData.push(new Uint8Array(data, 0, data.byteLength)) } + const self = this + // pipe framed protobuf messages through a length prefixed decoder, and // surface data from the `Message.message` field through a source. 
Promise.resolve().then(async () => { @@ -159,9 +181,9 @@ export class WebRTCStream extends AbstractStream { } async _sendMessage (data: Uint8ArrayList, checkBuffer: boolean = true): Promise { - if (checkBuffer && this.channel.bufferedAmount > this.dataChannelOptions.maxBufferedAmount) { + if (checkBuffer && this.channel.bufferedAmount > this.maxBufferedAmount) { try { - await pEvent(this.channel, 'bufferedamountlow', { timeout: this.dataChannelOptions.bufferedAmountLowEventTimeout }) + await pEvent(this.channel, 'bufferedamountlow', { timeout: this.bufferedAmountLowEventTimeout }) } catch (err: any) { if (err instanceof TimeoutError) { throw new Error('Timed out waiting for DataChannel buffer to clear') @@ -194,10 +216,12 @@ export class WebRTCStream extends AbstractStream { } async sendData (data: Uint8ArrayList): Promise { + // sending messages is an async operation so use a copy of the list as it + // may be changed beneath us data = data.sublist() while (data.byteLength > 0) { - const toSend = Math.min(data.byteLength, this.maxDataSize) + const toSend = Math.min(data.byteLength, this.maxMessageSize) const buf = data.subarray(0, toSend) const msgbuf = Message.encode({ message: buf }) const sendbuf = lengthPrefixed.encode.single(msgbuf) @@ -211,8 +235,36 @@ export class WebRTCStream extends AbstractStream { await this._sendFlag(Message.Flag.RESET) } - async sendCloseWrite (): Promise { + async sendCloseWrite (options: AbortOptions): Promise { + if (this.channel.readyState === 'closed') { + return + } + + this.log.trace('send FIN') await this._sendFlag(Message.Flag.FIN) + + this.log.trace('wait for FIN_ACK') + const abortP = abortPromise({ signal: options.signal, message: 'FIN_ACK not received' }) + + try { + await Promise.race([ + this.receivedFinAck.promise, + abortP.promise, + pTimeout(this.receivedFinAck.promise, { + milliseconds: this.finAckTimeout + }) + ]) + } catch (err) { + this.log.error('error while waiting for remote FIN_ACK', err) + + if (options.signal?.aborted === true) { + throw err + } + } finally { + abortP.cleanup() + } + + this.log.trace('waited for FIN_ACK') } async sendCloseRead (): Promise { @@ -230,6 +282,16 @@ export class WebRTCStream extends AbstractStream { // We should expect no more data from the remote, stop reading this.incomingData.end() this.remoteCloseWrite() + + if (this.channel.readyState === 'open') { + this.log.trace('received FIN, sending FIN_ACK') + this._sendFlag(Message.Flag.FIN_ACK) + .catch(err => { + this.log.error('error sending FIN_ACK', err) + }) + } else { + this.log.trace('received FIN, not sending FIN_ACK because we are "%s" and not "open"', this.channel.readyState) + } } if (message.flag === Message.Flag.RESET) { @@ -241,6 +303,11 @@ export class WebRTCStream extends AbstractStream { // The remote has stopped reading this.remoteCloseRead() } + + if (message.flag === Message.Flag.FIN_ACK) { + this.log.trace('received FIN_ACK') + this.receivedFinAck.resolve() + } } return message.message @@ -255,7 +322,7 @@ export class WebRTCStream extends AbstractStream { } } -export interface WebRTCStreamOptions { +export interface WebRTCStreamOptions extends DataChannelOptions { /** * The network channel used for bidirectional peer-to-peer transfers of * arbitrary data @@ -269,23 +336,18 @@ export interface WebRTCStreamOptions { */ direction: Direction - dataChannelOptions?: Partial - - maxMsgSize?: number - + /** + * A callback invoked when the channel ends + */ onEnd?: (err?: Error | undefined) => void } export function createStream (options: 
WebRTCStreamOptions): WebRTCStream { - const { channel, direction, onEnd, dataChannelOptions } = options + const { channel, direction } = options return new WebRTCStream({ id: direction === 'inbound' ? (`i${channel.id}`) : `r${channel.id}`, - direction, - maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD - VARINT_LENGTH, - dataChannelOptions, - onEnd, - channel, - log: logger(`libp2p:webrtc:stream:${direction}:${channel.id}`) + log: logger(`libp2p:webrtc:stream:${direction}:${channel.id}`), + ...options }) } diff --git a/packages/transport-webrtc/src/util.ts b/packages/transport-webrtc/src/util.ts index e26e64dd5f..b108b4eb05 100644 --- a/packages/transport-webrtc/src/util.ts +++ b/packages/transport-webrtc/src/util.ts @@ -1,4 +1,9 @@ +import { logger } from '@libp2p/logger' import { detect } from 'detect-browser' +import pDefer, { type DeferredPromise } from 'p-defer' +import pTimeout from 'p-timeout' + +const log = logger('libp2p:webrtc:utils') const browser = detect() export const isFirefox = ((browser != null) && browser.name === 'firefox') @@ -6,3 +11,81 @@ export const isFirefox = ((browser != null) && browser.name === 'firefox') export const nopSource = async function * nop (): AsyncGenerator {} export const nopSink = async (_: any): Promise => {} + +export const DATA_CHANNEL_DRAIN_TIMEOUT = 30 * 1000 + +export function drainAndClose (channel: RTCDataChannel, direction: string, drainTimeout: number = DATA_CHANNEL_DRAIN_TIMEOUT): void { + if (channel.readyState !== 'open') { + return + } + + void Promise.resolve() + .then(async () => { + // wait for bufferedAmount to become zero + if (channel.bufferedAmount > 0) { + log('%s drain channel with %d buffered bytes', direction, channel.bufferedAmount) + const deferred = pDefer() + let drained = false + + channel.bufferedAmountLowThreshold = 0 + + const closeListener = (): void => { + if (!drained) { + log('%s drain channel closed before drain', direction) + deferred.resolve() + } + } + + channel.addEventListener('close', closeListener, { + once: true + }) + + channel.addEventListener('bufferedamountlow', () => { + drained = true + channel.removeEventListener('close', closeListener) + deferred.resolve() + }) + + await pTimeout(deferred.promise, { + milliseconds: drainTimeout + }) + } + }) + .then(async () => { + // only close if the channel is still open + if (channel.readyState === 'open') { + channel.close() + } + }) + .catch(err => { + log.error('error closing outbound stream', err) + }) +} + +export interface AbortPromiseOptions { + signal?: AbortSignal + message?: string +} + +export function abortPromise (opts: AbortPromiseOptions = {}): { promise: Promise, cleanup: () => void } { + const deferred: DeferredPromise = pDefer() + + const listener = (): void => { + deferred.reject(new Error(opts.message ?? 
'aborted')) + } + + if (opts.signal?.aborted === true) { + listener() + } + + opts.signal?.addEventListener('abort', listener, { + once: true + }) + + return { + promise: deferred.promise, + cleanup: () => { + opts.signal?.removeEventListener('abort', listener) + } + } +} diff --git a/packages/transport-webrtc/test/basics.spec.ts b/packages/transport-webrtc/test/basics.spec.ts index 03d89a1c8b..0068391544 100644 --- a/packages/transport-webrtc/test/basics.spec.ts +++ b/packages/transport-webrtc/test/basics.spec.ts @@ -12,10 +12,11 @@ import { pipe } from 'it-pipe' import toBuffer from 'it-to-buffer' import { createLibp2p } from 'libp2p' import { circuitRelayTransport } from 'libp2p/circuit-relay' -import { identifyService } from 'libp2p/identify' +import pDefer from 'p-defer' import { webRTC } from '../src/index.js' import type { Libp2p } from '@libp2p/interface' import type { Connection } from '@libp2p/interface/connection' +import type { StreamHandler } from '@libp2p/interface/stream-handler' async function createNode (): Promise { return createLibp2p({ @@ -38,9 +39,6 @@ async function createNode (): Promise { streamMuxers: [ yamux() ], - services: { - identify: identifyService() - }, connectionGater: { denyDialMultiaddr: () => false }, @@ -55,6 +53,7 @@ describe('basics', () => { let localNode: Libp2p let remoteNode: Libp2p + let streamHandler: StreamHandler async function connectNodes (): Promise { const remoteAddr = remoteNode.getMultiaddrs() @@ -64,25 +63,39 @@ describe('basics', () => { throw new Error('Remote peer could not listen on relay') } - await remoteNode.handle(echo, ({ stream }) => { - void pipe( - stream, - stream - ) + await remoteNode.handle(echo, (info) => { + streamHandler(info) }, { runOnTransientConnection: true }) + console.info('---- dial remote', remoteAddr.toString()) + const connection = await localNode.dial(remoteAddr) + // ensure the connection has been established both ways + + //await delay(1000) + + console.info('---- close relay connections') + // disconnect both from relay await localNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) await remoteNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) + console.info('---- relay connections closed') + return connection } beforeEach(async () => { + streamHandler = ({ stream }) => { + void pipe( + stream, + stream + ) + } + localNode = await createNode() remoteNode = await createNode() }) @@ -101,9 +114,7 @@ describe('basics', () => { const connection = await connectNodes() // open a stream on the echo protocol - const stream = await connection.newStream(echo, { - runOnTransientConnection: true - }) + const stream = await connection.newStream(echo) // send and receive some data const input = new Array(5).fill(0).map(() => new Uint8Array(10)) @@ -138,4 +149,69 @@ describe('basics', () => { // asset that we got the right data expect(output).to.equalBytes(toBuffer(input)) }) + + it('can close local stream for reading but send a large file', async () => { + let output: Uint8Array = new Uint8Array(0) + const streamClosed = pDefer() + + streamHandler = ({ stream }) => { + void Promise.resolve().then(async () => { + output = await toBuffer(map(stream.source, (buf) => buf.subarray())) + await stream.close() + streamClosed.resolve() + }) + } + + const connection = await connectNodes() + + // open a stream on the echo protocol + const stream = await connection.newStream(echo, { + runOnTransientConnection: true + }) + + // close for reading + await stream.closeRead() + + // send some data + const input = new 
Array(5).fill(0).map(() => new Uint8Array(1024 * 1024)) + + await stream.sink(input) + await stream.close() + + // wait for remote to receive all data + await streamClosed.promise + + // asset that we got the right data + expect(output).to.equalBytes(toBuffer(input)) + }) + + it('can close local stream for writing but receive a large file', async () => { + const input = new Array(5).fill(0).map(() => new Uint8Array(1024 * 1024)) + + streamHandler = ({ stream }) => { + void Promise.resolve().then(async () => { + // send some data + await stream.sink(input) + await stream.close() + }) + } + + const connection = await connectNodes() + + // open a stream on the echo protocol + const stream = await connection.newStream(echo, { + runOnTransientConnection: true + }) + + // close for reading + await stream.closeWrite() + + // receive some data + const output = await toBuffer(map(stream.source, (buf) => buf.subarray())) + + await stream.close() + + // asset that we got the right data + expect(output).to.equalBytes(toBuffer(input)) + }) }) diff --git a/packages/transport-webrtc/test/listener.spec.ts b/packages/transport-webrtc/test/listener.spec.ts index 34feedb859..036e727d7c 100644 --- a/packages/transport-webrtc/test/listener.spec.ts +++ b/packages/transport-webrtc/test/listener.spec.ts @@ -16,6 +16,8 @@ describe('webrtc private-to-private listener', () => { const listener = new WebRTCPeerListener({ peerId, transportManager + }, { + shutdownController: new AbortController() }) const otherListener = stubInterface({ diff --git a/packages/transport-webrtc/test/peer.browser.spec.ts b/packages/transport-webrtc/test/peer.browser.spec.ts index 623a8a8542..923be6c539 100644 --- a/packages/transport-webrtc/test/peer.browser.spec.ts +++ b/packages/transport-webrtc/test/peer.browser.spec.ts @@ -1,56 +1,120 @@ -import { mockConnection, mockMultiaddrConnection, mockRegistrar, mockStream, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { mockRegistrar, mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { multiaddr } from '@multiformats/multiaddr' +import { multiaddr, type Multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { detect } from 'detect-browser' -import { pair } from 'it-pair' import { duplexPair } from 'it-pair/duplex' import { pbStream } from 'it-protobuf-stream' import Sinon from 'sinon' -import { initiateConnection, handleIncomingStream } from '../src/private-to-private/handler.js' +import { initiateConnection } from '../src/private-to-private/initiate-connection.js' +import { handleIncomingStream } from '../src/private-to-private/signaling-stream-handler.js' import { Message } from '../src/private-to-private/pb/message.js' -import { WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js' +import { SIGNALING_PROTO_ID, WebRTCTransport, splitAddr } from '../src/private-to-private/transport.js' import { RTCPeerConnection, RTCSessionDescription } from '../src/webrtc/index.js' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' +import type { TransportManager } from '@libp2p/interface-internal/transport-manager' +import { streamPair } from '@libp2p/interface-compliance-tests/mocks' +import type { Connection, Stream } from '@libp2p/interface/connection' const browser = detect() +interface PrivateToPrivateComponents { + initiator: { + multiaddr: 
Multiaddr + peerConnection: RTCPeerConnection + connectionManager: StubbedInstance + transportManager: StubbedInstance + connection: StubbedInstance + stream: Stream + }, + recipient: { + peerConnection: RTCPeerConnection + connection: StubbedInstance + abortController: AbortController + signal: AbortSignal + stream: Stream + } +} + +async function getComponents (): Promise { + const relayPeerId = await createEd25519PeerId() + const receiverPeerId = await createEd25519PeerId() + const receiverMultiaddr = multiaddr(`/ip4/123.123.123.123/tcp/123/p2p/${relayPeerId}/p2p-circuit/webrtc/p2p/${receiverPeerId}`) + const [initiatorToReceiver, receiverToInitiator] = duplexPair() + const [initiatorStream, receiverStream] = streamPair({ + duplex: initiatorToReceiver, + init: { + protocol: SIGNALING_PROTO_ID + } + }, { + duplex: receiverToInitiator, + init: { + protocol: SIGNALING_PROTO_ID + } + }) + + const recipientAbortController = new AbortController() + + return { + initiator: { + multiaddr: receiverMultiaddr, + peerConnection: new RTCPeerConnection(), + connectionManager: stubInterface(), + transportManager: stubInterface(), + connection: stubInterface(), + stream: initiatorStream + }, + recipient: { + peerConnection: new RTCPeerConnection(), + connection: stubInterface(), + abortController: recipientAbortController, + signal: recipientAbortController.signal, + stream: receiverStream + } + } +} + describe('webrtc basic', () => { const isFirefox = ((browser != null) && browser.name === 'firefox') it('should connect', async () => { - const [receiver, initiator] = duplexPair() - const dstPeerId = await createEd25519PeerId() - const connection = mockConnection( - mockMultiaddrConnection(pair(), dstPeerId) - ) - const controller = new AbortController() - const initiatorPeerConnectionPromise = initiateConnection({ stream: mockStream(initiator), signal: controller.signal }) - const receiverPeerConnectionPromise = handleIncomingStream({ stream: mockStream(receiver), connection }) - await expect(initiatorPeerConnectionPromise).to.be.fulfilled() - await expect(receiverPeerConnectionPromise).to.be.fulfilled() - const [{ pc: pc0 }, { pc: pc1 }] = await Promise.all([initiatorPeerConnectionPromise, receiverPeerConnectionPromise]) + const { initiator, recipient } = await getComponents() + + // no existing connection + initiator.connectionManager.getConnections.returns([]) + + // transport manager dials recipient + initiator.transportManager.dial.resolves(initiator.connection) + + // signalling stream opens successfully + initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream) + + await expect( + Promise.all([ + initiateConnection(initiator), + handleIncomingStream(recipient) + ]) + ).to.eventually.be.fulfilled() + if (isFirefox) { - expect(pc0.iceConnectionState).eq('connected') - expect(pc1.iceConnectionState).eq('connected') + expect(initiator.peerConnection.iceConnectionState).eq('connected') + expect(recipient.peerConnection.iceConnectionState).eq('connected') return } - expect(pc0.connectionState).eq('connected') - expect(pc1.connectionState).eq('connected') + expect(initiator.peerConnection.connectionState).eq('connected') + expect(recipient.peerConnection.connectionState).eq('connected') - pc0.close() - pc1.close() + initiator.peerConnection.close() + recipient.peerConnection.close() }) }) describe('webrtc receiver', () => { it('should fail receiving on invalid sdp offer', async () => { - const [receiver, initiator] = duplexPair() - const dstPeerId = await 
createEd25519PeerId() - const connection = mockConnection( - mockMultiaddrConnection(pair(), dstPeerId) - ) - const receiverPeerConnectionPromise = handleIncomingStream({ stream: mockStream(receiver), connection }) - const stream = pbStream(initiator).pb(Message) + const { initiator, recipient } = await getComponents() + const receiverPeerConnectionPromise = handleIncomingStream(recipient) + const stream = pbStream(initiator.stream).pb(Message) await stream.write({ type: Message.Type.SDP_OFFER, data: 'bad' }) await expect(receiverPeerConnectionPromise).to.be.rejectedWith(/Failed to set remoteDescription/) @@ -59,10 +123,18 @@ describe('webrtc receiver', () => { describe('webrtc dialer', () => { it('should fail receiving on invalid sdp answer', async () => { - const [receiver, initiator] = duplexPair() - const controller = new AbortController() - const initiatorPeerConnectionPromise = initiateConnection({ signal: controller.signal, stream: mockStream(initiator) }) - const stream = pbStream(receiver).pb(Message) + const { initiator, recipient } = await getComponents() + + // existing connection already exists + initiator.connectionManager.getConnections.returns([ + initiator.connection + ]) + + // signalling stream opens successfully + initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream) + + const initiatorPeerConnectionPromise = initiateConnection(initiator) + const stream = pbStream(recipient.stream).pb(Message) const offerMessage = await stream.read() expect(offerMessage.type).to.eq(Message.Type.SDP_OFFER) @@ -72,10 +144,19 @@ describe('webrtc dialer', () => { }) it('should fail on receiving a candidate before an answer', async () => { - const [receiver, initiator] = duplexPair() - const controller = new AbortController() - const initiatorPeerConnectionPromise = initiateConnection({ signal: controller.signal, stream: mockStream(initiator) }) - const stream = pbStream(receiver).pb(Message) + const { initiator, recipient } = await getComponents() + + // existing connection already exists + initiator.connectionManager.getConnections.returns([ + initiator.connection + ]) + + // signalling stream opens successfully + initiator.connection.newStream.withArgs(SIGNALING_PROTO_ID).resolves(initiator.stream) + + const initiatorPeerConnectionPromise = initiateConnection(initiator) + + const stream = pbStream(recipient.stream).pb(Message) const pc = new RTCPeerConnection() pc.onicecandidate = ({ candidate }) => { @@ -99,7 +180,8 @@ describe('webrtc dialer', () => { describe('webrtc filter', () => { it('can filter multiaddrs to dial', async () => { const transport = new WebRTCTransport({ - transportManager: Sinon.stub() as any, + transportManager: stubInterface(), + connectionManager: stubInterface(), peerId: Sinon.stub() as any, registrar: mockRegistrar(), upgrader: mockUpgrader({}) diff --git a/packages/transport-webrtc/test/stream.browser.spec.ts b/packages/transport-webrtc/test/stream.browser.spec.ts index 457f95317d..dcbc3394e1 100644 --- a/packages/transport-webrtc/test/stream.browser.spec.ts +++ b/packages/transport-webrtc/test/stream.browser.spec.ts @@ -11,7 +11,7 @@ const TEST_MESSAGE = 'test_message' function setup (): { peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream } { const peerConnection = new RTCPeerConnection() const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) - const stream = createStream({ channel: dataChannel, direction: 'outbound' }) + const stream = createStream({ 
channel: dataChannel, direction: 'outbound', closeTimeout: 1 }) return { peerConnection, dataChannel, stream } } @@ -28,9 +28,10 @@ function generatePbByFlag (flag?: Message.Flag): Uint8Array { describe('Stream Stats', () => { let stream: WebRTCStream let peerConnection: RTCPeerConnection + let dataChannel: RTCDataChannel beforeEach(async () => { - ({ stream, peerConnection } = setup()) + ({ stream, peerConnection, dataChannel } = setup()) }) afterEach(() => { @@ -45,7 +46,14 @@ describe('Stream Stats', () => { it('close marks it closed', async () => { expect(stream.timeline.close).to.not.exist() - await stream.close() + + const msgbuf = Message.encode({ flag: Message.Flag.FIN_ACK }) + const prefixedBuf = lengthPrefixed.encode.single(msgbuf) + + const p = stream.close() + dataChannel.dispatchEvent(new MessageEvent('message', { data: prefixedBuf })) + await p + expect(stream.timeline.close).to.be.a('number') }) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index 500cbb02de..9a2796e3ff 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -33,7 +33,8 @@ describe('Max message size', () => { sent.append(bytes) } }), - direction: 'outbound' + direction: 'outbound', + closeTimeout: 1 }) p.push(data) @@ -80,9 +81,7 @@ describe('Max message size', () => { const timeout = 100 let closed = false const webrtcStream = createStream({ - dataChannelOptions: { - bufferedAmountLowEventTimeout: timeout - }, + bufferedAmountLowEventTimeout: timeout, channel: mockDataChannel({ send: () => { throw new Error('Expected to not send') diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 206ad7459b..91a44a5a44 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -56,6 +56,7 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, try { await stream.close() } catch (err: any) { + log.error('error closing WebSocket gracefully', err) this.abort(err) } finally { options.signal.removeEventListener('abort', listener)
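
Note on the new parseRemoteAddress helper in private-to-private/util.ts: it takes the last a=candidate line from the remote SDP and maps it to a multiaddr-style string, falling back to '/webrtc' when no usable candidate is present. A minimal usage sketch follows; the candidate line is invented for illustration and the relative import path is an assumption about where the helper ends up being consumed from.

import { parseRemoteAddress } from '../src/private-to-private/util.js' // assumed import path, as a test in this package might use

// invented candidate line: 'udp' transport, host 'abc123.local', port 54882
const sdp = 'a=candidate:1 1 udp 2113937151 abc123.local 54882 typ host generation 0'

// yields '/dnsaddr/abc123.local/udp/54882/webrtc'
console.info(parseRemoteAddress(sdp))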
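
The stream now derives its payload budget from maxMessageSize instead of a separate maxDataSize option: the framing overhead is subtracted up front, so with the defaults each data channel message carries at most 16377 bytes of payload. The arithmetic, using the constants from stream.ts:

// defaults from stream.ts
const MAX_MESSAGE_SIZE = 16 * 1024  // 16384, max message size that can be sent to the DataChannel
const PROTOBUF_OVERHEAD = 5         // protobuf field definition overhead
const VARINT_LENGTH = 2             // length of the varint prefix, in bytes

// payload bytes available per Message once framing is accounted for
const maxPayload = MAX_MESSAGE_SIZE - PROTOBUF_OVERHEAD - VARINT_LENGTH // 16377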
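
Closing the writable end now performs a small handshake: a FIN is sent, then the stream waits for the remote FIN_ACK, giving up after closeTimeout (FIN_ACK_TIMEOUT, 5s, by default) or when the caller aborts. A standalone sketch of that wait, with the stream internals stubbed out; receivedFinAck stands in for the deferred that the message handler resolves when the FIN_ACK flag arrives, and the abortPromise import path is an assumption.

import pDefer from 'p-defer'
import pTimeout from 'p-timeout'
import { abortPromise } from '../src/util.js' // assumed location of the helper added in this change

const FIN_ACK_TIMEOUT = 5000

// resolved by the message handler when a FIN_ACK flag is read
const receivedFinAck = pDefer<void>()

async function waitForFinAck (signal?: AbortSignal, finAckTimeout: number = FIN_ACK_TIMEOUT): Promise<void> {
  const abortP = abortPromise({ signal, message: 'FIN_ACK not received' })

  try {
    await Promise.race([
      receivedFinAck.promise,
      abortP.promise,
      pTimeout(receivedFinAck.promise, { milliseconds: finAckTimeout })
    ])
  } catch (err) {
    // a missing FIN_ACK is tolerated (the stream closes anyway),
    // but an explicit abort from the caller is re-thrown
    if (signal?.aborted === true) {
      throw err
    }
  } finally {
    abortP.cleanup()
  }
}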
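
The reworked browser tests drive both ends of the signalling protocol over the new streamPair mock rather than hand-rolled stream objects: two cross-wired duplexes are dressed up as Stream instances. Roughly, following the usage in peer.browser.spec.ts (the protocol name here is a placeholder):

import { streamPair } from '@libp2p/interface-compliance-tests/mocks'
import { duplexPair } from 'it-pair/duplex'

// two duplexes where each side reads what the other writes
const [outboundDuplex, inboundDuplex] = duplexPair<any>()

// wrap them as Stream objects, tagged with a placeholder protocol
const [outboundStream, inboundStream] = streamPair(
  { duplex: outboundDuplex, init: { protocol: '/example/echo/1.0.0' } },
  { duplex: inboundDuplex, init: { protocol: '/example/echo/1.0.0' } }
)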