diff --git a/packages/interface/src/connection/index.ts b/packages/interface/src/connection/index.ts index 9379eab041..1328c26952 100644 --- a/packages/interface/src/connection/index.ts +++ b/packages/interface/src/connection/index.ts @@ -287,6 +287,13 @@ export interface Connection { */ transient: boolean + /** + * The time in milliseconds it takes to make a round trip to the remote peer. + * + * This is updated periodically by the connection monitor. + */ + rtt?: number + /** * Create a new stream on this connection and negotiate one of the passed protocols */ diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index 67487629af..0f43d04cb9 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -101,6 +101,7 @@ "any-signal": "^4.1.1", "datastore-core": "^9.2.9", "interface-datastore": "^8.2.11", + "it-byte-stream": "^1.0.12", "it-merge": "^3.0.5", "it-parallel": "^3.0.7", "merge-options": "^3.0.4", @@ -126,6 +127,7 @@ "it-all": "^3.0.6", "it-drain": "^3.0.7", "it-map": "^3.1.0", + "it-pair": "^2.0.6", "it-pipe": "^3.0.1", "it-pushable": "^3.2.3", "it-stream-types": "^2.0.1", diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts new file mode 100644 index 0000000000..7dcf9176f5 --- /dev/null +++ b/packages/libp2p/src/connection-monitor.ts @@ -0,0 +1,134 @@ +import { serviceCapabilities } from '@libp2p/interface' +import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout' +import { byteStream } from 'it-byte-stream' +import type { ComponentLogger, Logger, Metrics, Startable } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' +import type { AdaptiveTimeoutInit } from '@libp2p/utils/adaptive-timeout' + +const DEFAULT_PING_INTERVAL_MS = 10000 + +export interface ConnectionMonitorInit { + /** + * Whether the connection monitor is enabled + * + * @default true + */ + enabled?: boolean + + /** + * How often to ping remote peers in ms + * + * @default 10000 + */ + pingInterval?: number + + /** + * Timeout settings for how long the ping is allowed to take before the + * connection will be judged inactive and aborted. + * + * The timeout is adaptive to cope with slower networks or nodes that + * have changing network characteristics, such as mobile. + */ + pingTimeout?: Omit + + /** + * If true, any connection that fails the ping will be aborted + * + * @default true + */ + abortConnectionOnPingFailure?: boolean +} + +export interface ConnectionMonitorComponents { + logger: ComponentLogger + connectionManager: ConnectionManager + metrics?: Metrics +} + +export class ConnectionMonitor implements Startable { + private readonly components: ConnectionMonitorComponents + private readonly log: Logger + private heartbeatInterval?: ReturnType + private readonly pingIntervalMs: number + private abortController?: AbortController + private readonly timeout: AdaptiveTimeout + + constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) { + this.components = components + + this.log = components.logger.forComponent('libp2p:connection-monitor') + this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS + + this.timeout = new AdaptiveTimeout({ + ...(init.pingTimeout ?? {}), + metrics: components.metrics, + metricName: 'libp2p_connection_monitor_ping_time_milliseconds' + }) + } + + readonly [Symbol.toStringTag] = '@libp2p/connection-monitor' + + readonly [serviceCapabilities]: string[] = [ + '@libp2p/connection-monitor' + ] + + start (): void { + this.abortController = new AbortController() + + this.heartbeatInterval = setInterval(() => { + this.components.connectionManager.getConnections().forEach(conn => { + Promise.resolve().then(async () => { + let start = Date.now() + try { + const signal = this.timeout.getTimeoutSignal({ + signal: this.abortController?.signal + }) + const stream = await conn.newStream('/ipfs/ping/1.0.0', { + signal, + runOnTransientConnection: true + }) + const bs = byteStream(stream) + start = Date.now() + + await Promise.all([ + bs.write(new Uint8Array(1), { + signal + }), + bs.read(1, { + signal + }) + ]) + + conn.rtt = Date.now() - start + + await bs.unwrap().close({ + signal + }) + } catch (err: any) { + if (err.code !== 'ERR_UNSUPPORTED_PROTOCOL') { + throw err + } + + // protocol was unsupported, but that's ok as it means the remote + // peer was still alive. We ran multistream-select which means two + // round trips (e.g. 1x for the mss header, then another for the + // protocol) so divide the time it took by two + conn.rtt = (Date.now() - start) / 2 + } + }) + .catch(err => { + this.log.error('error during heartbeat, aborting connection', err) + conn.abort(err) + }) + }) + }, this.pingIntervalMs) + } + + stop (): void { + this.abortController?.abort() + + if (this.heartbeatInterval != null) { + clearInterval(this.heartbeatInterval) + } + } +} diff --git a/packages/libp2p/src/index.ts b/packages/libp2p/src/index.ts index 014ec77535..3defada121 100644 --- a/packages/libp2p/src/index.ts +++ b/packages/libp2p/src/index.ts @@ -18,6 +18,7 @@ import { createLibp2pNode } from './libp2p.js' import type { AddressManagerInit } from './address-manager/index.js' import type { Components } from './components.js' import type { ConnectionManagerInit } from './connection-manager/index.js' +import type { ConnectionMonitorInit } from './connection-monitor.js' import type { TransportManagerInit } from './transport-manager.js' import type { Libp2p, ServiceMap, ComponentLogger, NodeInfo, ConnectionProtector, ConnectionEncrypter, ConnectionGater, ContentRouting, Metrics, PeerDiscovery, PeerId, PeerRouting, StreamMuxerFactory, Transport, PrivateKey } from '@libp2p/interface' import type { PersistentPeerStoreInit } from '@libp2p/peer-store' @@ -57,6 +58,11 @@ export interface Libp2pInit { */ connectionManager?: ConnectionManagerInit + /** + * libp2p Connection Monitor configuration + */ + connectionMonitor?: ConnectionMonitorInit + /** * A connection gater can deny new connections based on user criteria */ diff --git a/packages/libp2p/src/libp2p.ts b/packages/libp2p/src/libp2p.ts index 5ca8fa88a5..af6981f091 100644 --- a/packages/libp2p/src/libp2p.ts +++ b/packages/libp2p/src/libp2p.ts @@ -14,6 +14,7 @@ import { checkServiceDependencies, defaultComponents } from './components.js' import { connectionGater } from './config/connection-gater.js' import { validateConfig } from './config.js' import { DefaultConnectionManager } from './connection-manager/index.js' +import { ConnectionMonitor } from './connection-monitor.js' import { CompoundContentRouting } from './content-routing.js' import { codes } from './errors.js' import { DefaultPeerRouting } from './peer-routing.js' @@ -121,6 +122,11 @@ export class Libp2pNode extends TypedEventEmi // Create the Connection Manager this.configureComponent('connectionManager', new DefaultConnectionManager(this.components, init.connectionManager)) + if (init.connectionMonitor?.enabled !== false) { + // Create the Connection Monitor if not disabled + this.configureComponent('connectionMonitor', new ConnectionMonitor(this.components, init.connectionMonitor)) + } + // Create the Registrar this.configureComponent('registrar', new DefaultRegistrar(this.components)) diff --git a/packages/libp2p/test/connection-monitor/index.spec.ts b/packages/libp2p/test/connection-monitor/index.spec.ts new file mode 100644 index 0000000000..d3d920c353 --- /dev/null +++ b/packages/libp2p/test/connection-monitor/index.spec.ts @@ -0,0 +1,115 @@ +/* eslint-env mocha */ + +import { CodeError, start, stop } from '@libp2p/interface' +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import delay from 'delay' +import { pair } from 'it-pair' +import { type StubbedInstance, stubInterface } from 'sinon-ts' +import { ConnectionMonitor } from '../../src/connection-monitor.js' +import type { ComponentLogger, Stream, Connection } from '@libp2p/interface' +import type { ConnectionManager } from '@libp2p/interface-internal' + +interface StubbedConnectionMonitorComponents { + logger: ComponentLogger + connectionManager: StubbedInstance +} + +describe('connection monitor', () => { + let monitor: ConnectionMonitor + let components: StubbedConnectionMonitorComponents + + beforeEach(() => { + components = { + logger: defaultLogger(), + connectionManager: stubInterface() + } + }) + + afterEach(async () => { + await stop(monitor) + }) + + it('should monitor the liveness of a connection', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + const stream = stubInterface({ + ...pair() + }) + connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.rtt).to.be.gte(0) + }) + + it('should monitor the liveness of a connection that does not support ping', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async () => { + await delay(10) + throw new CodeError('Unsupported protocol', 'ERR_UNSUPPORTED_PROTOCOL') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.rtt).to.be.gte(0) + }) + + it('should abort a connection that times out', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 50, + pingTimeout: { + initialValue: 10 + } + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + await delay(200) + opts?.signal?.throwIfAborted() + return stubInterface() + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(500) + + expect(connection.abort).to.have.property('called', true) + }) + + it('should abort a connection that fails', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10 + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + throw new CodeError('Connection closed', 'ERR_CONNECTION_CLOSED') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(100) + + expect(connection.abort).to.have.property('called', true) + }) +})