Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CON-152] Allow state machine jobs to record metrics #3309

Merged
merged 5 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class ServiceRegistry {

/**
* Truncates large JSON data in Bull Board after any of the following is exceeded:
* - 5 levels of nesting
* - 7 levels of nesting
* - 10,000 characters (strings only)
* - 100 elements (arrays) or 100 keys (objects)
*
Expand All @@ -197,12 +197,21 @@ class ServiceRegistry {
truncateBull(dataToTruncate, curDepth = 0) {
if (
typeof dataToTruncate === 'object' &&
!Array.isArray(dataToTruncate) &&
dataToTruncate !== null &&
dataToTruncate !== undefined
) {
if (curDepth < 5) {
if (curDepth < 7) {
const newDepth = curDepth + 1
if (Array.isArray(dataToTruncate)) {
if (dataToTruncate.length > 100) {
return `[Truncated array with ${dataToTruncate.length} elements]`
}
const truncatedArr = []
dataToTruncate.forEach((element) => {
truncatedArr.push(this.truncateBull(element, newDepth))
})
return truncatedArr
}
const json = Object.assign({}, dataToTruncate)
Object.entries(dataToTruncate).forEach(([key, value]) => {
switch (typeof value) {
Expand Down Expand Up @@ -233,9 +242,9 @@ class ServiceRegistry {
})
return json
}
return `[Truncated object with ${
Object.keys(dataToTruncate).length
} keys]`
return Array.isArray(dataToTruncate)
? `[Truncated array with ${dataToTruncate.length} elements]`
: `[Truncated object with ${Object.keys(dataToTruncate).length} keys]`
}
return dataToTruncate
}
Expand All @@ -262,7 +271,7 @@ class ServiceRegistry {
await this._initSnapbackSM()
this.stateMachineManager = new StateMachineManager()
const { stateMonitoringQueue, stateReconciliationQueue } =
await this.stateMachineManager.init(this.libs)
await this.stateMachineManager.init(this.libs, this.prometheusRegistry)

// SyncQueue construction (requires L1 identity)
// Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const promClient = require('prom-client')
const _ = require('lodash')
const config = require('../../config')
const { exponentialBucketsRange } = require('./prometheusUtils')

/**
* For explanation of Metrics, and instructions on how to add a new metric, please see `prometheusMonitoring/README.md`
Expand All @@ -21,12 +23,33 @@ const MetricTypes = Object.freeze({
let MetricNames = {
SYNC_QUEUE_JOBS_TOTAL_GAUGE: 'sync_queue_jobs_total',
ROUTE_POST_TRACKS_DURATION_SECONDS_HISTOGRAM:
'route_post_tracks_duration_seconds'
'route_post_tracks_duration_seconds',
ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM:
'issue_sync_request_monitoring_duration_seconds'
}
MetricNames = Object.freeze(
_.mapValues(MetricNames, (metricName) => NamespacePrefix + metricName)
)

const MetricLabels = Object.freeze({
[MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM]: {
// The reason another sync is needed
reason_for_additional_sync: [
'secondary_progressed_too_slow', // The secondary sync went through, but its clock value didn't increase enough
'secondary_failed_to_progress', // The secondary's clock value did not increase at all
'none' // No additional sync is required -- the first sync was successful
]
}
})
const MetricLabelNames = Object.freeze(
Object.fromEntries(
Object.entries(MetricLabels).map(([metric, metricLabels]) => [
metric,
Object.keys(metricLabels)
])
)
)

const Metrics = Object.freeze({
[MetricNames.SYNC_QUEUE_JOBS_TOTAL_GAUGE]: {
metricType: MetricTypes.GAUGE,
Expand All @@ -45,10 +68,28 @@ const Metrics = Object.freeze({
labelNames: ['code'],
buckets: [0.1, 0.3, 0.5, 1, 3, 5, 10] // 0.1 to 10 seconds
}
},
[MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM]: {
metricType: MetricTypes.HISTOGRAM,
metricConfig: {
name: MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM,
help: 'Seconds spent monitoring an outgoing sync request issued by this node to be completed (successfully or not)',
labelNames:
MetricLabelNames[
MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM
],
// 5 buckets in the range of 1 second to max seconds before timing out a sync request
buckets: exponentialBucketsRange(
theoilie marked this conversation as resolved.
Show resolved Hide resolved
1,
config.get('maxSyncMonitoringDurationInMs') / 1000,
SidSethi marked this conversation as resolved.
Show resolved Hide resolved
5
)
}
}
})

module.exports.NamespacePrefix = NamespacePrefix
module.exports.MetricTypes = MetricTypes
module.exports.MetricNames = MetricNames
module.exports.MetricLabels = MetricLabels
module.exports.Metrics = Metrics
44 changes: 44 additions & 0 deletions creator-node/src/services/prometheusMonitoring/prometheusUtils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const _ = require('lodash')

/**
* Creates 'count' buckets, where the lowest bucket is 'min' and the highest bucket is 'max'.
* The final +Inf bucket is not counted and not included in the returned array.
* The returned array is meant to be used for the buckets field of metricConfig.
* Throws if 'count' is 0 or negative, if 'min' is 0 or negative.
*
* Uses an implementation similar to the Golang client's ExponentialBucketsRange function:
* https://github.com/prometheus/client_golang/blob/v1.12.2/prometheus/histogram.go#L125
*
* See prom-client (JS) proposed implementation that was never completed:
* https://github.com/siimon/prom-client/issues/213
*
* @param {number} min the lowest value for a bucket
* @param {number} max the highest value for a bucket
* @param {number} count the number of buckets to generate for values between min and max
* @param {number} [precision] the number of decimal points to round each bucket to
* @returns 'count' buckets (number[] of length 'count') for values between 'min' and 'max'
*/
const exponentialBucketsRange = (min, max, count, precision = 0) => {
if (count < 1) {
throw new Error('exponentialBucketsRange count needs a positive count')
}
if (min <= 0) {
throw new Error('ExponentialBucketsRange min needs to be greater than 0')
}

// Formula for exponential buckets: max = min*growthFactor^(bucketCount-1)

// We know max/min and highest bucket. Solve for growthFactor
const growthFactor = (max / min) ** (1 / (count - 1))

// Now that we know growthFactor, solve for each bucket
const buckets = []
for (let i = 1; i <= count; i++) {
const bucket = min * growthFactor ** (i - 1)
buckets.push(_.round(bucket, precision))
}
return buckets
}

module.exports = {}
module.exports.exponentialBucketsRange = exponentialBucketsRange
16 changes: 9 additions & 7 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ const StateReconciliationManager = require('./stateReconciliation')
const NodeToSpIdManager = require('./CNodeToSpIdMapManager')
const { RECONFIG_MODES } = require('./stateMachineConstants')
const QueueInterfacer = require('./QueueInterfacer')
const makeCompletedJobEnqueueOtherJobs = require('./makeCompletedJobEnqueueOtherJobs')
const makeOnCompleteCallback = require('./makeOnCompleteCallback')

/**
* Manages the queue for monitoring the state of Content Nodes and
* the queue for reconciling anomalies in the state (syncs and replica set updates).
* Use QueueInterfacer for interfacing with the queues.
*/
class StateMachineManager {
async init(audiusLibs) {
async init(audiusLibs, prometheusRegistry) {
this.updateEnabledReconfigModesSet()

// TODO: Decide on interval to run this on
Expand All @@ -40,19 +40,21 @@ class StateMachineManager {
)
const stateReconciliationQueue = await stateReconciliationManager.init()

// Make jobs enqueue other jobs as necessary upon completion
// Upon completion, make jobs record metrics and enqueue other jobs as necessary
stateMonitoringQueue.on(
'global:completed',
makeCompletedJobEnqueueOtherJobs(
makeOnCompleteCallback(
stateMonitoringQueue,
stateReconciliationQueue
stateReconciliationQueue,
prometheusRegistry
).bind(this)
)
stateReconciliationQueue.on(
'global:completed',
makeCompletedJobEnqueueOtherJobs(
makeOnCompleteCallback(
stateMonitoringQueue,
stateReconciliationQueue
stateReconciliationQueue,
prometheusRegistry
).bind(this)
)

Expand Down

This file was deleted.

Loading