diff --git a/packages/transport-websockets/src/index.ts b/packages/transport-websockets/src/index.ts index 49a6426d72..18cf573c9a 100644 --- a/packages/transport-websockets/src/index.ts +++ b/packages/transport-websockets/src/index.ts @@ -67,7 +67,7 @@ import { isBrowser, isWebWorker } from 'wherearewe' import * as filters from './filters.js' import { createListener } from './listener.js' import { socketToMaConn } from './socket-to-conn.js' -import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents } from '@libp2p/interface' +import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' @@ -82,6 +82,11 @@ export interface WebSocketsInit extends AbortOptions, WebSocketOptions { export interface WebSocketsComponents { logger: ComponentLogger + metrics?: Metrics +} + +export interface WebSocketsMetrics { + dialerEvents: CounterGroup } export type WebSocketsDialEvents = @@ -92,11 +97,23 @@ class WebSockets implements Transport { private readonly log: Logger private readonly init?: WebSocketsInit private readonly logger: ComponentLogger + private readonly metrics?: WebSocketsMetrics + private readonly components: WebSocketsComponents constructor (components: WebSocketsComponents, init?: WebSocketsInit) { this.log = components.logger.forComponent('libp2p:websockets') this.logger = components.logger + this.components = components this.init = init + + if (components.metrics != null) { + this.metrics = { + dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', { + label: 'event', + help: 'Total count of WebSockets dialer events by type' + }) + } + } } readonly [transportSymbol] = true @@ -113,7 +130,8 @@ class WebSockets implements Transport { const socket = await this._connect(ma, options) const maConn = socketToMaConn(socket, ma, { - logger: this.logger + logger: this.logger, + metrics: this.metrics?.dialerEvents }) this.log('new outbound connection %s', maConn.remoteAddr) @@ -136,6 +154,7 @@ class WebSockets implements Transport { // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED') this.log.error('connection error:', err) + this.metrics?.dialerEvents.increment({ error: true }) errorPromise.reject(err) }) @@ -143,6 +162,10 @@ class WebSockets implements Transport { options.onProgress?.(new CustomProgressEvent('websockets:open-connection')) await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal) } catch (err: any) { + if (options.signal?.aborted === true) { + this.metrics?.dialerEvents.increment({ abort: true }) + } + rawSocket.close() .catch(err => { this.log.error('error closing raw socket', err) @@ -152,6 +175,7 @@ class WebSockets implements Transport { } this.log('connected %s', ma) + this.metrics?.dialerEvents.increment({ connect: true }) return rawSocket } @@ -162,7 +186,8 @@ class WebSockets implements Transport { */ createListener (options: CreateListenerOptions): Listener { return createListener({ - logger: this.logger + logger: this.logger, + metrics: this.components.metrics }, { ...this.init, ...options diff --git a/packages/transport-websockets/src/listener.ts b/packages/transport-websockets/src/listener.ts index 20c2d7b50f..84905da253 100644 --- a/packages/transport-websockets/src/listener.ts +++ b/packages/transport-websockets/src/listener.ts @@ -1,10 +1,10 @@ import os from 'os' -import { TypedEventEmitter, CustomEvent } from '@libp2p/interface' +import { TypedEventEmitter } from '@libp2p/interface' import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr' import { multiaddr, protocols } from '@multiformats/multiaddr' import { createServer } from 'it-ws/server' import { socketToMaConn } from './socket-to-conn.js' -import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface' +import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { Server } from 'http' import type { DuplexWebSocket } from 'it-ws/duplex' @@ -12,32 +12,46 @@ import type { WebSocketServer } from 'it-ws/server' export interface WebSocketListenerComponents { logger: ComponentLogger + metrics?: Metrics } export interface WebSocketListenerInit extends CreateListenerOptions { server?: Server } +export interface WebSocketListenerMetrics { + status: MetricGroup + errors: CounterGroup + events: CounterGroup +} + class WebSocketListener extends TypedEventEmitter implements Listener { private readonly connections: Set private listeningMultiaddr?: Multiaddr private readonly server: WebSocketServer private readonly log: Logger + private metrics?: WebSocketListenerMetrics + private addr: string constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) { super() this.log = components.logger.forComponent('libp2p:websockets:listener') + const metrics = components.metrics // Keep track of open connections to destroy when the listener is closed this.connections = new Set() const self = this // eslint-disable-line @typescript-eslint/no-this-alias + this.addr = 'unknown' + this.server = createServer({ ...init, onConnection: (stream: DuplexWebSocket) => { const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), { - logger: components.logger + logger: components.logger, + metrics: this.metrics?.events, + metricPrefix: `${this.addr} ` }) this.log('new inbound connection %s', maConn.remoteAddr) @@ -62,6 +76,7 @@ class WebSocketListener extends TypedEventEmitter implements Lis }) .catch(async err => { this.log.error('inbound connection failed to upgrade', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) await maConn.close().catch(err => { this.log.error('inbound connection failed to close after upgrade failed', err) @@ -71,15 +86,46 @@ class WebSocketListener extends TypedEventEmitter implements Lis this.log.error('inbound connection failed to upgrade', err) maConn.close().catch(err => { this.log.error('inbound connection failed to close after upgrade failed', err) + this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true }) }) } } }) this.server.on('listening', () => { + if (metrics != null) { + const { host, port } = this.listeningMultiaddr?.toOptions() ?? {} + this.addr = `${host}:${port}` + + metrics.registerMetricGroup('libp2p_websockets_inbound_connections_total', { + label: 'address', + help: 'Current active connections in WebSocket listener', + calculate: () => { + return { + [this.addr]: this.connections.size + } + } + }) + + this.metrics = { + status: metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', { + label: 'address', + help: 'Current status of the WebSocket listener socket' + }), + errors: metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', { + label: 'address', + help: 'Total count of WebSocket listener errors by type' + }), + events: metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', { + label: 'address', + help: 'Total count of WebSocket listener events by type' + }) + } + } this.dispatchEvent(new CustomEvent('listening')) }) this.server.on('error', (err: Error) => { + this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true }) this.dispatchEvent(new CustomEvent('error', { detail: err })) diff --git a/packages/transport-websockets/src/socket-to-conn.ts b/packages/transport-websockets/src/socket-to-conn.ts index 1cb3278ca7..eb98160c65 100644 --- a/packages/transport-websockets/src/socket-to-conn.ts +++ b/packages/transport-websockets/src/socket-to-conn.ts @@ -1,18 +1,22 @@ import { CodeError } from '@libp2p/interface' import { CLOSE_TIMEOUT } from './constants.js' -import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface' +import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { DuplexWebSocket } from 'it-ws/duplex' export interface SocketToConnOptions { localAddr?: Multiaddr logger: ComponentLogger + metrics?: CounterGroup + metricPrefix?: string } // Convert a stream into a MultiaddrConnection // https://github.com/libp2p/interface-transport#multiaddrconnection export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection { const log = options.logger.forComponent('libp2p:websockets:maconn') + const metrics = options.metrics + const metricPrefix = options.metricPrefix ?? '' const maConn: MultiaddrConnection = { log, @@ -81,10 +85,18 @@ export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, stream.destroy() maConn.timeline.close = Date.now() + + // ws WebSocket.terminate does not accept an Error arg to emit an 'error' + // event on destroy like other node streams so we can't update a metric + // with an event listener + // https://github.com/websockets/ws/issues/1752#issuecomment-622380981 + metrics?.increment({ [`${metricPrefix}error`]: true }) } } stream.socket.addEventListener('close', () => { + metrics?.increment({ [`${metricPrefix}close`]: true }) + // In instances where `close` was not explicitly called, // such as an iterable stream ending, ensure we have set the close // timeline