Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CON-152] Allow state machine jobs to record metrics #3309

Merged
merged 5 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const promClient = require('prom-client')
const _ = require('lodash')
const config = require('../../config')
const { exponentialBucketsRange } = require('./prometheusUtils')

/**
* For explanation of Metrics, and instructions on how to add a new metric, please see `prometheusMonitoring/README.md`
Expand All @@ -9,25 +10,6 @@ const config = require('../../config')
// We add a namespace prefix to differentiate internal metrics from those exported by different exporters from the same host
const NamespacePrefix = 'audius_cn_'

const MetricLabels = Object.freeze({
ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM: {
// The reason another sync is needed
reason_for_additional_sync: [
'secondary_progressed_too_slow', // The secondary sync went through, but its clock value didn't increase enough
'secondary_failed_to_progress', // The secondary's clock value did not increase at all
'none' // No additional sync is required -- the first sync was successful
]
}
})
const MetricLabelNames = Object.freeze(
Object.fromEntries(
Object.entries(MetricLabels).map(([metric, metricLabels]) => [
metric,
Object.keys(metricLabels)
])
)
)

/**
* @notice Counter and Summary metric types are currently disabled, see README for details.
*/
Expand All @@ -42,41 +24,31 @@ let MetricNames = {
SYNC_QUEUE_JOBS_TOTAL_GAUGE: 'sync_queue_jobs_total',
ROUTE_POST_TRACKS_DURATION_SECONDS_HISTOGRAM:
'route_post_tracks_duration_seconds',
ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM:
'issue_sync_seconds_waiting_for_completion_histogram'
ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM:
'issue_sync_request_monitoring_duration_seconds'
}
MetricNames = Object.freeze(
_.mapValues(MetricNames, (metricName) => NamespacePrefix + metricName)
)

// Uses an implementation similar to the Golang client's ExponentialBucketsRange function:
// https://github.com/prometheus/client_golang/blob/v1.12.2/prometheus/histogram.go#L125
// ExponentialBucketsRange creates 'count' buckets, where the lowest bucket is
// 'min' and the highest bucket is 'max'. The final +Inf bucket is not counted
// and not included in the returned array. The returned array is meant to be
// used for the buckets field of metricConfig.
//
// The function throws if 'count' is 0 or negative, if 'min' is 0 or negative.
const exponentialBucketsRange = (min, max, count) => {
if (count < 1) {
throw new Error('exponentialBucketsRange count needs a positive count')
}
if (min <= 0) {
throw new Error('ExponentialBucketsRange min needs to be greater than 0')
}

// Formula for exponential buckets: max = min*growthFactor^(bucketCount-1)

// We know max/min and highest bucket. Solve for growthFactor
const growthFactor = (max / min) ** (1 / (count - 1))

// Now that we know growthFactor, solve for each bucket
const buckets = []
for (let i = 1; i <= count; i++) {
buckets.push(min * growthFactor ** (i - 1))
const MetricLabels = Object.freeze({
[MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM]: {
// The reason another sync is needed
reason_for_additional_sync: [
'secondary_progressed_too_slow', // The secondary sync went through, but its clock value didn't increase enough
'secondary_failed_to_progress', // The secondary's clock value did not increase at all
'none' // No additional sync is required -- the first sync was successful
]
}
return buckets
}
})
const MetricLabelNames = Object.freeze(
Object.fromEntries(
Object.entries(MetricLabels).map(([metric, metricLabels]) => [
metric,
Object.keys(metricLabels)
])
)
)

const Metrics = Object.freeze({
[MetricNames.SYNC_QUEUE_JOBS_TOTAL_GAUGE]: {
Expand All @@ -97,13 +69,15 @@ const Metrics = Object.freeze({
buckets: [0.1, 0.3, 0.5, 1, 3, 5, 10] // 0.1 to 10 seconds
}
},
[MetricNames.ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM]: {
[MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM]: {
metricType: MetricTypes.HISTOGRAM,
metricConfig: {
name: MetricNames.ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM,
name: MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM,
help: 'Seconds spent monitoring an outgoing sync request issued by this node to be completed (successfully or not)',
labelNames:
MetricLabelNames.ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM,
MetricLabelNames[
MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM
],
// 5 buckets in the range of 1 second to max seconds before timing out a sync request
buckets: exponentialBucketsRange(
theoilie marked this conversation as resolved.
Show resolved Hide resolved
1,
Expand Down
44 changes: 44 additions & 0 deletions creator-node/src/services/prometheusMonitoring/prometheusUtils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const _ = require('lodash')

/**
* Creates 'count' buckets, where the lowest bucket is 'min' and the highest bucket is 'max'.
* The final +Inf bucket is not counted and not included in the returned array.
* The returned array is meant to be used for the buckets field of metricConfig.
* Throws if 'count' is 0 or negative, if 'min' is 0 or negative.
*
* Uses an implementation similar to the Golang client's ExponentialBucketsRange function:
* https://github.com/prometheus/client_golang/blob/v1.12.2/prometheus/histogram.go#L125
*
* See prom-client (JS) proposed implementation that was never completed:
* https://github.com/siimon/prom-client/issues/213
*
* @param {number} min the lowest value for a bucket
* @param {number} max the highest value for a bucket
* @param {number} count the number of buckets to generate for values between min and max
* @param {number} [precision] the number of decimal points to round each bucket to
* @returns 'count' buckets (number[] of length 'count') for values between 'min' and 'max'
*/
const exponentialBucketsRange = (min, max, count, precision = 0) => {
if (count < 1) {
throw new Error('exponentialBucketsRange count needs a positive count')
}
if (min <= 0) {
throw new Error('ExponentialBucketsRange min needs to be greater than 0')
}

// Formula for exponential buckets: max = min*growthFactor^(bucketCount-1)

// We know max/min and highest bucket. Solve for growthFactor
const growthFactor = (max / min) ** (1 / (count - 1))

// Now that we know growthFactor, solve for each bucket
const buckets = []
for (let i = 1; i <= count; i++) {
const bucket = min * growthFactor ** (i - 1)
buckets.push(_.round(bucket, precision))
}
return buckets
}

module.exports = {}
module.exports.exponentialBucketsRange = exponentialBucketsRange
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
const _ = require('lodash')

const { logger: baseLogger, createChildLogger } = require('../../logging')
const { QUEUE_NAMES, JOB_NAMES } = require('./stateMachineConstants')

Expand Down Expand Up @@ -92,12 +90,10 @@ module.exports = function (
// Record metrics
theoilie marked this conversation as resolved.
Show resolved Hide resolved
;(metricsToRecord || []).forEach((metricInfo) => {
try {
const { metricName, metricType, metricValue, metricLabel } = metricInfo
const { metricName, metricType, metricValue, metricLabels } = metricInfo
const metric = prometheusRegistry.getMetric(metricName)
if (metricType === 'HISTOGRAM') {
_.isEmpty(metricLabel)
? metric.observe(metricValue)
: metric.observe(metricLabel, metricValue)
metric.observe(metricLabels, metricValue)
} else if (metricType === 'GAUGE') {
metric.set(metricValue)
} else {
Expand Down
49 changes: 18 additions & 31 deletions creator-node/src/services/stateMachineManager/stateMachineUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,47 +134,34 @@ const retrieveClockValueForUserFromReplica = async (replica, wallet) => {
/**
* Returns an object that can be returned from any state machine job to record a histogram metric being observed.
* Example: to call histogram.observe('response_time', { code: '200' }, 1000), you would call this function with:
* makeHistogramToRecord('response_time', 1000, 'code', '200')
* @param {string} metricName the key of the name of the metric from prometheus.constants
* makeHistogramToRecord('response_time', 1000, { code: '200' })
* @param {string} metricName the name of the metric from prometheus.constants
* @param {number} metricValue the value to observe
* @param {string} [metricLabelName] the optional name of the label
* @param {string} [metricLabelValue] the optional value of the label
* @param {string} [metricLabels] the optional mapping of metric label name => metric label value
*/
const makeHistogramToRecord = (
metricName,
metricValue,
metricLabelName = '',
metricLabelValue = ''
) => {
if (!Object.keys(MetricNames).includes(metricName)) {
throw new Error(
`Invalid metricName: ${metricName}. Options: ${JSON.stringify(
Object.keys(MetricNames)
)}`
)
const makeHistogramToRecord = (metricName, metricValue, metricLabels = {}) => {
if (!Object.values(MetricNames).includes(metricName)) {
throw new Error(`Invalid metricName: ${metricName}`)
}
if (typeof metricValue !== 'number') {
throw new Error(`Invalid non-numerical metricValue: ${metricValue}`)
}
const useMetricLabel = metricLabelName && metricLabelValue
if (
useMetricLabel &&
!Object.keys(MetricLabels[metricName])?.includes(metricLabelName)
) {
throw new Error(`Invalid metricLabelName: ${metricLabelName}`)
}
if (
useMetricLabel &&
!MetricLabels[metricName][metricLabelName]?.includes(metricLabelValue)
) {
throw new Error(`Invalid metricLabelValue: ${metricLabelValue}`)
const labelNames = Object.keys(MetricLabels[metricName])
for (const [labelName, labelValue] of Object.entries(metricLabels)) {
if (!labelNames?.includes(labelName)) {
throw new Error(`Metric label has invliad name: ${labelName}`)
}
const labelValues = MetricLabels[metricName][labelName]
if (!labelValues?.includes(labelValue)) {
throw new Error(`Metric label has invalid value: ${labelValue}`)
}
}

const metric = {
metricName: MetricNames[metricName],
metricType: MetricTypes.HISTOGRAM,
metricName,
metricType: 'HISTOGRAM',
theoilie marked this conversation as resolved.
Show resolved Hide resolved
metricValue,
metricLabel: useMetricLabel ? { [metricLabelName]: [metricLabelValue] } : {}
metricLabels
}
return metric
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ const _ = require('lodash')
const config = require('../../../config')
const models = require('../../../models')
const Utils = require('../../../utils')
const {
MetricNames
} = require('../../prometheusMonitoring/prometheus.constants')
const {
retrieveClockValueForUserFromReplica,
makeHistogramToRecord
Expand Down Expand Up @@ -127,10 +130,9 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) {
)
metricsToRecord.push(
makeHistogramToRecord(
'ISSUE_SYNC_SECONDS_WAITING_FOR_COMPLETION_HISTOGRAM',
MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM,
(Date.now() - startWaitingForCompletion) / 1000, // Metric is in seconds
'reason_for_additional_sync',
reasonForAdditionalSync
{ reason_for_additional_sync: reasonForAdditionalSync }
)
)

Expand Down