Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat: Update Connection Manager and Dialler Interfaces and Configs. #342

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/interface-connection-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@
},
"dependencies": {
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-metrics": "^4.0.5",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-peer-store": "^1.2.8",
"@libp2p/interface-transport": "^2.1.1",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^11.0.0"
},
Expand Down
174 changes: 172 additions & 2 deletions packages/interface-connection-manager/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,80 @@
import type { AbortOptions } from '@libp2p/interfaces'
import type { EventEmitter } from '@libp2p/interfaces/events'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { Connection, ConnectionGater, MultiaddrConnection } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Multiaddr, Resolver } from '@multiformats/multiaddr'
import type { Metrics } from '@libp2p/interface-metrics'
import type { AddressSorter, PeerStore } from '@libp2p/interface-peer-store'
import type { TransportManager, Upgrader } from '@libp2p/interface-transport'
export interface ConnectionManagerComponents {
peerId: PeerId
metrics?: Metrics
upgrader: Upgrader
peerStore: PeerStore
dialer: Dialer
}

export interface ConnectionManagerConfig {
maschad marked this conversation as resolved.
Show resolved Hide resolved
/**
* The maximum number of connections libp2p is willing to have before it starts disconnecting. Defaults to `Infinity`
*/
maxConnections: number

/**
* The minimum number of connections below which libp2p not activate preemptive disconnections. Defaults to `0`.
*/
minConnections: number

/**
* Sets the maximum event loop delay (measured in milliseconds) this node is willing to endure before it starts disconnecting peers. Defaults to `Infinity`.
*/
maxEventLoopDelay?: number

/**
* Sets the poll interval (in milliseconds) for assessing the current state and determining if this peer needs to force a disconnect. Defaults to `2000` (2 seconds).
*/
pollInterval?: number

/**
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>

/**
* On startup we try to dial any peer that has previously been
* tagged with KEEP_ALIVE up to this timeout in ms. (default: 60000)
*/
startupReconnectTimeout?: number

/**
* A list of multiaddrs that will always be allowed (except if they are in the
* deny list) to open connections to this node even if we've reached maxConnections
*/
allow?: string[]

/**
* A list of multiaddrs that will never be allowed to open connections to
* this node under any circumstances
*/
deny?: string[]

/**
* If more than this many connections are opened per second by a single
* host, reject subsequent connections
*/
inboundConnectionThreshold?: number

/**
* The maximum number of parallel incoming connections allowed that have yet to
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxIncomingPendingConnections?: number

/**
* The abort signal to use for timeouts when opening connections to peers
*/
outgoingDialTimeout?: number
}

export interface ConnectionManagerEvents {
/**
Expand Down Expand Up @@ -49,6 +121,17 @@ export interface ConnectionManager extends EventEmitter<ConnectionManagerEvents>
*/
getConnections: (peerId?: PeerId) => Connection[]

/**
* Return a map of all connections with their associated PeerIds
*
* @example
*
* ```js
* const connectionsMap = libp2p.connectionManager.getConnectionsMap()
* ```
*/
getConnectionsMap: () => Map<string, Connection[]>

/**
* Open a connection to a remote peer
*
Expand Down Expand Up @@ -77,6 +160,73 @@ export interface ConnectionManager extends EventEmitter<ConnectionManagerEvents>
* Invoked after upgrading a multiaddr connection has finished
*/
afterUpgradeInbound: () => void

/**
* Return the components of the connection manager
*/
getComponents: () => ConnectionManagerComponents

/**
* Return the configuration of the connection manager
*/
getConfig: () => ConnectionManagerConfig
}

export interface DialerConfig {
/**
* Sort the known addresses of a peer before trying to dial
*/
addressSorter?: AddressSorter

/**
* If true, try to connect to all discovered peers up to the connection manager limit
*/
autoDial?: boolean

/**
* How long to wait between attempting to keep our number of concurrent connections
* above minConnections
*/
autoDialInterval: number

/**
* How long a dial attempt is allowed to take
*/
dialTimeout?: number

/**
* When a new inbound connection is opened, the upgrade process (e.g. protect,
* encrypt, multiplex etc) must complete within this number of ms.
*/
inboundUpgradeTimeout: number

/**
* Number of max concurrent dials
*/
maxParallelDials?: number

/**
* Number of max addresses to dial for a given peer
*/
maxAddrsToDial?: number

/**
* Number of max concurrent dials per peer
*/
maxDialsPerPeer?: number

/**
* Multiaddr resolvers to use when dialing
*/
resolvers?: Record<string, Resolver>
}

export interface DialerComponents {
peerId: PeerId
metrics?: Metrics
peerStore: PeerStore
transportManager: TransportManager
connectionGater: ConnectionGater
}

export interface Dialer {
Expand All @@ -94,4 +244,24 @@ export interface Dialer {
* After a dial attempt succeeds or fails, return the passed token to the pool
*/
releaseToken: (token: number) => void

/**
* Get the current dial targets which are pending
*/
getPendingDialTargets: () => Map<string, AbortController>

/**
* Returns true if the peer id is in the pending dials
*/
hasPendingDial: (peer: PeerId | Multiaddr) => boolean

/**
* Return the components of the dialer
*/
getComponents: () => DialerComponents

/**
* Return the configuration of the dialer
*/
getConfig: () => DialerConfig
}
1 change: 1 addition & 0 deletions packages/interface-mocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-peer-info": "^1.0.0",
"@libp2p/interface-peer-store": "^1.2.8",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interface-stream-muxer": "^3.0.0",
Expand Down
75 changes: 49 additions & 26 deletions packages/interface-mocks/src/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@ import { EventEmitter } from '@libp2p/interfaces/events'
import type { Startable } from '@libp2p/interfaces/startable'
import type { Connection } from '@libp2p/interface-connection'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { ConnectionManager, ConnectionManagerEvents } from '@libp2p/interface-connection-manager'
import type { ConnectionManager, ConnectionManagerComponents, ConnectionManagerConfig, ConnectionManagerEvents, Dialer } from '@libp2p/interface-connection-manager'
import { connectionPair } from './connection.js'
import { CodeError } from '@libp2p/interfaces/errors'
import type { Registrar } from '@libp2p/interface-registrar'
import type { PubSub } from '@libp2p/interface-pubsub'
import { isMultiaddr, Multiaddr } from '@multiformats/multiaddr'
import type { Metrics } from '@libp2p/interface-metrics'
import type { PeerStore } from '@libp2p/interface-peer-store'
import type { Upgrader } from '@libp2p/interface-transport'

export interface MockNetworkComponents {
peerId: PeerId
registrar: Registrar
connectionManager: ConnectionManager
pubsub?: PubSub
metrics?: Metrics
upgrader: Upgrader
peerStore: PeerStore
dialer: Dialer
}

export interface MockConnectionManagerConfig {
maxConnections: number
minConnections: number
}

class MockNetwork {
Expand Down Expand Up @@ -41,14 +53,28 @@ class MockNetwork {
export const mockNetwork = new MockNetwork()

class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager, Startable {
private connections: Connection[] = []
private readonly config: ConnectionManagerConfig
private readonly connections: Map<string, Connection[]> = new Map()
private readonly components: MockNetworkComponents
private started = false

constructor (components: MockNetworkComponents) {
constructor (components: MockNetworkComponents, config: ConnectionManagerConfig) {
super()

this.components = components
this.config = config
}

getComponents (): ConnectionManagerComponents {
return this.components
}

getConfig (): ConnectionManagerConfig {
return this.config
}

getConnectionsMap (): Map<string, Connection[]> {
return this.connections
}

isStarted (): boolean {
Expand All @@ -60,20 +86,27 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
}

async stop (): Promise<void> {
for (const connection of this.connections) {
await this.closeConnections(connection.remotePeer)
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
await connection.close()
}
}

this.started = false
}

getConnections (peerId?: PeerId): Connection[] {
if (peerId != null) {
return this.connections
.filter(c => c.remotePeer.toString() === peerId.toString())
return this.connections.get(peerId.toString()) ?? []
}

return this.connections
let conns: Connection[] = []

for (const c of this.connections.values()) {
conns = conns.concat(c)
}

return conns
}

async openConnection (peerId: PeerId | Multiaddr): Promise<Connection> {
Expand All @@ -96,8 +129,8 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
const [aToB, bToA] = connectionPair(this.components, componentsB)

// track connections
this.connections.push(aToB)
;(componentsB.connectionManager as MockConnectionManager).connections.push(bToA)
this.connections.set(peerId.toString(), [aToB])
this.connections.set(componentsB.peerId.toString(), [bToA])

this.components.connectionManager.safeDispatchEvent<Connection>('peer:connect', {
detail: aToB
Expand Down Expand Up @@ -133,21 +166,11 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
return
}

const componentsB = mockNetwork.getNode(peerId)

for (const protocol of this.components.registrar.getProtocols()) {
this.components.registrar.getTopologies(protocol).forEach(topology => {
topology.onDisconnect(componentsB.peerId)
await Promise.all(
connections.map(async connection => {
await connection.close()
})
}

for (const conn of connections) {
await conn.close()
}

this.connections = this.connections.filter(c => c.remotePeer.equals(peerId))

await componentsB.connectionManager?.closeConnections(this.components.peerId)
)
}

async acceptIncomingConnection (): Promise<boolean> {
Expand All @@ -159,6 +182,6 @@ class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implem
}
}

export function mockConnectionManager (components: MockNetworkComponents): ConnectionManager {
return new MockConnectionManager(components)
export function mockConnectionManager (components: MockNetworkComponents, config: MockConnectionManagerConfig): ConnectionManager {
return new MockConnectionManager(components, config)
}
8 changes: 7 additions & 1 deletion packages/interface-pubsub-compliance-tests/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ export async function createComponents (): Promise<MockNetworkComponents> {
peerId: await createEd25519PeerId(),
registrar: mockRegistrar()
}
components.connectionManager = mockConnectionManager(components)

const config: any = {
maxConnections: 10,
minConnections: 10
}

components.connectionManager = mockConnectionManager(components, config)

mockNetwork.addNode(components)

Expand Down