diff --git a/.aegir.js b/.aegir.js index daf5fab..19f3e50 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,8 +1,59 @@ +import { createLibp2p } from 'libp2p' +import { circuitRelayServer } from 'libp2p/circuit-relay' +import { identifyService } from 'libp2p/identify' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' + export default { build: { config: { platform: 'node' }, bundlesizeMax: '117KB' + }, + test: { + before: async () => { + // start a relay node for use in the tests + const relay = await createLibp2p({ + addresses: { + listen: [ + '/ip4/127.0.0.1/tcp/0/ws' + ] + }, + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + relay: circuitRelayServer({ + reservations: { + maxReservations: Infinity + } + }), + identify: identifyService() + }, + connectionManager: { + minConnections: 0 + } + }) + + const multiaddrs = relay.getMultiaddrs().map(ma => ma.toString()) + + return { + relay, + env: { + RELAY_MULTIADDR: multiaddrs[0] + } + } + }, + after: async (_, before) => { + await before.relay.stop() + } } } diff --git a/package.json b/package.json index 8227f0e..fa737b5 100644 --- a/package.json +++ b/package.json @@ -140,7 +140,7 @@ "@libp2p/interface-metrics": "^4.0.8", "@libp2p/interface-peer-id": "^2.0.2", "@libp2p/interface-registrar": "^2.0.12", - "@libp2p/interface-stream-muxer": "^4.0.1", + "@libp2p/interface-stream-muxer": "^4.1.2", "@libp2p/interface-transport": "^4.0.3", "@libp2p/interfaces": "^3.3.2", "@libp2p/logger": "^2.0.7", @@ -150,7 +150,6 @@ "abortable-iterator": "^5.0.1", "detect-browser": "^5.3.0", "it-length-prefixed": "^9.0.1", - "it-merge": "^3.0.0", "it-pb-stream": "^4.0.1", "it-pipe": "^3.0.1", "it-pushable": "^3.1.3", @@ -164,11 +163,19 @@ "uint8arrays": "^4.0.3" }, "devDependencies": { + "@chainsafe/libp2p-yamux": "^4.0.1", + "@libp2p/interface-libp2p": "^3.1.0", "@libp2p/interface-mocks": "^12.0.1", "@libp2p/peer-id-factory": "^2.0.3", + "@libp2p/websockets": "^6.0.1", "@types/sinon": "^10.0.14", - "aegir": "^39.0.6", + "aegir": "^39.0.7", + "delay": "^5.0.0", + "it-all": "^3.0.2", + "it-length": "^3.0.2", + "it-map": "^3.0.3", "it-pair": "^2.0.6", + "libp2p": "^0.45.0", "protons": "^7.0.2", "sinon": "^15.0.4", "sinon-ts": "^1.0.0" diff --git a/src/muxer.ts b/src/muxer.ts index 2487be0..95f925c 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,11 +1,14 @@ -import { type DataChannelOpts, WebRTCStream } from './stream.js' +import { createStream } from './stream.js' import { nopSink, nopSource } from './util.js' +import type { DataChannelOpts } from './stream.js' import type { Stream } from '@libp2p/interface-connection' import type { CounterGroup } from '@libp2p/interface-metrics' import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface-stream-muxer' import type { Source, Sink } from 'it-stream-types' import type { Uint8ArrayList } from 'uint8arraylist' +const PROTOCOL = '/webrtc' + export interface DataChannelMuxerFactoryInit { /** * WebRTC Peer Connection @@ -21,37 +24,60 @@ export interface DataChannelMuxerFactoryInit { * Data channel options */ dataChannelOptions?: Partial + + /** + * The protocol to use + */ + protocol?: string } export class DataChannelMuxerFactory implements StreamMuxerFactory { + public readonly protocol: string + /** * WebRTC Peer Connection */ - private streamBuffer: WebRTCStream[] = [] + private readonly peerConnection: RTCPeerConnection + private streamBuffer: Stream[] = [] + private readonly metrics?: CounterGroup + private readonly dataChannelOptions?: Partial + + constructor (init: DataChannelMuxerFactoryInit) { + this.peerConnection = init.peerConnection + this.metrics = init.metrics + this.protocol = init.protocol ?? PROTOCOL + this.dataChannelOptions = init.dataChannelOptions - constructor (readonly init: DataChannelMuxerFactoryInit, readonly protocol = '/webrtc') { // store any datachannels opened before upgrade has been completed - this.init.peerConnection.ondatachannel = ({ channel }) => { - const stream = new WebRTCStream({ + this.peerConnection.ondatachannel = ({ channel }) => { + const stream = createStream({ channel, - stat: { - direction: 'inbound', - timeline: { open: 0 } - }, + direction: 'inbound', dataChannelOptions: init.dataChannelOptions, - closeCb: (_stream) => { - this.streamBuffer = this.streamBuffer.filter(s => !_stream.eq(s)) + onEnd: () => { + this.streamBuffer = this.streamBuffer.filter(s => s.id !== stream.id) } }) this.streamBuffer.push(stream) } } - createStreamMuxer (init?: StreamMuxerInit | undefined): StreamMuxer { - return new DataChannelMuxer(this.init, this.streamBuffer, this.protocol, init) + createStreamMuxer (init?: StreamMuxerInit): StreamMuxer { + return new DataChannelMuxer({ + ...init, + peerConnection: this.peerConnection, + dataChannelOptions: this.dataChannelOptions, + metrics: this.metrics, + streams: this.streamBuffer, + protocol: this.protocol + }) } } +export interface DataChannelMuxerInit extends DataChannelMuxerFactoryInit, StreamMuxerInit { + streams: Stream[] +} + /** * A libp2p data channel stream muxer */ @@ -59,12 +85,12 @@ export class DataChannelMuxer implements StreamMuxer { /** * Array of streams in the data channel */ - streams: Stream[] = [] + public streams: Stream[] + public protocol: string - /** - * Initialized stream muxer - */ - init?: StreamMuxerInit + private readonly peerConnection: RTCPeerConnection + private readonly dataChannelOptions?: DataChannelOpts + private readonly metrics?: CounterGroup /** * Close or abort all tracked streams and stop the muxer @@ -81,11 +107,11 @@ export class DataChannelMuxer implements StreamMuxer { */ sink: Sink, Promise> = nopSink - constructor (readonly dataChannelMuxer: DataChannelMuxerFactoryInit, streams: Stream[], readonly protocol: string = '/webrtc', init?: StreamMuxerInit) { - /** - * Initialized stream muxer - */ - this.init = init + constructor (readonly init: DataChannelMuxerInit) { + this.streams = init.streams + this.peerConnection = init.peerConnection + this.protocol = init.protocol ?? PROTOCOL + this.metrics = init.metrics /** * Fired when a data channel has been added to the connection has been @@ -93,33 +119,25 @@ export class DataChannelMuxer implements StreamMuxer { * * {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/datachannel_event} */ - this.dataChannelMuxer.peerConnection.ondatachannel = ({ channel }) => { - const stream = new WebRTCStream({ + this.peerConnection.ondatachannel = ({ channel }) => { + const stream = createStream({ channel, - stat: { - direction: 'inbound', - timeline: { - open: 0 - } - }, - dataChannelOptions: dataChannelMuxer.dataChannelOptions, - closeCb: this.wrapStreamEnd(init?.onIncomingStream) + direction: 'inbound', + dataChannelOptions: this.dataChannelOptions, + onEnd: () => { + this.streams = this.streams.filter(s => s.id !== stream.id) + this.metrics?.increment({ stream_end: true }) + init?.onStreamEnd?.(stream) + } }) this.streams.push(stream) if ((init?.onIncomingStream) != null) { - this.dataChannelMuxer.metrics?.increment({ incoming_stream: true }) + this.metrics?.increment({ incoming_stream: true }) init.onIncomingStream(stream) } } - // wrap open streams with the onStreamEnd callback - this.streams = streams - .filter(stream => stream.stat.timeline.close == null) - .map(stream => { - (stream as WebRTCStream).closeCb = this.wrapStreamEnd(init?.onStreamEnd) - return stream - }) const onIncomingStream = init?.onIncomingStream if (onIncomingStream != null) { this.streams.forEach(s => { onIncomingStream(s) }) @@ -128,35 +146,20 @@ export class DataChannelMuxer implements StreamMuxer { newStream (): Stream { // The spec says the label SHOULD be an empty string: https://github.com/libp2p/specs/blob/master/webrtc/README.md#rtcdatachannel-label - const channel = this.dataChannelMuxer.peerConnection.createDataChannel('') - const closeCb = (stream: Stream): void => { - this.dataChannelMuxer.metrics?.increment({ stream_end: true }) - this.init?.onStreamEnd?.(stream) - } - const stream = new WebRTCStream({ + const channel = this.peerConnection.createDataChannel('') + const stream = createStream({ channel, - stat: { - direction: 'outbound', - timeline: { - open: 0 - } - }, - dataChannelOptions: this.dataChannelMuxer.dataChannelOptions, - closeCb: this.wrapStreamEnd(closeCb) + direction: 'outbound', + dataChannelOptions: this.dataChannelOptions, + onEnd: () => { + this.streams = this.streams.filter(s => s.id !== stream.id) + this.metrics?.increment({ stream_end: true }) + this.init?.onStreamEnd?.(stream) + } }) this.streams.push(stream) - this.dataChannelMuxer.metrics?.increment({ outgoing_stream: true }) + this.metrics?.increment({ outgoing_stream: true }) return stream } - - private wrapStreamEnd (onStreamEnd?: (s: Stream) => void): (stream: Stream) => void { - const self = this - return (_stream) => { - self.streams = self.streams.filter(s => !(_stream instanceof WebRTCStream && (_stream).eq(s))) - if (onStreamEnd != null) { - onStreamEnd(_stream) - } - } - } } diff --git a/src/private-to-public/transport.ts b/src/private-to-public/transport.ts index 8499024..c23b715 100644 --- a/src/private-to-public/transport.ts +++ b/src/private-to-public/transport.ts @@ -9,11 +9,12 @@ import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' import { dataChannelError, inappropriateMultiaddr, unimplemented, invalidArgument } from '../error.js' import { WebRTCMultiaddrConnection } from '../maconn.js' import { DataChannelMuxerFactory } from '../muxer.js' -import { type DataChannelOpts, WebRTCStream } from '../stream.js' +import { createStream } from '../stream.js' import { isFirefox } from '../util.js' import * as sdp from './sdp.js' import { genUfrag } from './util.js' import type { WebRTCDialOptions } from './options.js' +import type { DataChannelOpts } from '../stream.js' import type { Connection } from '@libp2p/interface-connection' import type { CounterGroup, Metrics } from '@libp2p/interface-metrics' import type { PeerId } from '@libp2p/interface-peer-id' @@ -190,7 +191,7 @@ export class WebRTCDirectTransport implements Transport { // we pass in undefined for these parameters. const noise = Noise({ prologueBytes: fingerprintsPrologue })() - const wrappedChannel = new WebRTCStream({ channel: handshakeDataChannel, stat: { direction: 'inbound', timeline: { open: 1 } }, dataChannelOptions: this.init.dataChannel }) + const wrappedChannel = createStream({ channel: handshakeDataChannel, direction: 'inbound', dataChannelOptions: this.init.dataChannel }) const wrappedDuplex = { ...wrappedChannel, sink: wrappedChannel.sink.bind(wrappedChannel), diff --git a/src/stream.ts b/src/stream.ts index 340c832..fe5482b 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,37 +1,22 @@ +import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface-stream-muxer/stream' +import { CodeError } from '@libp2p/interfaces/errors' import { logger } from '@libp2p/logger' import * as lengthPrefixed from 'it-length-prefixed' -import merge from 'it-merge' -import { pipe } from 'it-pipe' -import { pushable } from 'it-pushable' -import defer, { type DeferredPromise } from 'p-defer' -import { pEvent } from 'p-event' +import { type Pushable, pushable } from 'it-pushable' +import { pEvent, TimeoutError } from 'p-event' import { Uint8ArrayList } from 'uint8arraylist' import { Message } from './pb/message.js' -import type { Stream, StreamStat, Direction } from '@libp2p/interface-connection' -import type { Source } from 'it-stream-types' +import type { Direction, Stream } from '@libp2p/interface-connection' const log = logger('libp2p:webrtc:stream') -/** - * Constructs a default StreamStat - */ -export function defaultStat (dir: Direction): StreamStat { - return { - direction: dir, - timeline: { - open: 0, - close: undefined - } - } -} - export interface DataChannelOpts { maxMessageSize: number maxBufferedAmount: number bufferedAmountLowEventTimeout: number } -interface StreamInitOpts { +export interface WebRTCStreamInit extends AbstractStreamInit { /** * The network channel used for bidirectional peer-to-peer transfers of * arbitrary data @@ -40,129 +25,9 @@ interface StreamInitOpts { */ channel: RTCDataChannel - /** - * User defined stream metadata - */ - metadata?: Record - - /** - * Stats about this stream - */ - stat: StreamStat - - /** - * Callback to invoke when the stream is closed. - */ - closeCb?: (stream: WebRTCStream) => void - - /** - * Data channel options - */ dataChannelOptions?: Partial } -/* - * State transitions for a stream - */ -interface StreamStateInput { - /** - * Outbound conections are opened by the local node, inbound streams are - * opened by the remote - */ - direction: 'inbound' | 'outbound' - - /** - * Message flag from the protobufs - * - * 0 = FIN - * 1 = STOP_SENDING - * 2 = RESET - */ - flag: Message.Flag -} - -export enum StreamStates { - OPEN, - READ_CLOSED, - WRITE_CLOSED, - CLOSED, -} - -// Checked by the Typescript compiler. If this fails it's because the switch -// statement is not exhaustive. -function unreachableBranch (x: never): never { - throw new Error('Case not handled in switch') -} - -class StreamState { - state: StreamStates = StreamStates.OPEN - - isWriteClosed (): boolean { - return (this.state === StreamStates.CLOSED || this.state === StreamStates.WRITE_CLOSED) - } - - transition ({ direction, flag }: StreamStateInput): [StreamStates, StreamStates] { - const prev = this.state - - // return early if the stream is closed - if (this.state === StreamStates.CLOSED) { - return [prev, StreamStates.CLOSED] - } - - if (direction === 'inbound') { - switch (flag) { - case Message.Flag.FIN: - if (this.state === StreamStates.OPEN) { - this.state = StreamStates.READ_CLOSED - } else if (this.state === StreamStates.WRITE_CLOSED) { - this.state = StreamStates.CLOSED - } - break - - case Message.Flag.STOP_SENDING: - if (this.state === StreamStates.OPEN) { - this.state = StreamStates.WRITE_CLOSED - } else if (this.state === StreamStates.READ_CLOSED) { - this.state = StreamStates.CLOSED - } - break - - case Message.Flag.RESET: - this.state = StreamStates.CLOSED - break - default: - unreachableBranch(flag) - } - } else { - switch (flag) { - case Message.Flag.FIN: - if (this.state === StreamStates.OPEN) { - this.state = StreamStates.WRITE_CLOSED - } else if (this.state === StreamStates.READ_CLOSED) { - this.state = StreamStates.CLOSED - } - break - - case Message.Flag.STOP_SENDING: - if (this.state === StreamStates.OPEN) { - this.state = StreamStates.READ_CLOSED - } else if (this.state === StreamStates.WRITE_CLOSED) { - this.state = StreamStates.CLOSED - } - break - - case Message.Flag.RESET: - this.state = StreamStates.CLOSED - break - - default: - unreachableBranch(flag) - } - } - return [prev, this.state] - } -} - // Max message size that can be sent to the DataChannel const MAX_MESSAGE_SIZE = 16 * 1024 @@ -172,109 +37,73 @@ const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 // How long time we wait for the 'bufferedamountlow' event to be emitted const BUFFERED_AMOUNT_LOW_TIMEOUT = 30 * 1000 -export class WebRTCStream implements Stream { - /** - * Unique identifier for a stream - */ - id: string - - /** - * Stats about this stream - */ - stat: StreamStat - - /** - * User defined stream metadata - */ - metadata: Record +// protobuf field definition overhead +const PROTOBUF_OVERHEAD = 3 +class WebRTCStream extends AbstractStream { /** * The data channel used to send and receive data */ private readonly channel: RTCDataChannel - /** - * The current state of the stream - */ - streamState = new StreamState() - /** * Data channel options */ - dataChannelOptions: DataChannelOpts - - /** - * Read unwrapped protobuf data from the underlying datachannel. - * _src is exposed to the user via the `source` getter to . - */ - private readonly _src: AsyncGenerator + private readonly dataChannelOptions: DataChannelOpts /** * push data from the underlying datachannel to the length prefix decoder * and then the protobuf decoder. */ - private readonly _innersrc = pushable() + private readonly incomingData: Pushable - /** - * Deferred promise that resolves when the underlying datachannel is in the - * open state. - */ - opened: DeferredPromise = defer() - - /** - * sinkCreated is set to true once the sinkFunction is invoked - */ - _sinkCalled: boolean = false + private messageQueue?: Uint8ArrayList - /** - * Triggers a generator which can be used to close the sink. - */ - closeWritePromise: DeferredPromise = defer() - - /** - * Callback to invoke when the stream is closed. - */ - closeCb?: (stream: WebRTCStream) => void + constructor (init: WebRTCStreamInit) { + super(init) - constructor (opts: StreamInitOpts) { - this.channel = opts.channel + this.channel = init.channel this.channel.binaryType = 'arraybuffer' - this.id = this.channel.label - this.stat = opts.stat + this.incomingData = pushable() + this.messageQueue = new Uint8ArrayList() this.dataChannelOptions = { - bufferedAmountLowEventTimeout: opts.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT, - maxBufferedAmount: opts.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT, - maxMessageSize: opts.dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE + bufferedAmountLowEventTimeout: init.dataChannelOptions?.bufferedAmountLowEventTimeout ?? BUFFERED_AMOUNT_LOW_TIMEOUT, + maxBufferedAmount: init.dataChannelOptions?.maxBufferedAmount ?? MAX_BUFFERED_AMOUNT, + maxMessageSize: init.dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE } - this.closeCb = opts.closeCb + // set up initial state switch (this.channel.readyState) { case 'open': - this.opened.resolve() break case 'closed': case 'closing': - this.streamState.state = StreamStates.CLOSED if (this.stat.timeline.close === undefined || this.stat.timeline.close === 0) { - this.stat.timeline.close = new Date().getTime() + this.stat.timeline.close = Date.now() } - this.opened.resolve() break case 'connecting': // noop break default: - unreachableBranch(this.channel.readyState) + log.error('unknown datachannel state %s', this.channel.readyState) + throw new CodeError('Unknown datachannel state', 'ERR_INVALID_STATE') } - this.metadata = opts.metadata ?? {} - // handle RTCDataChannel events this.channel.onopen = (_evt) => { this.stat.timeline.open = new Date().getTime() - this.opened.resolve() + + if (this.messageQueue != null) { + // send any queued messages + this._sendMessage(this.messageQueue) + .catch(err => { + this.abort(err) + }) + this.messageQueue = undefined + } } this.channel.onclose = (_evt) => { @@ -288,222 +117,157 @@ export class WebRTCStream implements Stream { const self = this - // reader pipe - this.channel.onmessage = async ({ data }) => { - if (data === null || data.length === 0) { + this.channel.onmessage = async (event: MessageEvent) => { + const { data } = event + + if (data === null || data.byteLength === 0) { return } - this._innersrc.push(new Uint8Array(data as ArrayBufferLike)) + + this.incomingData.push(new Uint8Array(data, 0, data.byteLength)) } // pipe framed protobuf messages through a length prefixed decoder, and // surface data from the `Message.message` field through a source. - this._src = pipe( - this._innersrc, - (source) => lengthPrefixed.decode(source), - (source) => (async function * () { - for await (const buf of source) { - const message = self.processIncomingProtobuf(buf.subarray()) - if (message != null) { - yield new Uint8ArrayList(message) - } + Promise.resolve().then(async () => { + for await (const buf of lengthPrefixed.decode(this.incomingData)) { + const message = self.processIncomingProtobuf(buf.subarray()) + + if (message != null) { + self.sourcePush(new Uint8ArrayList(message)) } - })() - ) + } + }) + .catch(err => { + log.error('error processing incoming data channel messages', err) + }) } - // If user attempts to set a new source this should be a noop - set source (_src: AsyncGenerator) { } - - get source (): AsyncGenerator { - return this._src + sendNewStream (): void { + // opening new streams is handled by WebRTC so this is a noop } - /** - * Write data to the remote peer. - * It takes care of wrapping data in a protobuf and adding the length prefix. - */ - async sink (src: Source): Promise { - if (this._sinkCalled) { - throw new Error('sink already called on this stream') + async _sendMessage (data: Uint8ArrayList, checkBuffer: boolean = true): Promise { + if (checkBuffer && this.channel.bufferedAmount > this.dataChannelOptions.maxBufferedAmount) { + try { + await pEvent(this.channel, 'bufferedamountlow', { timeout: this.dataChannelOptions.bufferedAmountLowEventTimeout }) + } catch (err: any) { + if (err instanceof TimeoutError) { + this.abort(err) + throw new Error('Timed out waiting for DataChannel buffer to clear') + } + + throw err + } } - // await stream opening before sending data - await this.opened.promise - try { - await this._sink(src) - } finally { - this.closeWrite() + + if (this.channel.readyState === 'closed' || this.channel.readyState === 'closing') { + throw new CodeError('Invalid datachannel state - closed or closing', 'ERR_INVALID_STATE') } - } - /** - * Closable sink implementation - */ - private async _sink (src: Source): Promise { - const closeWrite = this._closeWriteIterable() - for await (const buf of merge(closeWrite, src)) { - if (this.streamState.isWriteClosed()) { - return + if (this.channel.readyState === 'open') { + // send message without copying data + for (const buf of data) { + this.channel.send(buf) } - - if (this.channel.bufferedAmount > this.dataChannelOptions.maxBufferedAmount) { - await pEvent(this.channel, 'bufferedamountlow', { timeout: this.dataChannelOptions.bufferedAmountLowEventTimeout }).catch((e) => { - this.close() - throw new Error('Timed out waiting for DataChannel buffer to clear') - }) + } else if (this.channel.readyState === 'connecting') { + // queue message for when we are open + if (this.messageQueue == null) { + this.messageQueue = new Uint8ArrayList() } - const msgbuf = Message.encode({ message: buf.subarray() }) - const sendbuf = lengthPrefixed.encode.single(msgbuf) - - while (sendbuf.length > 0) { - if (sendbuf.length <= this.dataChannelOptions.maxMessageSize) { - this.channel.send(sendbuf.subarray()) - break - } - this.channel.send(sendbuf.subarray(0, this.dataChannelOptions.maxMessageSize)) - sendbuf.consume(this.dataChannelOptions.maxMessageSize) - } + this.messageQueue.append(data) + } else { + log.error('unknown datachannel state %s', this.channel.readyState) + throw new CodeError('Unknown datachannel state', 'ERR_INVALID_STATE') } } - /** - * Handle incoming - */ - processIncomingProtobuf (buffer: Uint8Array): Uint8Array | undefined { - const message = Message.decode(buffer) + async sendData (data: Uint8ArrayList): Promise { + const msgbuf = Message.encode({ message: data.subarray() }) + const sendbuf = lengthPrefixed.encode.single(msgbuf) - if (message.flag !== undefined) { - const [currentState, nextState] = this.streamState.transition({ direction: 'inbound', flag: message.flag }) - - if (currentState !== nextState) { - switch (nextState) { - case StreamStates.READ_CLOSED: - this._innersrc.end() - break - case StreamStates.WRITE_CLOSED: - this.closeWritePromise.resolve() - break - case StreamStates.CLOSED: - this.close() - break - // StreamStates.OPEN will never be a nextState - case StreamStates.OPEN: - break - default: - unreachableBranch(nextState) - } - } - } + await this._sendMessage(sendbuf) + } - return message.message + async sendReset (): Promise { + await this._sendFlag(Message.Flag.RESET) } - /** - * Close a stream for reading and writing - */ - close (): void { - this.stat.timeline.close = new Date().getTime() - this.streamState.state = StreamStates.CLOSED - this._innersrc.end() - this.closeWritePromise.resolve() - this.channel.close() - - if (this.closeCb !== undefined) { - this.closeCb(this) - } + async sendCloseWrite (): Promise { + await this._sendFlag(Message.Flag.FIN) + } + + async sendCloseRead (): Promise { + await this._sendFlag(Message.Flag.STOP_SENDING) } /** - * Close a stream for reading only + * Handle incoming */ - closeRead (): void { - const [currentState, nextState] = this.streamState.transition({ direction: 'outbound', flag: Message.Flag.STOP_SENDING }) - if (currentState === nextState) { - // No change, no op - return - } + private processIncomingProtobuf (buffer: Uint8Array): Uint8Array | undefined { + const message = Message.decode(buffer) - if (currentState === StreamStates.OPEN || currentState === StreamStates.WRITE_CLOSED) { - this._sendFlag(Message.Flag.STOP_SENDING) - this._innersrc.end() - } + if (message.flag !== undefined) { + if (message.flag === Message.Flag.FIN) { + // We should expect no more data from the remote, stop reading + this.incomingData.end() + this.closeRead() + } - if (nextState === StreamStates.CLOSED) { - this.close() - } - } + if (message.flag === Message.Flag.RESET) { + // Stop reading and writing to the stream immediately + this.reset() + } - /** - * Close a stream for writing only - */ - closeWrite (): void { - const [currentState, nextState] = this.streamState.transition({ direction: 'outbound', flag: Message.Flag.FIN }) - if (currentState === nextState) { - // No change, no op - return + if (message.flag === Message.Flag.STOP_SENDING) { + // The remote has stopped reading + this.closeWrite() + } } - if (currentState === StreamStates.OPEN || currentState === StreamStates.READ_CLOSED) { - this._sendFlag(Message.Flag.FIN) - this.closeWritePromise.resolve() - } + return message.message + } - if (nextState === StreamStates.CLOSED) { - this.close() - } + private async _sendFlag (flag: Message.Flag): Promise { + log.trace('Sending flag: %s', flag.toString()) + const msgbuf = Message.encode({ flag }) + const prefixedBuf = lengthPrefixed.encode.single(msgbuf) + + await this._sendMessage(prefixedBuf, false) } +} +export interface WebRTCStreamOptions { /** - * Call when a local error occurs, should close the stream for reading and writing + * The network channel used for bidirectional peer-to-peer transfers of + * arbitrary data + * + * {@link https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel} */ - abort (err: Error): void { - log.error(`An error occurred, closing the stream for reading and writing: ${err.message}`) - this.close() - } + channel: RTCDataChannel /** - * Close the stream for writing, and indicate to the remote side this is being done 'abruptly' - * - * @see this.closeWrite + * The stream direction */ - reset (): void { - const [currentState, nextState] = this.streamState.transition({ direction: 'outbound', flag: Message.Flag.RESET }) - if (currentState === nextState) { - // No change, no op - return - } + direction: Direction - this._sendFlag(Message.Flag.RESET) - this.close() - } + dataChannelOptions?: Partial - private _sendFlag (flag: Message.Flag): void { - try { - log.trace('Sending flag: %s', flag.toString()) - const msgbuf = Message.encode({ flag }) - this.channel.send(lengthPrefixed.encode.single(msgbuf).subarray()) - } catch (err) { - if (err instanceof Error) { - log.error(`Exception while sending flag ${flag}: ${err.message}`) - } - } - } + maxMsgSize?: number - private _closeWriteIterable (): Source { - const self = this - return { - async * [Symbol.asyncIterator] () { - await self.closeWritePromise.promise - yield new Uint8Array(0) - } - } - } + onEnd?: (err?: Error | undefined) => void +} - eq (stream: Stream): boolean { - if (stream instanceof WebRTCStream) { - return stream.channel.id === this.channel.id - } - return false - } +export function createStream (options: WebRTCStreamOptions): Stream { + const { channel, direction, onEnd, dataChannelOptions } = options + + return new WebRTCStream({ + id: direction === 'inbound' ? (`i${channel.id}`) : `r${channel.id}`, + direction, + maxDataSize: (dataChannelOptions?.maxMessageSize ?? MAX_MESSAGE_SIZE) - PROTOBUF_OVERHEAD, + dataChannelOptions, + onEnd, + channel + }) } diff --git a/test/basics.spec.ts b/test/basics.spec.ts new file mode 100644 index 0000000..2f9efed --- /dev/null +++ b/test/basics.spec.ts @@ -0,0 +1,109 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions */ + +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { webSockets } from '@libp2p/websockets' +import * as filter from '@libp2p/websockets/filters' +import { WebRTC } from '@multiformats/mafmt' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import all from 'it-all' +import map from 'it-map' +import { pipe } from 'it-pipe' +import { createLibp2p } from 'libp2p' +import { circuitRelayTransport } from 'libp2p/circuit-relay' +import { identifyService } from 'libp2p/identify' +import { webRTC } from '../src/index.js' +import type { Libp2p } from '@libp2p/interface-libp2p' + +async function createNode (): Promise { + return createLibp2p({ + addresses: { + listen: [ + '/webrtc', + `${process.env.RELAY_MULTIADDR}/p2p-circuit` + ] + }, + transports: [ + webSockets({ + filter: filter.all + }), + circuitRelayTransport(), + webRTC() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ], + services: { + identify: identifyService() + }, + connectionGater: { + denyDialMultiaddr: () => false + }, + connectionManager: { + minConnections: 0 + } + }) +} + +describe('basics', () => { + let localNode: Libp2p + let remoteNode: Libp2p + + beforeEach(async () => { + localNode = await createNode() + remoteNode = await createNode() + }) + + afterEach(async () => { + if (localNode != null) { + await localNode.stop() + } + + if (remoteNode != null) { + await remoteNode.stop() + } + }) + + it('can dial through a relay', async () => { + const remoteAddr = remoteNode.getMultiaddrs() + .filter(ma => WebRTC.matches(ma)).pop() + + if (remoteAddr == null) { + throw new Error('Remote peer could not listen on relay') + } + + const echo = '/echo/1.0.0' + + await remoteNode.handle(echo, ({ stream }) => { + void pipe( + stream, + stream + ) + }) + + const connection = await localNode.dial(remoteAddr) + + // disconnect both from relay + await localNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) + await remoteNode.hangUp(multiaddr(process.env.RELAY_MULTIADDR)) + + // open a stream on the echo protocol + const stream = await connection.newStream(echo) + + // send and receive some data + const input = new Array(5).fill(0).map(() => new Uint8Array(10)) + const output = await pipe( + input, + stream, + (source) => map(source, list => list.subarray()), + async (source) => all(source) + ) + + // asset that we got the right data + expect(output).to.deep.equal(input) + }) +}) diff --git a/test/stream.browser.spec.ts b/test/stream.browser.spec.ts index 5318ece..ee57a61 100644 --- a/test/stream.browser.spec.ts +++ b/test/stream.browser.spec.ts @@ -1,122 +1,150 @@ -import { expect, assert } from 'aegir/chai' +import { expect } from 'aegir/chai' +import delay from 'delay' +import * as lengthPrefixed from 'it-length-prefixed' import { bytes } from 'multiformats' import { Message } from '../src/pb/message.js' -import * as underTest from '../src/stream' -const TEST_MESSAGE = 'test_messgae' +import { createStream } from '../src/stream' +import type { Stream } from '@libp2p/interface-connection' +const TEST_MESSAGE = 'test_message' -function setup (): { peerConnection: RTCPeerConnection, datachannel: RTCDataChannel, webrtcStream: underTest.WebRTCStream } { +function setup (): { peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: Stream } { const peerConnection = new RTCPeerConnection() - const datachannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) - const webrtcStream = new underTest.WebRTCStream({ channel: datachannel, stat: underTest.defaultStat('outbound') }) + const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) + const stream = createStream({ channel: dataChannel, direction: 'outbound' }) - return { peerConnection, datachannel, webrtcStream } + return { peerConnection, dataChannel, stream } } function generatePbByFlag (flag?: Message.Flag): Uint8Array { - const testPb: Message = { + const buf = Message.encode({ flag, message: bytes.fromString(TEST_MESSAGE) - } - return Message.encode(testPb) + }) + + return lengthPrefixed.encode.single(buf).subarray() } describe('Stream Stats', () => { + let stream: Stream + + beforeEach(async () => { + ({ stream } = setup()) + }) + it('can construct', () => { - const { webrtcStream } = setup() - assert.notExists(webrtcStream.stat.timeline.close) + expect(stream.stat.timeline.close).to.not.exist() }) it('close marks it closed', () => { - const { webrtcStream } = setup() - - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - webrtcStream.close() - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) + expect(stream.stat.timeline.close).to.not.exist() + stream.close() + expect(stream.stat.timeline.close).to.be.a('number') }) it('closeRead marks it read-closed only', () => { - const { webrtcStream } = setup() - - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - webrtcStream.closeRead() - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.READ_CLOSED) + expect(stream.stat.timeline.close).to.not.exist() + stream.closeRead() + expect(stream.stat.timeline.close).to.not.exist() + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) it('closeWrite marks it write-closed only', () => { - const { webrtcStream } = setup() - - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - webrtcStream.closeWrite() - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.WRITE_CLOSED) + expect(stream.stat.timeline.close).to.not.exist() + stream.closeWrite() + expect(stream.stat.timeline.close).to.not.exist() + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) - it('closeWrite AND closeRead = close', () => { - const { webrtcStream } = setup() - - webrtcStream.closeWrite() - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.WRITE_CLOSED) - webrtcStream.closeRead() - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) + it('closeWrite AND closeRead = close', async () => { + expect(stream.stat.timeline.close).to.not.exist() + stream.closeWrite() + stream.closeRead() + expect(stream.stat.timeline.close).to.be.a('number') + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) it('abort = close', () => { - const { webrtcStream } = setup() - - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - webrtcStream.abort({ name: 'irrelevant', message: 'this parameter is actually ignored' }) - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) - expect(webrtcStream.stat.timeline.close).to.be.greaterThan(webrtcStream.stat.timeline.open) + expect(stream.stat.timeline.close).to.not.exist() + stream.abort(new Error('Oh no!')) + expect(stream.stat.timeline.close).to.be.a('number') + expect(stream.stat.timeline.close).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) it('reset = close', () => { - const { datachannel, webrtcStream } = setup() - - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - webrtcStream.reset() // only resets the write side - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) - expect(datachannel.readyState).to.be.oneOf(['closing', 'closed']) + expect(stream.stat.timeline.close).to.not.exist() + stream.reset() // only resets the write side + expect(stream.stat.timeline.close).to.be.a('number') + expect(stream.stat.timeline.close).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) }) describe('Stream Read Stats Transition By Incoming Flag', () => { - const webrtcStream = setup().webrtcStream + let dataChannel: RTCDataChannel + let stream: Stream + + beforeEach(async () => { + ({ dataChannel, stream } = setup()) + }) + it('no flag, no transition', () => { - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - const IncomingBuffer = generatePbByFlag() - const message = webrtcStream.processIncomingProtobuf(IncomingBuffer) - expect(message).not.equal(undefined) - if (message != null) { - expect(bytes.toString(message)).to.equal(TEST_MESSAGE) - } - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) + expect(stream.stat.timeline.close).to.not.exist() + const data = generatePbByFlag() + dataChannel.onmessage?.(new MessageEvent('message', { data })) + + expect(stream.stat.timeline.close).to.not.exist() }) - it('open to read-close by flag:FIN', () => { - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - const IncomingBuffer = generatePbByFlag(Message.Flag.FIN) - webrtcStream.processIncomingProtobuf(IncomingBuffer) - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.READ_CLOSED) + it('open to read-close by flag:FIN', async () => { + const data = generatePbByFlag(Message.Flag.FIN) + dataChannel.dispatchEvent(new MessageEvent('message', { data })) + + await delay(100) + + expect(stream.stat.timeline.closeWrite).to.not.exist() + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) - it('read-close to close by flag:STOP_SENDING', () => { - const IncomingBuffer = generatePbByFlag(Message.Flag.STOP_SENDING) - webrtcStream.processIncomingProtobuf(IncomingBuffer) - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) + it('read-close to close by flag:STOP_SENDING', async () => { + const data = generatePbByFlag(Message.Flag.STOP_SENDING) + dataChannel.dispatchEvent(new MessageEvent('message', { data })) + + await delay(100) + + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeRead).to.not.exist() }) }) describe('Stream Write Stats Transition By Incoming Flag', () => { - const webrtcStream = setup().webrtcStream - it('open to write-close by flag:STOP_SENDING', () => { - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.OPEN) - const IncomingBuffer = generatePbByFlag(Message.Flag.STOP_SENDING) - webrtcStream.processIncomingProtobuf(IncomingBuffer) - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.WRITE_CLOSED) - }) - - it('write-close to close by flag:FIN', () => { - const IncomingBuffer = generatePbByFlag(Message.Flag.FIN) - webrtcStream.processIncomingProtobuf(IncomingBuffer) - expect(webrtcStream.streamState.state).to.equal(underTest.StreamStates.CLOSED) + let dataChannel: RTCDataChannel + let stream: Stream + + beforeEach(async () => { + ({ dataChannel, stream } = setup()) + }) + + it('open to write-close by flag:STOP_SENDING', async () => { + const data = generatePbByFlag(Message.Flag.STOP_SENDING) + dataChannel.dispatchEvent(new MessageEvent('message', { data })) + + await delay(100) + + expect(stream.stat.timeline.closeWrite).to.be.greaterThanOrEqual(stream.stat.timeline.open) + expect(stream.stat.timeline.closeRead).to.not.exist() + }) + + it('write-close to close by flag:FIN', async () => { + const data = generatePbByFlag(Message.Flag.FIN) + dataChannel.dispatchEvent(new MessageEvent('message', { data })) + + await delay(100) + + expect(stream.stat.timeline.closeWrite).to.not.exist() + expect(stream.stat.timeline.closeRead).to.be.greaterThanOrEqual(stream.stat.timeline.open) }) }) diff --git a/test/stream.spec.ts b/test/stream.spec.ts index 2bfaa0c..dd44b1e 100644 --- a/test/stream.spec.ts +++ b/test/stream.spec.ts @@ -1,10 +1,12 @@ /* eslint-disable @typescript-eslint/consistent-type-assertions */ import { expect } from 'aegir/chai' +import length from 'it-length' import * as lengthPrefixed from 'it-length-prefixed' import { pushable } from 'it-pushable' -import { Message } from '../src/pb/message' -import * as underTest from '../src/stream' +import { Uint8ArrayList } from 'uint8arraylist' +import { Message } from '../src/pb/message.js' +import { createStream } from '../src/stream.js' const mockDataChannel = (opts: { send: (bytes: Uint8Array) => void, bufferedAmount?: number }): RTCDataChannel => { return { @@ -20,74 +22,79 @@ const MAX_MESSAGE_SIZE = 16 * 1024 describe('Max message size', () => { it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { - const sent: Uint8Array[] = [] + const sent: Uint8ArrayList = new Uint8ArrayList() const data = new Uint8Array(MAX_MESSAGE_SIZE - 5) const p = pushable() // Make sure that the data that ought to be sent will result in a message with exactly MAX_MESSAGE_SIZE - const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray() + const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })) expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE) - const webrtcStream = new underTest.WebRTCStream({ + const webrtcStream = createStream({ channel: mockDataChannel({ send: (bytes) => { - sent.push(bytes) - if (p.readableLength === 0) { - webrtcStream.close() - } + sent.append(bytes) } }), - stat: underTest.defaultStat('outbound') + direction: 'outbound' }) p.push(data) p.end() await webrtcStream.sink(p) - expect(sent).to.deep.equals([messageLengthEncoded]) + + // length(message) + message + length(FIN) + FIN + expect(length(sent)).to.equal(4) + + for (const buf of sent) { + expect(buf.byteLength).to.be.lessThanOrEqual(MAX_MESSAGE_SIZE) + } }) it(`sends messages greater than ${MAX_MESSAGE_SIZE} bytes in parts`, async () => { - const sent: Uint8Array[] = [] - const data = new Uint8Array(MAX_MESSAGE_SIZE - 4) + const sent: Uint8ArrayList = new Uint8ArrayList() + const data = new Uint8Array(MAX_MESSAGE_SIZE) const p = pushable() // Make sure that the data that ought to be sent will result in a message with exactly MAX_MESSAGE_SIZE + 1 - const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray() - expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE + 1) + // const messageLengthEncoded = lengthPrefixed.encode.single(Message.encode({ message: data })).subarray() + // expect(messageLengthEncoded.length).eq(MAX_MESSAGE_SIZE + 1) - const webrtcStream = new underTest.WebRTCStream({ + const webrtcStream = createStream({ channel: mockDataChannel({ send: (bytes) => { - sent.push(bytes) - if (p.readableLength === 0) { - webrtcStream.close() - } + sent.append(bytes) } }), - stat: underTest.defaultStat('outbound') + direction: 'outbound' }) p.push(data) p.end() await webrtcStream.sink(p) - // Message is sent in two parts - expect(sent).to.deep.equals([messageLengthEncoded.subarray(0, messageLengthEncoded.length - 1), messageLengthEncoded.subarray(messageLengthEncoded.length - 1)]) + expect(length(sent)).to.equal(6) + + for (const buf of sent) { + expect(buf.byteLength).to.be.lessThanOrEqual(MAX_MESSAGE_SIZE) + } }) it('closes the stream if bufferamountlow timeout', async () => { const MAX_BUFFERED_AMOUNT = 16 * 1024 * 1024 + 1 - const timeout = 2000 + const timeout = 100 let closed = false - const webrtcStream = new underTest.WebRTCStream({ - dataChannelOptions: { bufferedAmountLowEventTimeout: timeout }, + const webrtcStream = createStream({ + dataChannelOptions: { + bufferedAmountLowEventTimeout: timeout + }, channel: mockDataChannel({ send: () => { throw new Error('Expected to not send') }, bufferedAmount: MAX_BUFFERED_AMOUNT }), - stat: underTest.defaultStat('outbound'), - closeCb: () => { + direction: 'outbound', + onEnd: () => { closed = true } })