Skip to content

Commit

Permalink
feat: allow per-component metrics to be collected (#1061)
Browse files Browse the repository at this point in the history
Implements the idea from #1060 - allows us to get some insight into what's happening in a libp2p node outside of just bandwidth stats.

Configures a few default metrics if metrics are enabled - current connections, the state of the dial queue, etc.

Also makes the `Metrics` class not depend on the `ConnectionManager` class, otherwise we can't collect simple metrics from the connection manager class due to the circular dependency.
  • Loading branch information
achingbrain authored Dec 15, 2021
1 parent d172d0d commit 2f0b311
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- run: npx aegir lint
- run: npx aegir build
- run: npx aegir dep-check
- uses: ipfs/aegir/actions/bundle-size@v32.1.0
- uses: ipfs/aegir/actions/bundle-size
name: size
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
14 changes: 14 additions & 0 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const defaultOptions = {
defaultPeerValue: 1
}

// Identifiers passed to `metrics.updateComponentMetric(component, metric, value)`
// by the connection manager.
const METRICS_COMPONENT = 'connection-manager'
// Reported from `this.size`
const METRICS_ALL_CONNECTIONS = 'all-connections'
// Reported from `this.connections.size` (one map entry per peer id string)
const METRICS_PEER_CONNECTIONS = 'peer-connections'

/**
* @typedef {import('../')} Libp2p
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
Expand Down Expand Up @@ -160,6 +164,8 @@ class ConnectionManager extends EventEmitter {

await Promise.all(tasks)
this.connections.clear()
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, 0)
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, 0)
}

/**
Expand Down Expand Up @@ -211,10 +217,13 @@ class ConnectionManager extends EventEmitter {
const storedConn = this.connections.get(peerIdStr)

this.emit('peer:connect', connection)

if (storedConn) {
storedConn.push(connection)
} else {
this.connections.set(peerIdStr, [connection])
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
}

this._libp2p.peerStore.keyBook.set(peerId, peerId.pubKey)
Expand Down Expand Up @@ -243,7 +252,12 @@ class ConnectionManager extends EventEmitter {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)

this._libp2p.metrics && this._libp2p.metrics.onPeerDisconnected(connection.remotePeer)
}

this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PEER_CONNECTIONS, this.connections.size)
this._libp2p.metrics && this._libp2p.metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_ALL_CONNECTIONS, this.size)
}

/**
Expand Down
18 changes: 17 additions & 1 deletion src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const {
MAX_ADDRS_TO_DIAL
} = require('../constants')

// Identifiers passed to `metrics.updateComponentMetric(component, metric, value)`
// by the dialer.
const METRICS_COMPONENT = 'dialler'
// Reported from `this._pendingDials.size`
const METRICS_PENDING_DIALS = 'pending-dials'
// Reported from `this._pendingDialTargets.size`.
// The key was previously misspelled as 'pending-dials-targers'.
const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'

/**
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('peer-id')} PeerId
Expand All @@ -44,6 +48,7 @@ const {
* @property {number} [maxDialsPerPeer = MAX_PER_PEER_DIALS] - Number of max concurrent dials per peer.
* @property {number} [dialTimeout = DIAL_TIMEOUT] - How long a dial attempt is allowed to take.
* @property {Record<string, Resolver>} [resolvers = {}] - multiaddr resolvers to use when dialing
* @property {import('../metrics')} [metrics]
*
* @typedef DialTarget
* @property {string} id
Expand All @@ -69,7 +74,8 @@ class Dialer {
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
dialTimeout = DIAL_TIMEOUT,
maxDialsPerPeer = MAX_PER_PEER_DIALS,
resolvers = {}
resolvers = {},
metrics
}) {
this.transportManager = transportManager
this.peerStore = peerStore
Expand All @@ -81,6 +87,7 @@ class Dialer {
this.tokens = [...new Array(maxParallelDials)].map((_, index) => index)
this._pendingDials = new Map()
this._pendingDialTargets = new Map()
this._metrics = metrics

for (const [key, value] of Object.entries(resolvers)) {
Multiaddr.resolvers.set(key, value)
Expand All @@ -104,6 +111,9 @@ class Dialer {
pendingTarget.reject(new AbortError('Dialer was destroyed'))
}
this._pendingDialTargets.clear()

this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, 0)
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, 0)
}

/**
Expand Down Expand Up @@ -153,6 +163,7 @@ class Dialer {
const id = `${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
const cancellablePromise = new Promise((resolve, reject) => {
this._pendingDialTargets.set(id, { resolve, reject })
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
})

try {
Expand All @@ -164,6 +175,7 @@ class Dialer {
return dialTarget
} finally {
this._pendingDialTargets.delete(id)
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIAL_TARGETS, this._pendingDialTargets.size)
}
}

Expand Down Expand Up @@ -252,9 +264,13 @@ class Dialer {
destroy: () => {
timeoutController.clear()
this._pendingDials.delete(dialTarget.id)
this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)
}
}
this._pendingDials.set(dialTarget.id, pendingDial)

this._metrics && this._metrics.updateComponentMetric(METRICS_COMPONENT, METRICS_PENDING_DIALS, this._pendingDials.size)

return pendingDial
}

Expand Down
8 changes: 5 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ class Libp2p extends EventEmitter {

// Create Metrics
if (this._options.metrics.enabled) {
this.metrics = new Metrics({
...this._options.metrics,
connectionManager: this.connectionManager
const metrics = new Metrics({
...this._options.metrics
})

this.metrics = metrics
}

// Create keychain
Expand Down Expand Up @@ -262,6 +263,7 @@ class Libp2p extends EventEmitter {
this.dialer = new Dialer({
transportManager: this.transportManager,
peerStore: this.peerStore,
metrics: this.metrics,
...this._options.dialer
})

Expand Down
26 changes: 18 additions & 8 deletions src/metrics/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ const directionToEvent = {
*/

/**
* @typedef MetricsProperties
* @property {import('../connection-manager')} connectionManager
*
* @typedef MetricsOptions
* @property {number} [computeThrottleMaxQueueSize = defaultOptions.computeThrottleMaxQueueSize]
* @property {number} [computeThrottleTimeout = defaultOptions.computeThrottleTimeout]
Expand All @@ -37,7 +34,7 @@ const directionToEvent = {
class Metrics {
/**
* @class
* @param {MetricsProperties & MetricsOptions} options
* @param {MetricsOptions} options
*/
constructor (options) {
this._options = mergeOptions(defaultOptions, options)
Expand All @@ -47,10 +44,7 @@ class Metrics {
this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention)
this._running = false
this._onMessage = this._onMessage.bind(this)
this._connectionManager = options.connectionManager
this._connectionManager.on('peer:disconnect', (connection) => {
this.onPeerDisconnected(connection.remotePeer)
})
this._componentMetrics = new Map()
}

/**
Expand Down Expand Up @@ -94,6 +88,22 @@ class Metrics {
return Array.from(this._peerStats.keys())
}

/**
* @returns {Map}
*/
getComponentMetrics () {
return this._componentMetrics
}

updateComponentMetric (component, metric, value) {
if (!this._componentMetrics.has(component)) {
this._componentMetrics.set(component, new Map())
}

const map = this._componentMetrics.get(component)
map.set(metric, value)
}

/**
* Returns the `Stats` object for the given `PeerId` whether it
* is a live peer, or in the disconnected peer LRU cache.
Expand Down
36 changes: 23 additions & 13 deletions test/metrics/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')

const { EventEmitter } = require('events')

const { randomBytes } = require('libp2p-crypto')
const duplexPair = require('it-pair/duplex')
const pipe = require('it-pipe')
Expand Down Expand Up @@ -34,8 +31,7 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
movingAverageIntervals: [10, 100, 1000]
})

metrics.trackStream({
Expand Down Expand Up @@ -70,8 +66,7 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
movingAverageIntervals: [10, 100, 1000]
})

metrics.trackStream({
Expand Down Expand Up @@ -119,8 +114,7 @@ describe('Metrics', () => {
const [local2, remote2] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
movingAverageIntervals: [10, 100, 1000]
})
const protocol = '/echo/1.0.0'
metrics.start()
Expand Down Expand Up @@ -175,8 +169,7 @@ describe('Metrics', () => {
const [local, remote] = duplexPair()
const metrics = new Metrics({
computeThrottleMaxQueueSize: 1, // compute after every message
movingAverageIntervals: [10, 100, 1000],
connectionManager: new EventEmitter()
movingAverageIntervals: [10, 100, 1000]
})
metrics.start()

Expand Down Expand Up @@ -231,8 +224,7 @@ describe('Metrics', () => {
}))

const metrics = new Metrics({
maxOldPeersRetention: 5, // Only keep track of 5
connectionManager: new EventEmitter()
maxOldPeersRetention: 5 // Only keep track of 5
})

// Clone so trackedPeers isn't modified
Expand Down Expand Up @@ -262,4 +254,22 @@ describe('Metrics', () => {
expect(spy).to.have.property('callCount', 1)
}
})

it('should allow components to track metrics', () => {
  const metrics = new Metrics({
    maxOldPeersRetention: 5 // Only keep track of 5
  })

  // Nothing has been reported yet
  expect(metrics.getComponentMetrics()).to.be.empty()

  const componentName = 'my-component'
  const metricName = 'some-metric'
  const metricValue = 1

  metrics.updateComponentMetric(componentName, metricName, metricValue)

  // One component, holding one metric, holding the value we just set
  const byComponent = metrics.getComponentMetrics()
  const componentMetrics = byComponent.get(componentName)

  expect(byComponent).to.have.lengthOf(1)
  expect(componentMetrics).to.have.lengthOf(1)
  expect(componentMetrics.get(metricName)).to.equal(metricValue)
})
})

0 comments on commit 2f0b311

Please sign in to comment.