Skip to content

Commit

Permalink
feat: add mock stream pair
Browse files Browse the repository at this point in the history
Adds a `streamPair` convenience function to the interface mocks that
returns two `Stream` objects where the duplex streams read/write
to/from the other.
  • Loading branch information
achingbrain committed Sep 23, 2023
1 parent 972b10a commit a1d2c22
Show file tree
Hide file tree
Showing 32 changed files with 1,185 additions and 446 deletions.
31 changes: 29 additions & 2 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,13 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
return connection
}

export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>): Stream {
export interface StreamInit {
direction?: Direction
protocol?: string
id?: string
}

export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>, init: StreamInit = {}): Stream {
return {
...stream,
close: async () => {},
Expand All @@ -186,10 +192,31 @@ export function mockStream (stream: Duplex<AsyncGenerator<Uint8ArrayList>, Sourc
id: `stream-${Date.now()}`,
status: 'open',
readStatus: 'ready',
writeStatus: 'ready'
writeStatus: 'ready',
...init
}
}

export interface StreamPairInit {
duplex: Duplex<AsyncGenerator<Uint8ArrayList>, Source<Uint8ArrayList | Uint8Array>, Promise<void>>
init?: StreamInit
}

export function streamPair (a: StreamPairInit, b: StreamPairInit, init: StreamInit = {}): [Stream, Stream] {
return [
mockStream(a.duplex, {
direction: 'outbound',
...init,
...(a.init ?? {})
}),
mockStream(b.duplex, {
direction: 'inbound',
...init,
...(b.init ?? {})
})
]
}

export interface Peer {
peerId: PeerId
registrar: Registrar
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-compliance-tests/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export { mockConnectionEncrypter } from './connection-encrypter.js'
export { mockConnectionGater } from './connection-gater.js'
export { mockConnectionManager, mockNetwork } from './connection-manager.js'
export { mockConnection, mockStream, connectionPair } from './connection.js'
export { mockConnection, mockStream, streamPair, connectionPair } from './connection.js'
export { mockMultiaddrConnection, mockMultiaddrConnPair } from './multiaddr-connection.js'
export { mockMuxer } from './muxer.js'
export { mockRegistrar } from './registrar.js'
Expand Down
10 changes: 7 additions & 3 deletions packages/interface/src/stream-muxer/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,14 @@ export abstract class AbstractStream implements Stream {
}

this.log.trace('sink finished reading from source')
this.writeStatus = 'done'

this.log.trace('sink calling closeWrite')
await this.closeWrite(options)
if (this.writeStatus === 'writing') {
this.writeStatus = 'done'

this.log.trace('sink calling closeWrite')
await this.closeWrite(options)
}

this.onSinkEnd()
} catch (err: any) {
this.log.trace('sink ended with error, calling abort with error', err)
Expand Down
74 changes: 74 additions & 0 deletions packages/kad-dht/src/libp2p-routers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { type ContentRouting } from '@libp2p/interface/content-routing'
import { CodeError } from '@libp2p/interface/errors'
import { type PeerRouting } from '@libp2p/interface/peer-routing'
import drain from 'it-drain'
import type { KadDHT, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { CID } from 'multiformats/cid'

/**
* Wrapper class to convert events into returned values
*/
export class DHTContentRouting implements ContentRouting {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
}

async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
}

async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
if (event.name === 'VALUE') {
return event.value
}
}

throw new CodeError('Not found', 'ERR_NOT_FOUND')
}
}

/**
* Wrapper class to convert events into returned values
*/
export class DHTPeerRouting implements PeerRouting {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
if (event.name === 'FINAL_PEER') {
return event.peer
}
}

throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
if (event.name === 'FINAL_PEER') {
yield event.peer
}
}
}
}
2 changes: 1 addition & 1 deletion packages/libp2p/src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class CircuitRelayTransport implements Transport {
disconnectOnFailure
})
} catch (err: any) {
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${relayPeer.toString()} failed`, err)
log.error('circuit relay dial to destination %p via relay %p failed', destinationPeer, relayPeer, err)

if (stream != null) {
stream.abort(err)
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export class ConnectionImpl implements Connection {
}

log('closing connection to %a', this.remoteAddr)
console.info('for why close connection to', this.remoteAddr.toString(), new Error('where').stack)

this.status = 'closing'

Expand Down
5 changes: 5 additions & 0 deletions packages/libp2p/src/transport-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ export class DefaultTransportManager implements TransportManager, Startable {
throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE)
}

console.info('---> dial', ma.toString(), 'with', [...this.transports.entries()].filter(([name, entry]) => {
console.info('wat', name, entry === transport)
return entry === transport
}).map(([ name ]) => name).pop())

try {
return await transport.dial(ma, {
...options,
Expand Down
3 changes: 2 additions & 1 deletion packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ export class DefaultUpgrader implements Upgrader {

return muxedStream
} catch (err: any) {
log.error('could not create new stream', err)
log.error('could not create new stream for protocols %s on connection with address %a', protocols, connection.remoteAddr, err)

if (muxedStream.timeline.close == null) {
muxedStream.abort(err)
Expand Down Expand Up @@ -513,6 +513,7 @@ export class DefaultUpgrader implements Upgrader {
}
} catch (err: any) {
log.error(err)
connection.abort(err)
} finally {
this.events.safeDispatchEvent('connection:close', {
detail: connection
Expand Down
8 changes: 4 additions & 4 deletions packages/multistream-select/src/handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ export async function handle (stream: any, protocols: string | string[], options
log.trace('read "%s"', protocol)

if (protocol === PROTOCOL_ID) {
log.trace('respond with "%s" for "%s"', PROTOCOL_ID, protocol)
log.trace('handle: %s:%s respond with "%s" for "%s"', stream.direction, stream.id, PROTOCOL_ID, protocol)
multistream.write(writer, uint8ArrayFromString(PROTOCOL_ID), options)
continue
}

if (protocols.includes(protocol)) {
multistream.write(writer, uint8ArrayFromString(protocol), options)
log.trace('respond with "%s" for "%s"', protocol, protocol)
log.trace('handle: %s:%s respond with "%s" for "%s"', stream.direction, stream.id, protocol, protocol)
rest()
return { stream: shakeStream, protocol }
}
Expand All @@ -82,11 +82,11 @@ export async function handle (stream: any, protocols: string | string[], options
// <varint-msg-len><varint-proto-name-len><proto-name>\n<varint-proto-name-len><proto-name>\n\n
multistream.write(writer, new Uint8ArrayList(...protocols.map(p => multistream.encode(uint8ArrayFromString(p)))), options)
// multistream.writeAll(writer, protocols.map(p => uint8ArrayFromString(p)))
log.trace('respond with "%s" for %s', protocols, protocol)
log.trace('handle: %s:%s respond with "%s" for %s', stream.direction, stream.id, protocols, protocol)
continue
}

multistream.write(writer, uint8ArrayFromString('na'), options)
log('respond with "na" for "%s"', protocol)
log('handle: %s:%s respond with "na" for "%s"', stream.direction, stream.id, protocol)
}
}
10 changes: 5 additions & 5 deletions packages/multistream-select/src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ export async function select (stream: any, protocols: string | string[], options
throw new Error('At least one protocol must be specified')
}

log.trace('select: write ["%s", "%s"]', PROTOCOL_ID, protocol)
log.trace('select: %s:%s write ["%s", "%s"]', stream.direction, stream.id, PROTOCOL_ID, protocol)
const p1 = uint8ArrayFromString(PROTOCOL_ID)
const p2 = uint8ArrayFromString(protocol)
multistream.writeAll(writer, [p1, p2], options)

let response = await multistream.readString(reader, options)
log.trace('select: read "%s"', response)
log.trace('select: %s:%s read "%s"', stream.direction, stream.id, response)

// Read the protocol response if we got the protocolId in return
if (response === PROTOCOL_ID) {
response = await multistream.readString(reader, options)
log.trace('select: read "%s"', response)
log.trace('select: %s:%s read "%s"', stream.direction, stream.id, response)
}

// We're done
Expand All @@ -90,10 +90,10 @@ export async function select (stream: any, protocols: string | string[], options

// We haven't gotten a valid ack, try the other protocols
for (const protocol of protocols) {
log.trace('select: write "%s"', protocol)
log.trace('select: %s:%s write "%s"', stream.direction, stream.id, protocol)
multistream.write(writer, uint8ArrayFromString(protocol), options)
const response = await multistream.readString(reader, options)
log.trace('select: read "%s" for "%s"', response, protocol)
log.trace('select: %s:%s read "%s" for "%s"', stream.direction, stream.id, response, protocol)

if (response === protocol) {
rest() // End our writer so others can start writing to stream
Expand Down
16 changes: 12 additions & 4 deletions packages/transport-webrtc/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export default {
before: async () => {
const { createLibp2p } = await import('libp2p')
const { circuitRelayServer } = await import('libp2p/circuit-relay')
const { identifyService } = await import('libp2p/identify')
const { webSockets } = await import('@libp2p/websockets')
const { noise } = await import('@chainsafe/libp2p-noise')
const { yamux } = await import('@chainsafe/libp2p-yamux')
Expand All @@ -34,11 +33,20 @@ export default {
reservations: {
maxReservations: Infinity
}
}),
identify: identifyService()
})
},
connectionManager: {
minConnections: 0
minConnections: 0,
inboundConnectionThreshold: Infinity
},
connectionGater: {
denyDialMultiaddr: (ma) => {
if (ma.toOptions().family === 6) {
return true
}

return false
}
}
})

Expand Down
2 changes: 1 addition & 1 deletion packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
"@multiformats/mafmt": "^12.1.2",
"@multiformats/multiaddr": "^12.1.5",
"@multiformats/multiaddr-matcher": "^1.0.1",
"abortable-iterator": "^5.0.1",
"detect-browser": "^5.3.0",
"it-length-prefixed": "^9.0.1",
"it-pipe": "^3.0.1",
Expand All @@ -65,6 +64,7 @@
"node-datachannel": "^0.4.3",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-timeout": "^6.1.2",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.6"
Expand Down
34 changes: 34 additions & 0 deletions packages/transport-webrtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,40 @@ import { WebRTCDirectTransport, type WebRTCTransportDirectInit, type WebRTCDirec
import type { WebRTCTransportComponents, WebRTCTransportInit } from './private-to-private/transport.js'
import type { Transport } from '@libp2p/interface/transport'

export interface DataChannelOptions {
/**
* The maximum message size sendable over the channel
*/
maxMessageSize?: number

/**
* If the channel's `bufferedAmount` grows over this amount in bytes, wait
* for it to drain before sending more data (default: 16MB)
*/
maxBufferedAmount?: number

/**
* When `bufferedAmount` is above `maxBufferedAmount`, we pause sending until
* the `bufferedAmountLow` event fires - this controls how long we wait for
* that event in ms (default: 30s)
*/
bufferedAmountLowEventTimeout?: number

/**
* When closing a stream, we wait for `bufferedAmount` to become 0 before
* closing the underlying RTCDataChannel - this controls how long we wait
* (default: 30s)
*/
drainTimeout?: number

/**
* When closing a stream we first send a FIN flag to the remote and wait
* for a FIN_ACK reply before closing the underlying RTCDataChannel - this
* controls how long we wait for the acknowledgement (default: 5s)
*/
closeTimeout?: number
}

/**
* @param {WebRTCTransportDirectInit} init - WebRTC direct transport configuration
* @param init.dataChannel - DataChannel configurations
Expand Down
12 changes: 10 additions & 2 deletions packages/transport-webrtc/src/maconn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { CounterGroup } from '@libp2p/interface/metrics'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Source, Sink } from 'it-stream-types'

const log = logger('libp2p:webrtc:connection')
const log = logger('libp2p:webrtc:maconn')

interface WebRTCMultiaddrConnectionInit {
/**
Expand Down Expand Up @@ -65,8 +65,16 @@ export class WebRTCMultiaddrConnection implements MultiaddrConnection {
this.timeline = init.timeline
this.peerConnection = init.peerConnection

let initialState = this.peerConnection.connectionState

this.peerConnection.onconnectionstatechange = () => {
if (this.peerConnection.connectionState === 'closed' || this.peerConnection.connectionState === 'disconnected' || this.peerConnection.connectionState === 'failed') {
log.trace('peer connection state change', this.peerConnection.connectionState, 'initial state', initialState)

if (this.peerConnection.connectionState === 'disconnected') {
// attempt to reconnect
this.peerConnection.restartIce()
} else if (this.peerConnection.connectionState === 'closed') {
// nothing else to do but close the connection
this.timeline.close = Date.now()
}
}
Expand Down
Loading

0 comments on commit a1d2c22

Please sign in to comment.