Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not allow autodial to run in parallel #1804

Merged
merged 4 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
"@libp2p/interface-peer-id": "^2.0.1",
"@libp2p/interface-peer-info": "^1.0.3",
"@libp2p/interface-peer-routing": "^1.1.0",
"@libp2p/interface-peer-store": "^2.0.3",
"@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-pubsub": "^4.0.0",
"@libp2p/interface-record": "^2.0.6",
"@libp2p/interface-registrar": "^2.0.3",
Expand All @@ -149,7 +149,7 @@
"@libp2p/peer-id": "^2.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/peer-record": "^5.0.0",
"@libp2p/peer-store": "^8.1.0",
"@libp2p/peer-store": "^8.2.0",
"@libp2p/topology": "^4.0.1",
"@libp2p/tracked-map": "^3.0.0",
"@libp2p/utils": "^3.0.10",
Expand Down
15 changes: 11 additions & 4 deletions src/circuit-relay/transport/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,17 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
*/
async discover (): Promise<void> {
log('searching peer store for relays')
const peers = (await this.peerStore.all())
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
.filter(({ protocols }) => protocols.includes(RELAY_V2_HOP_CODEC))
.sort(() => Math.random() - 0.5)
const peers = (await this.peerStore.all({
filters: [
// filter by a list of peers supporting RELAY_V2_HOP and ones we are not listening on
(peer) => {
return peer.protocols.includes(RELAY_V2_HOP_CODEC)
}
],
orders: [
() => Math.random() < 0.5 ? 1 : -1
]
}))

for (const peer of peers) {
log('found relay peer %p in content peer store', peer.id)
Expand Down
25 changes: 22 additions & 3 deletions src/circuit-relay/transport/reservation-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { logger } from '@libp2p/logger'
import { PeerMap } from '@libp2p/peer-collections'
import { multiaddr } from '@multiformats/multiaddr'
import { pbStream } from 'it-pb-stream'
import PQueue from 'p-queue'
import { PeerJobQueue } from '../../utils/peer-job-queue.js'
import { DEFAULT_RESERVATION_CONCURRENCY, RELAY_TAG, RELAY_V2_HOP_CODEC } from '../constants.js'
import { HopMessage, Status } from '../pb/index.js'
import { getExpirationMilliseconds } from '../utils.js'
Expand Down Expand Up @@ -50,6 +50,11 @@ export interface RelayStoreInit {
* How many discovered relays to allow in the reservation store
*/
discoverRelays?: number

/**
* Limit the number of potential relays we will dial (default: 100)
*/
maxReservationQueueLength?: number
}

export type RelayType = 'discovered' | 'configured'
Expand All @@ -71,9 +76,10 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
private readonly transportManager: TransportManager
private readonly peerStore: PeerStore
private readonly events: EventEmitter<Libp2pEvents>
private readonly reserveQueue: PQueue
private readonly reserveQueue: PeerJobQueue
private readonly reservations: PeerMap<RelayEntry>
private readonly maxDiscoveredRelays: number
private readonly maxReservationQueueLength: number
private started: boolean

constructor (components: RelayStoreComponents, init?: RelayStoreInit) {
Expand All @@ -86,10 +92,11 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
this.events = components.events
this.reservations = new PeerMap()
this.maxDiscoveredRelays = init?.discoverRelays ?? 0
this.maxReservationQueueLength = init?.maxReservationQueueLength ?? 100
this.started = false

// ensure we don't listen on multiple relays simultaneously
this.reserveQueue = new PQueue({
this.reserveQueue = new PeerJobQueue({
concurrency: init?.reservationConcurrency ?? DEFAULT_RESERVATION_CONCURRENCY
})

Expand Down Expand Up @@ -130,6 +137,16 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
return
}

if (this.reserveQueue.size > this.maxReservationQueueLength) {
log('not adding relay as the queue is full')
return
}

if (this.reserveQueue.hasJob(peerId)) {
log('relay peer is already in the reservation queue')
return
}

log('add relay %p', peerId)

await this.reserveQueue.add(async () => {
Expand Down Expand Up @@ -206,6 +223,8 @@ export class ReservationStore extends EventEmitter<ReservationStoreEvents> imple
// if listening failed, remove the reservation
this.reservations.delete(peerId)
}
}, {
peerId
})
}

Expand Down
106 changes: 75 additions & 31 deletions src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import PQueue from 'p-queue'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
import { PeerJobQueue } from '../utils/peer-job-queue.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { Libp2pEvents } from '@libp2p/interface-libp2p'
import type { PeerStore } from '@libp2p/interface-peer-store'
Expand All @@ -12,6 +12,7 @@ const log = logger('libp2p:connection-manager:auto-dial')

interface AutoDialInit {
minConnections?: number
maxQueueLength?: number
autoDialConcurrency?: number
autoDialPriority?: number
autoDialInterval?: number
Expand All @@ -25,6 +26,7 @@ interface AutoDialComponents {

const defaultOptions = {
minConnections: MIN_CONNECTIONS,
maxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH,
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
autoDialPriority: AUTO_DIAL_PRIORITY,
autoDialInterval: AUTO_DIAL_INTERVAL
Expand All @@ -33,12 +35,14 @@ const defaultOptions = {
export class AutoDial implements Startable {
private readonly connectionManager: ConnectionManager
private readonly peerStore: PeerStore
private readonly queue: PQueue
private readonly queue: PeerJobQueue
private readonly minConnections: number
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
private readonly autoDialMaxQueueLength: number
private autoDialInterval?: ReturnType<typeof setInterval>
private started: boolean
private running: boolean

/**
* Proactively tries to connect to known peers stored in the PeerStore.
Expand All @@ -51,8 +55,10 @@ export class AutoDial implements Startable {
this.minConnections = init.minConnections ?? defaultOptions.minConnections
this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority
this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval
this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength
this.started = false
this.queue = new PQueue({
this.running = false
this.queue = new PeerJobQueue({
concurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency
})
this.queue.addListener('error', (err) => {
Expand All @@ -73,7 +79,7 @@ export class AutoDial implements Startable {
}

start (): void {
this.autoDialInterval = setInterval(() => {
this.autoDialInterval = setTimeout(() => {
this.autoDial()
.catch(err => {
log.error('error while autodialing', err)
Expand All @@ -92,8 +98,9 @@ export class AutoDial implements Startable {
stop (): void {
// clear the queue
this.queue.clear()
clearInterval(this.autoDialInterval)
clearTimeout(this.autoDialInterval)
this.started = false
this.running = false
}

async autoDial (): Promise<void> {
Expand All @@ -103,47 +110,71 @@ export class AutoDial implements Startable {

const connections = this.connectionManager.getConnectionsMap()
const numConnections = connections.size
const dialQueue = new PeerSet(
// @ts-expect-error boolean filter removes falsy peer IDs
this.connectionManager.getDialQueue()
.map(queue => queue.peerId)
.filter(Boolean)
)

// Already has enough connections
if (numConnections >= this.minConnections) {
log.trace('have enough connections %d/%d', numConnections, this.minConnections)
return
}

log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections)
if (this.queue.size > this.autoDialMaxQueueLength) {
log('not enough connections %d/%d but auto dial queue is full', numConnections, this.minConnections)
return
}

// Sort peers on whether we know protocols or public keys for them
const peers = await this.peerStore.all()
if (this.running) {
log('not enough connections %d/%d - but skipping autodial as it is already running', numConnections, this.minConnections)
return
}

// Remove some peers
const filteredPeers = peers.filter((peer) => {
// Remove peers without addresses
if (peer.addresses.length === 0) {
return false
}
this.running = true

// remove peers we are already connected to
if (connections.has(peer.id)) {
return false
}
log('not enough connections %d/%d - will dial peers to increase the number of connections', numConnections, this.minConnections)

// remove peers we are already dialling
if (dialQueue.has(peer.id)) {
return false
}
const dialQueue = new PeerSet(
// @ts-expect-error boolean filter removes falsy peer IDs
this.connectionManager.getDialQueue()
.map(queue => queue.peerId)
.filter(Boolean)
)

return true
// Sort peers on whether we know protocols or public keys for them
const peers = await this.peerStore.all({
filters: [
// Remove some peers
(peer) => {
// Remove peers without addresses
if (peer.addresses.length === 0) {
log.trace('not autodialing %p because they have no addresses')
return false
}

// remove peers we are already connected to
if (connections.has(peer.id)) {
log.trace('not autodialing %p because they are already connected')
return false
}

// remove peers we are already dialling
if (dialQueue.has(peer.id)) {
log.trace('not autodialing %p because they are already being dialed')
return false
}

// remove peers already in the autodial queue
if (this.queue.hasJob(peer.id)) {
log.trace('not autodialing %p because they are already being autodialed')
return false
}

return true
}
]
})

// shuffle the peers - this is so peers with the same tag values will be
// dialled in a different order each time
const shuffledPeers = filteredPeers.sort(() => Math.random() > 0.5 ? 1 : -1)
const shuffledPeers = peers.sort(() => Math.random() > 0.5 ? 1 : -1)

// Sort shuffled peers by tag value
const peerValues = new PeerMap<number>()
Expand Down Expand Up @@ -192,9 +223,22 @@ export class AutoDial implements Startable {
// @ts-expect-error needs adding to the ConnectionManager interface
priority: this.autoDialPriority
})
}, {
peerId: peer.id
}).catch(err => {
log.error('could not connect to peerStore stored peer', err)
})
}

this.running = false

if (this.started) {
this.autoDialInterval = setTimeout(() => {
this.autoDial()
.catch(err => {
log.error('error while autodialing', err)
})
}, this.autoDialIntervalMs)
}
}
}
5 changes: 5 additions & 0 deletions src/connection-manager/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ export const AUTO_DIAL_CONCURRENCY = 25
*/
export const AUTO_DIAL_PRIORITY = 0

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialMaxQueueLength
*/
export const AUTO_DIAL_MAX_QUEUE_LENGTH = 100

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#inboundConnectionThreshold
*/
Expand Down
30 changes: 18 additions & 12 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { Connection, MultiaddrConnection } from '@libp2p/interface-connection'
import type { ConnectionGater } from '@libp2p/interface-connection-gater'
import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { PendingDial, AddressSorter, Libp2pEvents } from '@libp2p/interface-libp2p'
import type { Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerStore } from '@libp2p/interface-peer-store'
import type { Peer, PeerStore } from '@libp2p/interface-peer-store'
import type { TransportManager } from '@libp2p/interface-transport'
import type { AbortOptions } from '@libp2p/interfaces'
import type { EventEmitter } from '@libp2p/interfaces/events'
Expand Down Expand Up @@ -61,6 +61,12 @@ export interface ConnectionManagerInit {
*/
autoDialPriority?: number

/**
* Limit the maximum number of peers to dial when trying to keep the number of
* open connections above `minConnections`. (default: 100)
*/
autoDialMaxQueueLength?: number

/**
* Sort the known addresses of a peer before trying to dial, By default public
* addresses will be dialled before private (e.g. loopback or LAN) addresses.
Expand Down Expand Up @@ -136,7 +142,8 @@ const defaultOptions = {
inboundConnectionThreshold: INBOUND_CONNECTION_THRESHOLD,
maxIncomingPendingConnections: MAX_INCOMING_PENDING_CONNECTIONS,
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
autoDialPriority: AUTO_DIAL_PRIORITY
autoDialPriority: AUTO_DIAL_PRIORITY,
autoDialMaxQueueLength: AUTO_DIAL_MAX_QUEUE_LENGTH
}

export interface DefaultConnectionManagerComponents {
Expand Down Expand Up @@ -217,7 +224,8 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
}, {
minConnections,
autoDialConcurrency: init.autoDialConcurrency ?? defaultOptions.autoDialConcurrency,
autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority
autoDialPriority: init.autoDialPriority ?? defaultOptions.autoDialPriority,
maxQueueLength: init.autoDialMaxQueueLength ?? defaultOptions.autoDialMaxQueueLength
})

// controls what happens when we have too many connections
Expand Down Expand Up @@ -344,17 +352,15 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
// re-connect to any peers with the KEEP_ALIVE tag
void Promise.resolve()
.then(async () => {
const keepAlivePeers: PeerId[] = []

for (const peer of await this.peerStore.all()) {
if (peer.tags.has(KEEP_ALIVE)) {
keepAlivePeers.push(peer.id)
}
}
const keepAlivePeers: Peer[] = await this.peerStore.all({
filters: [(peer) => {
return peer.tags.has(KEEP_ALIVE)
}]
})

await Promise.all(
keepAlivePeers.map(async peer => {
await this.openConnection(peer)
await this.openConnection(peer.id)
.catch(err => {
log.error(err)
})
Expand Down
Loading