Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WebSockets metrics #2649

Merged
merged 12 commits into from
Aug 15, 2024
31 changes: 28 additions & 3 deletions packages/transport-websockets/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import * as filters from './filters.js'
import { createListener } from './listener.js'
import { socketToMaConn } from './socket-to-conn.js'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents } from '@libp2p/interface'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
Expand All @@ -82,6 +82,11 @@

export interface WebSocketsComponents {
logger: ComponentLogger
metrics?: Metrics
}

export interface WebSocketsMetrics {
dialerEvents: CounterGroup
}

export type WebSocketsDialEvents =
Expand All @@ -92,11 +97,23 @@
private readonly log: Logger
private readonly init?: WebSocketsInit
private readonly logger: ComponentLogger
private readonly metrics?: WebSocketsMetrics
private readonly components: WebSocketsComponents

constructor (components: WebSocketsComponents, init?: WebSocketsInit) {
this.log = components.logger.forComponent('libp2p:websockets')
this.logger = components.logger
this.components = components
this.init = init

if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', {
label: 'event',
help: 'Total count of WebSockets dialer events by type'
})
}
}

Check warning on line 116 in packages/transport-websockets/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/index.ts#L110-L116

Added lines #L110 - L116 were not covered by tests
}

readonly [transportSymbol] = true
Expand All @@ -113,7 +130,8 @@

const socket = await this._connect(ma, options)
const maConn = socketToMaConn(socket, ma, {
logger: this.logger
logger: this.logger,
metrics: this.metrics?.dialerEvents
})
this.log('new outbound connection %s', maConn.remoteAddr)

Expand All @@ -136,13 +154,18 @@
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED')
this.log.error('connection error:', err)
this.metrics?.dialerEvents.increment({ error: true })
errorPromise.reject(err)
})

try {
options.onProgress?.(new CustomProgressEvent('websockets:open-connection'))
await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal)
} catch (err: any) {
if (options.signal?.aborted === true) {
this.metrics?.dialerEvents.increment({ abort: true })
}

rawSocket.close()
.catch(err => {
this.log.error('error closing raw socket', err)
Expand All @@ -152,6 +175,7 @@
}

this.log('connected %s', ma)
this.metrics?.dialerEvents.increment({ connect: true })
return rawSocket
}

Expand All @@ -162,7 +186,8 @@
*/
createListener (options: CreateListenerOptions): Listener {
return createListener({
logger: this.logger
logger: this.logger,
metrics: this.components.metrics
}, {
...this.init,
...options
Expand Down
52 changes: 49 additions & 3 deletions packages/transport-websockets/src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,57 @@
import os from 'os'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface'
import { TypedEventEmitter } from '@libp2p/interface'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { createServer } from 'it-ws/server'
import { socketToMaConn } from './socket-to-conn.js'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup, MetricGroup, Metrics } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { WebSocketServer } from 'it-ws/server'

export interface WebSocketListenerComponents {
logger: ComponentLogger
metrics?: Metrics
}

export interface WebSocketListenerInit extends CreateListenerOptions {
server?: Server
}

export interface WebSocketListenerMetrics {
status: MetricGroup
errors: CounterGroup
events: CounterGroup
}

class WebSocketListener extends TypedEventEmitter<ListenerEvents> implements Listener {
private readonly connections: Set<DuplexWebSocket>
private listeningMultiaddr?: Multiaddr
private readonly server: WebSocketServer
private readonly log: Logger
private metrics?: WebSocketListenerMetrics
private addr: string

constructor (components: WebSocketListenerComponents, init: WebSocketListenerInit) {
super()

this.log = components.logger.forComponent('libp2p:websockets:listener')
const metrics = components.metrics
// Keep track of open connections to destroy when the listener is closed
this.connections = new Set<DuplexWebSocket>()

const self = this // eslint-disable-line @typescript-eslint/no-this-alias

this.addr = 'unknown'

this.server = createServer({
...init,
onConnection: (stream: DuplexWebSocket) => {
const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), {
logger: components.logger
logger: components.logger,
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
this.log('new inbound connection %s', maConn.remoteAddr)

Expand All @@ -62,6 +76,7 @@
})
.catch(async err => {
this.log.error('inbound connection failed to upgrade', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })

Check warning on line 79 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L79

Added line #L79 was not covered by tests

await maConn.close().catch(err => {
this.log.error('inbound connection failed to close after upgrade failed', err)
Expand All @@ -71,15 +86,46 @@
this.log.error('inbound connection failed to upgrade', err)
maConn.close().catch(err => {
this.log.error('inbound connection failed to close after upgrade failed', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true })

Check warning on line 89 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L89

Added line #L89 was not covered by tests
})
}
}
})

this.server.on('listening', () => {
if (metrics != null) {
const { host, port } = this.listeningMultiaddr?.toOptions() ?? {}
this.addr = `${host}:${port}`

metrics.registerMetricGroup('libp2p_websockets_inbound_connections_total', {
label: 'address',
help: 'Current active connections in WebSocket listener',
calculate: () => {
return {
[this.addr]: this.connections.size
}
}
})

this.metrics = {
status: metrics?.registerMetricGroup('libp2p_websockets_listener_status_info', {
label: 'address',
help: 'Current status of the WebSocket listener socket'
}),
errors: metrics?.registerMetricGroup('libp2p_websockets_listener_errors_total', {
label: 'address',
help: 'Total count of WebSocket listener errors by type'
}),
events: metrics?.registerMetricGroup('libp2p_websockets_listener_events_total', {
label: 'address',
help: 'Total count of WebSocket listener events by type'
})
}
}

Check warning on line 124 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L97-L124

Added lines #L97 - L124 were not covered by tests
this.dispatchEvent(new CustomEvent('listening'))
})
this.server.on('error', (err: Error) => {
this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true })

Check warning on line 128 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L128

Added line #L128 was not covered by tests
this.dispatchEvent(new CustomEvent('error', {
detail: err
}))
Expand Down
14 changes: 13 additions & 1 deletion packages/transport-websockets/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import { CodeError } from '@libp2p/interface'
import { CLOSE_TIMEOUT } from './constants.js'
import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/duplex'

export interface SocketToConnOptions {
localAddr?: Multiaddr
logger: ComponentLogger
metrics?: CounterGroup
metricPrefix?: string
}

// Convert a stream into a MultiaddrConnection
// https://github.com/libp2p/interface-transport#multiaddrconnection
export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection {
const log = options.logger.forComponent('libp2p:websockets:maconn')
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''

const maConn: MultiaddrConnection = {
log,
Expand Down Expand Up @@ -81,10 +85,18 @@

stream.destroy()
maConn.timeline.close = Date.now()

// ws WebSocket.terminate does not accept an Error arg to emit an 'error'
// event on destroy like other node streams so we can't update a metric
// with an event listener
// https://github.com/websockets/ws/issues/1752#issuecomment-622380981
metrics?.increment({ [`${metricPrefix}error`]: true })

Check warning on line 93 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L88-L93

Added lines #L88 - L93 were not covered by tests
}
}

stream.socket.addEventListener('close', () => {
metrics?.increment({ [`${metricPrefix}close`]: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
// timeline
Expand Down
Loading