Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add connection monitor #2644

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,13 @@ export interface Connection {
*/
transient: boolean

/**
* The time in milliseconds it takes to make a round trip to the remote peer.
*
* This is updated periodically by the connection monitor.
*/
rtt?: number

/**
* Create a new stream on this connection and negotiate one of the passed protocols
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"any-signal": "^4.1.1",
"datastore-core": "^9.2.9",
"interface-datastore": "^8.2.11",
"it-byte-stream": "^1.0.12",
"it-merge": "^3.0.5",
"it-parallel": "^3.0.7",
"merge-options": "^3.0.4",
Expand All @@ -126,6 +127,7 @@
"it-all": "^3.0.6",
"it-drain": "^3.0.7",
"it-map": "^3.1.0",
"it-pair": "^2.0.6",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"it-stream-types": "^2.0.1",
Expand Down
134 changes: 134 additions & 0 deletions packages/libp2p/src/connection-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { serviceCapabilities } from '@libp2p/interface'
import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout'
import { byteStream } from 'it-byte-stream'
import type { ComponentLogger, Logger, Metrics, Startable } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout'

const DEFAULT_PING_INTERVAL_MS = 10000

export interface ConnectionMonitorInit {
/**
* Whether the connection monitor is enabled
*
* @default true
*/
enabled?: boolean

/**
* How often to ping remote peers in ms
*
* @default 10000
*/
pingInterval?: number

/**
* Timeout settings for how long the ping is allowed to take before the
* connection will be judged inactive and aborted.
*
* The timeout is adaptive to cope with slower networks or nodes that
* have changing network characteristics, such as mobile.
*/
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>

/**
* If true, any connection that fails the ping will be aborted
*
* @default true
*/
abortConnectionOnPingFailure?: boolean
}

export interface ConnectionMonitorComponents {
logger: ComponentLogger
connectionManager: ConnectionManager
metrics?: Metrics
}

export class ConnectionMonitor implements Startable {
private readonly components: ConnectionMonitorComponents
private readonly log: Logger
private heartbeatInterval?: ReturnType<typeof setInterval>
private readonly pingIntervalMs: number
private abortController?: AbortController
private readonly timeout: AdaptiveTimeout

constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) {
this.components = components

this.log = components.logger.forComponent('libp2p:connection-monitor')
this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS

this.timeout = new AdaptiveTimeout({
...(init.pingTimeout ?? {}),
metrics: components.metrics,
metricName: 'libp2p_connection_monitor_ping_time_milliseconds'
})
}

readonly [Symbol.toStringTag] = '@libp2p/connection-monitor'

readonly [serviceCapabilities]: string[] = [
'@libp2p/connection-monitor'
]

start (): void {
this.abortController = new AbortController()

this.heartbeatInterval = setInterval(() => {
this.components.connectionManager.getConnections().forEach(conn => {
Promise.resolve().then(async () => {
let start = Date.now()
try {
const signal = this.timeout.getTimeoutSignal({
signal: this.abortController?.signal
})
const stream = await conn.newStream('/ipfs/ping/1.0.0', {
signal,
runOnTransientConnection: true
})
const bs = byteStream(stream)
start = Date.now()

await Promise.all([
bs.write(new Uint8Array(1), {
signal
}),
bs.read(1, {
signal
})
])

conn.rtt = Date.now() - start
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not divided by 2 too?

Copy link
Member Author

@achingbrain achingbrain Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one times how long it takes to send and receive one byte, (e.g. one round trip) but after the stream has been opened (e.g. after multistream select has finished).

The ERR_UNSUPPORTED_PROTOCOL error is thrown by multistream select, which takes two round trips, so that needs dividing by two but the happy path here doesn't, because we reset the start variable before timing the single byte rtt.


await bs.unwrap().close({
signal
})
} catch (err: any) {
if (err.code !== 'ERR_UNSUPPORTED_PROTOCOL') {
maschad marked this conversation as resolved.
Show resolved Hide resolved
throw err
}

// protocol was unsupported, but that's ok as it means the remote
// peer was still alive. We ran multistream-select which means two
// round trips (e.g. 1x for the mss header, then another for the
// protocol) so divide the time it took by two
conn.rtt = (Date.now() - start) / 2
}
})
.catch(err => {
this.log.error('error during heartbeat, aborting connection', err)
conn.abort(err)
})
})
}, this.pingIntervalMs)
}

stop (): void {
this.abortController?.abort()

if (this.heartbeatInterval != null) {
clearInterval(this.heartbeatInterval)
}
}
}
6 changes: 6 additions & 0 deletions packages/libp2p/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { createLibp2pNode } from './libp2p.js'
import type { AddressManagerInit } from './address-manager/index.js'
import type { Components } from './components.js'
import type { ConnectionManagerInit } from './connection-manager/index.js'
import type { ConnectionMonitorInit } from './connection-monitor.js'
import type { TransportManagerInit } from './transport-manager.js'
import type { Libp2p, ServiceMap, ComponentLogger, NodeInfo, ConnectionProtector, ConnectionEncrypter, ConnectionGater, ContentRouting, Metrics, PeerDiscovery, PeerId, PeerRouting, StreamMuxerFactory, Transport, PrivateKey } from '@libp2p/interface'
import type { PersistentPeerStoreInit } from '@libp2p/peer-store'
Expand Down Expand Up @@ -57,6 +58,11 @@ export interface Libp2pInit<T extends ServiceMap = ServiceMap> {
*/
connectionManager?: ConnectionManagerInit

/**
* libp2p Connection Monitor configuration
*/
connectionMonitor?: ConnectionMonitorInit

/**
* A connection gater can deny new connections based on user criteria
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { checkServiceDependencies, defaultComponents } from './components.js'
import { connectionGater } from './config/connection-gater.js'
import { validateConfig } from './config.js'
import { DefaultConnectionManager } from './connection-manager/index.js'
import { ConnectionMonitor } from './connection-monitor.js'
import { CompoundContentRouting } from './content-routing.js'
import { codes } from './errors.js'
import { DefaultPeerRouting } from './peer-routing.js'
Expand Down Expand Up @@ -121,6 +122,11 @@ export class Libp2pNode<T extends ServiceMap = ServiceMap> extends TypedEventEmi
// Create the Connection Manager
this.configureComponent('connectionManager', new DefaultConnectionManager(this.components, init.connectionManager))

if (init.connectionMonitor?.enabled !== false) {
// Create the Connection Monitor if not disabled
this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor))
}

// Create the Registrar
this.configureComponent('registrar', new DefaultRegistrar(this.components))

Expand Down
115 changes: 115 additions & 0 deletions packages/libp2p/test/connection-monitor/index.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/* eslint-env mocha */

import { CodeError, start, stop } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { pair } from 'it-pair'
import { type StubbedInstance, stubInterface } from 'sinon-ts'
import { ConnectionMonitor } from '../../src/connection-monitor.js'
import type { ComponentLogger, Stream, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'

interface StubbedConnectionMonitorComponents {
logger: ComponentLogger
connectionManager: StubbedInstance<ConnectionManager>
}

describe('connection monitor', () => {
let monitor: ConnectionMonitor
let components: StubbedConnectionMonitorComponents

beforeEach(() => {
components = {
logger: defaultLogger(),
connectionManager: stubInterface<ConnectionManager>()
}
})

afterEach(async () => {
await stop(monitor)
})

it('should monitor the liveness of a connection', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10
})

await start(monitor)

const connection = stubInterface<Connection>()
const stream = stubInterface<Stream>({
...pair<any>()
})
connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream)

components.connectionManager.getConnections.returns([connection])

await delay(100)

expect(connection.rtt).to.be.gte(0)
})

it('should monitor the liveness of a connection that does not support ping', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async () => {
await delay(10)
throw new CodeError('Unsupported protocol', 'ERR_UNSUPPORTED_PROTOCOL')
})

components.connectionManager.getConnections.returns([connection])

await delay(100)

expect(connection.rtt).to.be.gte(0)
})

it('should abort a connection that times out', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 50,
pingTimeout: {
initialValue: 10
}
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
await delay(200)
opts?.signal?.throwIfAborted()
return stubInterface<Stream>()
})

components.connectionManager.getConnections.returns([connection])

await delay(500)

expect(connection.abort).to.have.property('called', true)
})

it('should abort a connection that fails', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
throw new CodeError('Connection closed', 'ERR_CONNECTION_CLOSED')
})

components.connectionManager.getConnections.returns([connection])

await delay(100)

expect(connection.abort).to.have.property('called', true)
})
})
Loading