Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

fix: use labels to differentiate interfaces for metrics #230

Merged
merged 1 commit into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export interface TCPComponents {

export interface TCPMetrics {
dialerEvents: CounterGroup
listenerEvents: CounterGroup
}

class TCP implements Transport {
Expand All @@ -76,13 +75,9 @@ class TCP implements Transport {

if (components.metrics != null) {
this.metrics = {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_errors_total', {
dialerEvents: components.metrics.registerCounterGroup('libp2p_tcp_dialer_events_total', {
label: 'event',
help: 'Total count of TCP dialer errors by error type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_tcp_listener_errors_total', {
label: 'event',
help: 'Total count of TCP listener errors by error type'
help: 'Total count of TCP dialer events by type'
})
}
}
Expand Down Expand Up @@ -115,7 +110,7 @@ class TCP implements Transport {
})
log('new outbound connection %s', maConn.remoteAddr)
const conn = await options.upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
log('outbound connection upgraded %s', maConn.remoteAddr)
return conn
}

Expand Down
60 changes: 35 additions & 25 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener, ListenerEvents } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import type { CounterGroup, Metric, Metrics } from '@libp2p/interface-metrics'
import type { CounterGroup, MetricGroup, Metrics } from '@libp2p/interface-metrics'

const log = logger('libp2p:tcp:listener')

Expand Down Expand Up @@ -39,7 +39,7 @@ const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: Metric
status: MetricGroup
errors: CounterGroup
events: CounterGroup
}
Expand All @@ -52,12 +52,14 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { started: false }
private metrics?: TCPListenerMetrics
private addr: string

constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true

this.addr = 'unknown'
this.server = net.createServer(context, this.onSocket.bind(this))

// https://nodejs.org/api/net.html#servermaxconnections
Expand All @@ -72,49 +74,56 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
if (context.metrics != null) {
// we are listening, register metrics for our port
const address = this.server.address()
let addr: string

if (address == null) {
addr = 'unknown'
this.addr = 'unknown'
} else if (typeof address === 'string') {
// unix socket
addr = address
this.addr = address
} else {
addr = `${address.address}:${address.port}`
this.addr = `${address.address}:${address.port}`
}

context.metrics?.registerMetric(`libp2p_tcp_connections_${addr}_total`, {
context.metrics?.registerMetricGroup('libp2p_tcp_inbound_connections_total', {
label: 'address',
help: 'Current active connections in TCP listener',
calculate: () => {
return this.connections.size
return {
[this.addr]: this.connections.size
}
}
})

this.metrics = {
status: context.metrics.registerMetric(`libp2p_tcp_${addr}_server_status_info`, {
help: 'Current status of the TCP server'
status: context.metrics.registerMetricGroup('libp2p_tcp_listener_status_info', {
label: 'address',
help: 'Current status of the TCP listener socket'
}),
errors: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_server_errors_total`, {
label: 'error',
help: 'Total count of TCP listener errors by error type'
errors: context.metrics.registerMetricGroup('libp2p_tcp_listener_errors_total', {
label: 'address',
help: 'Total count of TCP listener errors by type'
}),
events: context.metrics.registerCounterGroup(`libp2p_tcp_${addr}_socket_events_total`, {
label: 'event',
help: 'Total count of TCP socket events by event'
events: context.metrics.registerMetricGroup('libp2p_tcp_listener_events_total', {
label: 'address',
help: 'Total count of TCP listener events by type'
})
}

this.metrics?.status.update(SERVER_STATUS_UP)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_UP
})
}

this.dispatchEvent(new CustomEvent('listening'))
})
.on('error', err => {
this.metrics?.errors.increment({ listen_error: true })
this.metrics?.errors.increment({ [`${this.addr} listen_error`]: true })
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
})
.on('close', () => {
this.metrics?.status.update(SERVER_STATUS_DOWN)
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
})
this.dispatchEvent(new CustomEvent('close'))
})
}
Expand All @@ -123,7 +132,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.events.increment({ error: true })
this.metrics?.events.increment({ [`${this.addr} error`]: true })
})

let maConn: MultiaddrConnection
Expand All @@ -132,19 +141,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events
metrics: this.metrics?.events,
metricPrefix: `${this.addr} `
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_to_connection: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_to_connection`]: true })
return
}

log('new inbound connection %s', maConn.remoteAddr)
try {
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
log('inbound connection upgraded %s', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
Expand All @@ -159,7 +169,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.catch(async err => {
log.error('inbound connection failed', err)
this.metrics?.errors.increment({ inbound_upgrade: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })

await attemptClose(maConn)
})
Expand All @@ -172,7 +182,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.errors.increment({ inbound_closing_failed: true })
this.metrics?.errors.increment({ [`${this.addr} inbound_closing_failed`]: true })
})
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ interface ToConnectionOptions {
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: CounterGroup
metricPrefix?: string
}

/**
Expand All @@ -29,6 +30,7 @@ interface ToConnectionOptions {
*/
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -63,7 +65,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ timeout: true })
metrics?.increment({ [`${metricPrefix}timeout`]: true })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -78,7 +80,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.increment({ close: true })
metrics?.increment({ [`${metricPrefix}close`]: true })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -92,7 +94,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.increment({ end: true })
metrics?.increment({ [`${metricPrefix}end`]: true })
})

const maConn: MultiaddrConnection = {
Expand Down