From fa84233e9ef499e6db372ade84af5a87c8ed2af9 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 31 Jul 2024 07:22:00 +0100 Subject: [PATCH 1/5] feat: add connection monitor Adds a connection monitor that periodically ensures remote peers are still online and contactable by trying to send a single byte via the ping protocol, and sets the `.rtt` property of the connection to how long it took. If the ping protocol is not supported by the remote, it tries to infer the round trip time by how long it took to fail. If the remote is unresponsive or opening the stream fails for any other reason, the connection is aborted with the thrown error. It's possible to configure the ping interval, how long we wait before considering a peer to be inactive and whether or not to close the connection on failure. Closes #2643 --- packages/interface/src/connection/index.ts | 7 + packages/libp2p/src/connection-monitor.ts | 121 ++++++++++++++++++ packages/libp2p/src/index.ts | 6 + packages/libp2p/src/libp2p.ts | 4 + .../test/connection-monitor/index.spec.ts | 113 ++++++++++++++++ 5 files changed, 251 insertions(+) create mode 100644 packages/libp2p/src/connection-monitor.ts create mode 100644 packages/libp2p/test/connection-monitor/index.spec.ts diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 9379eab041..1328c26952 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -287,6 +287,13 @@ export interface Connection { */ transient: boolean + /** + * The time in milliseconds it takes to make a round trip to the remote peer. + * + * This is updated periodically by the connection monitor. 
+ */ + rtt?: number + /** * Create a new stream on this connection and negotiate one of the passed protocols */ diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts new file mode 100644 index 0000000000..fe326dc4ac --- /dev/null +++ b/packages/libp2p/src/connection-monitor.ts @@ -0,0 +1,121 @@ +import { serviceCapabilities } from '@libp2p/interface' +import { anySignal } from 'any-signal' +import { byteStream } from 'it-byte-stream' +import type { ComponentLogger, Logger, Startable } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +const DEFAULT_PING_INTERVAL_MS = 10000 +const DEFAULT_PING_TIMEOUT_MS = 2000 + +export interface ConnectionMonitorInit { + /** + * How often to ping remote peers in ms + * + * @default 10000 + */ + pingInterval?: number + + /** + * How long the ping is allowed to take before the connection will be judged + * inactive and aborted + * + * @default 2000 + */ + pingTimeout?: number + + /** + * If true, any connection that fails the ping will be aborted + * + * @default true + */ + abortConnectionOnPingFailure?: boolean +} + +export interface ConnectionMonitorComponents { + logger: ComponentLogger + connectionManager: ConnectionManager +} + +export class ConnectionMonitor implements Startable { + private readonly components: ConnectionMonitorComponents + private readonly log: Logger + private heartbeatInterval?: ReturnType + private readonly pingIntervalMs: number + private readonly pingTimeoutMs: number + private abortController?: AbortController + + constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) { + this.components = components + + this.log = components.logger.forComponent('libp2p:connection-monitor') + this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS + this.pingTimeoutMs = init.pingTimeout ?? 
DEFAULT_PING_TIMEOUT_MS + } + + readonly [Symbol.toStringTag] = '@libp2p/connection-monitor' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/connection-monitor' + ] + + start (): void { + this.abortController = new AbortController() + + this.heartbeatInterval = setInterval(() => { + this.components.connectionManager.getConnections().forEach(conn => { + Promise.resolve().then(async () => { + let start = Date.now() + try { + const signal = anySignal([ + this.abortController?.signal, + AbortSignal.timeout(this.pingTimeoutMs) + ]) + const stream = await conn.newStream('/ipfs/ping/1.0.0', { + signal, + runOnTransientConnection: true + }) + const bs = byteStream(stream) + start = Date.now() + + await Promise.all([ + bs.write(new Uint8Array(1), { + signal + }), + bs.read(1, { + signal + }) + ]) + + conn.rtt = Date.now() - start + + await bs.unwrap().close({ + signal + }) + } catch (err: any) { + if (err.code !== 'ERR_UNSUPPORTED_PROTOCOL') { + throw err + } + + // protocol was unsupported, but that's ok as it means the remote + // peer was still alive. We ran multistream-select which means two + // round trips (e.g. 
1x for the mss header, then another for the + // protocol) so divide the time it took by two + conn.rtt = (Date.now() - start) / 2 + } + }) + .catch(err => { + this.log.error('error during heartbeat, aborting connection', err) + conn.abort(err) + }) + }) + }, this.pingIntervalMs) + } + + stop (): void { + this.abortController?.abort() + + if (this.heartbeatInterval != null) { + clearInterval(this.heartbeatInterval) + } + } +} diff --git a/packages/libp2p/src/index.ts b/packages/libp2p/src/index.ts index 014ec77535..3defada121 100644 --- a/packages/libp2p/src/index.ts +++ b/packages/libp2p/src/index.ts @@ -18,6 +18,7 @@ import { createLibp2pNode } from './libp2p.js' import type { AddressManagerInit } from './address-manager/index.js' import type { Components } from './components.js' import type { ConnectionManagerInit } from './connection-manager/index.js' +import type { ConnectionMonitorInit } from './connection-monitor.js' import type { TransportManagerInit } from './transport-manager.js' import type { Libp2p, ServiceMap, ComponentLogger, NodeInfo, ConnectionProtector, ConnectionEncrypter, ConnectionGater, ContentRouting, Metrics, PeerDiscovery, PeerId, PeerRouting, StreamMuxerFactory, Transport, PrivateKey } from '@libp2p/interface' import type { PersistentPeerStoreInit } from '@libp2p/peer-store' @@ -57,6 +58,11 @@ export interface Libp2pInit { */ connectionManager?: ConnectionManagerInit + /** + * libp2p Connection Monitor configuration + */ + connectionMonitor?: ConnectionMonitorInit + /** * A connection gater can deny new connections based on user criteria */ diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index 5ca8fa88a5..ba1cd74143 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -14,6 +14,7 @@ import { checkServiceDependencies, defaultComponents } from './components.js' import { connectionGater } from './config/connection-gater.js' import { validateConfig } from './config.js' import { 
DefaultConnectionManager } from './connection-manager/index.js' +import { ConnectionMonitor } from './connection-monitor.js' import { CompoundContentRouting } from './content-routing.js' import { codes } from './errors.js' import { DefaultPeerRouting } from './peer-routing.js' @@ -121,6 +122,9 @@ export class Libp2pNode extends TypedEventEmi // Create the Connection Manager this.configureComponent('connectionManager', new DefaultConnectionManager(this.components, init.connectionManager)) + // Create the Connection Monitor + this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor)) + // Create the Registrar this.configureComponent('registrar', new DefaultRegistrar(this.components)) diff --git a/packages/libp2p/test/connection-monitor/index.spec.ts b/packages/libp2p/test/connection-monitor/index.spec.ts new file mode 100644 index 0000000000..886da42daf --- /dev/null +++ b/packages/libp2p/test/connection-monitor/index.spec.ts @@ -0,0 +1,113 @@ +/* eslint-env mocha */ + +import { CodeError, start, stop } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { pair } from 'it-pair' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { ConnectionMonitor } from '../../src/connection-monitor.js' +import type { ComponentLogger, Stream, Connection } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +interface StubbedConnectionMonitorComponents { + logger: ComponentLogger + connectionManager: StubbedInstance +} + +describe('connection monitor', () => { + let monitor: ConnectionMonitor + let components: StubbedConnectionMonitorComponents + + beforeEach(() => { + components = { + logger: defaultLogger(), + connectionManager: stubInterface() + } + }) + + afterEach(async () => { + await stop(monitor) + }) + + it('should monitor the liveness of a connection', async () => { + 
monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + const stream = stubInterface({ + ...pair() + }) + connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.rtt).to.be.gte(0) + }) + + it('should monitor the liveness of a connection that does not support ping', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async () => { + await delay(10) + throw new CodeError('Unsupported protocol', 'ERR_UNSUPPORTED_PROTOCOL') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.rtt).to.be.gte(0) + }) + + it('should abort a connection that times out', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 50, + pingTimeout: 10 + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + await delay(200) + opts?.signal?.throwIfAborted() + return stubInterface() + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(500) + + expect(connection.abort).to.have.property('called', true) + }) + + it('should abort a connection that fails', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + throw new CodeError('Connection closed', 'ERR_CONNECTION_CLOSED') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.abort).to.have.property('called', true) + }) 
+}) From d0e50a52b78121d6a1f75922fdbf050fc9378f8c Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 31 Jul 2024 12:06:32 +0100 Subject: [PATCH 2/5] chore: use adaptive timeout --- packages/libp2p/src/connection-monitor.ts | 32 +++++++++++-------- .../test/connection-monitor/index.spec.ts | 4 ++- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts index fe326dc4ac..c30a1b3cfa 100644 --- a/packages/libp2p/src/connection-monitor.ts +++ b/packages/libp2p/src/connection-monitor.ts @@ -1,11 +1,11 @@ import { serviceCapabilities } from '@libp2p/interface' -import { anySignal } from 'any-signal' +import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout' import { byteStream } from 'it-byte-stream' -import type { ComponentLogger, Logger, Startable } from '@libp2p/interface' +import type { ComponentLogger, Logger, Metrics, Startable } from '@libp2p/interface' import type { ConnectionManager } from '@libp2p/interface-internal' +import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout' const DEFAULT_PING_INTERVAL_MS = 10000 -const DEFAULT_PING_TIMEOUT_MS = 2000 export interface ConnectionMonitorInit { /** @@ -16,12 +16,13 @@ export interface ConnectionMonitorInit { pingInterval?: number /** - * How long the ping is allowed to take before the connection will be judged - * inactive and aborted + * Timeout settings for How long the ping is allowed to take before the + * connection will be judged inactive and aborted. * - * @default 2000 + * The timeout is adaptive to cope with slower networks or nodes that + * have changing network characteristics, such as mobile. 
*/ - pingTimeout?: number + pingTimeout?: Omit /** * If true, any connection that fails the ping will be aborted @@ -34,6 +35,7 @@ export interface ConnectionMonitorInit { export interface ConnectionMonitorComponents { logger: ComponentLogger connectionManager: ConnectionManager + metrics?: Metrics } export class ConnectionMonitor implements Startable { @@ -41,15 +43,20 @@ export class ConnectionMonitor implements Startable { private readonly log: Logger private heartbeatInterval?: ReturnType private readonly pingIntervalMs: number - private readonly pingTimeoutMs: number private abortController?: AbortController + private readonly timeout: AdaptiveTimeout constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) { this.components = components this.log = components.logger.forComponent('libp2p:connection-monitor') this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS - this.pingTimeoutMs = init.pingTimeout ?? DEFAULT_PING_TIMEOUT_MS + + this.timeout = new AdaptiveTimeout({ + ...(init.pingTimeout ?? 
{}), + metrics: components.metrics, + metricName: 'libp2p_connection_monitor_ping_time_milliseconds' + }) } readonly [Symbol.toStringTag] = '@libp2p/connection-monitor' @@ -66,10 +73,9 @@ export class ConnectionMonitor implements Startable { Promise.resolve().then(async () => { let start = Date.now() try { - const signal = anySignal([ - this.abortController?.signal, - AbortSignal.timeout(this.pingTimeoutMs) - ]) + const signal = this.timeout.getTimeoutSignal({ + signal: this.abortController?.signal + }) const stream = await conn.newStream('/ipfs/ping/1.0.0', { signal, runOnTransientConnection: true diff --git a/packages/libp2p/test/connection-monitor/index.spec.ts b/packages/libp2p/test/connection-monitor/index.spec.ts index 886da42daf..d3d920c353 100644 --- a/packages/libp2p/test/connection-monitor/index.spec.ts +++ b/packages/libp2p/test/connection-monitor/index.spec.ts @@ -73,7 +73,9 @@ describe('connection monitor', () => { it('should abort a connection that times out', async () => { monitor = new ConnectionMonitor(components, { pingInterval: 50, - pingTimeout: 10 + pingTimeout: { + initialValue: 10 + } }) await start(monitor) From 6141c868a3639916088b918b5e38cb6fbe69b996 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 31 Jul 2024 12:18:07 +0100 Subject: [PATCH 3/5] chore: add missing deps --- packages/libp2p/package.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index 67487629af..0f43d04cb9 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -101,6 +101,7 @@ "any-signal": "^4.1.1", "datastore-core": "^9.2.9", "interface-datastore": "^8.2.11", + "it-byte-stream": "^1.0.12", "it-merge": "^3.0.5", "it-parallel": "^3.0.7", "merge-options": "^3.0.4", @@ -126,6 +127,7 @@ "it-all": "^3.0.6", "it-drain": "^3.0.7", "it-map": "^3.1.0", + "it-pair": "^2.0.6", "it-pipe": "^3.0.1", "it-pushable": "^3.2.3", "it-stream-types": "^2.0.1", From 
978e2214692550eed2e776757c2a7b46f0b9d546 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 9 Aug 2024 10:43:32 +0100 Subject: [PATCH 4/5] chore: apply suggestions from code review Co-authored-by: Chad Nehemiah --- packages/libp2p/src/connection-monitor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts index c30a1b3cfa..cb7dfb9dd6 100644 --- a/packages/libp2p/src/connection-monitor.ts +++ b/packages/libp2p/src/connection-monitor.ts @@ -16,7 +16,7 @@ export interface ConnectionMonitorInit { pingInterval?: number /** - * Timeout settings for How long the ping is allowed to take before the + * Timeout settings for how long the ping is allowed to take before the * connection will be judged inactive and aborted. * * The timeout is adaptive to cope with slower networks or nodes that From f3221ce4db98c5276b9845b2fedd994461710ff8 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 9 Aug 2024 12:02:14 +0100 Subject: [PATCH 5/5] chore: add flag to disable --- packages/libp2p/src/connection-monitor.ts | 7 +++++++ packages/libp2p/src/libp2p.ts | 6 ++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts index cb7dfb9dd6..7dcf9176f5 100644 --- a/packages/libp2p/src/connection-monitor.ts +++ b/packages/libp2p/src/connection-monitor.ts @@ -8,6 +8,13 @@ import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout' const DEFAULT_PING_INTERVAL_MS = 10000 export interface ConnectionMonitorInit { + /** + * Whether the connection monitor is enabled + * + * @default true + */ + enabled?: boolean + /** * How often to ping remote peers in ms * diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index ba1cd74143..af6981f091 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -122,8 +122,10 @@ export class Libp2pNode 
extends TypedEventEmi // Create the Connection Manager this.configureComponent('connectionManager', new DefaultConnectionManager(this.components, init.connectionManager)) - // Create the Connection Monitor - this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor)) + if (init.connectionMonitor?.enabled !== false) { + // Create the Connection Monitor if not disabled + this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor)) + } // Create the Registrar this.configureComponent('registrar', new DefaultRegistrar(this.components))