Skip to content

Commit

Permalink
[CON-310] Redo issue_update_replica_set metric for granular statuses (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaylor89 authored and dmanjunath committed Aug 25, 2022
1 parent a8b0e1d commit ee36515
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Gauge, Histogram, Summary } from 'prom-client'
import { Gauge, Histogram } from 'prom-client'
import { snakeCase, mapValues } from 'lodash'
// eslint-disable-next-line import/no-unresolved
import { exponentialBucketsRange } from './prometheusUtils'
import {
QUEUE_NAMES as STATE_MACHINE_JOB_NAMES,
SyncType,
SYNC_MODES
SYNC_MODES,
UpdateReplicaSetJobResult
// eslint-disable-next-line import/no-unresolved
} from '../stateMachineManager/stateMachineConstants'
import * as config from '../../config'
Expand Down Expand Up @@ -118,7 +119,11 @@ export const METRIC_LABELS = Object.freeze({
'multiple_secondaries', // Both secondaries were replaced in the user's replica set
'primary_and_or_secondaries', // A secondary gets promoted to new primary and one or both secondaries get replaced with new random nodes
'null' // No change was made to the user's replica set because the job short-circuited before selecting or was unable to select new node(s)
]
],
// https://stackoverflow.com/questions/18111657/how-to-get-names-of-enum-entries
result: Object.values(UpdateReplicaSetJobResult).filter(
(value) => typeof value === 'string'
) as string[]
},

[METRIC_NAMES.FIND_SYNC_REQUEST_COUNTS_GAUGE]: {
Expand Down
19 changes: 1 addition & 18 deletions creator-node/src/services/stateMachineManager/processJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const _ = require('lodash')

const { createChildLogger } = require('../../logging')
const redis = require('../../redis')
const { QUEUE_NAMES } = require('./stateMachineConstants')

/**
* Higher order function to wrap a job processor with a logger and a try-catch.
Expand Down Expand Up @@ -50,7 +49,7 @@ module.exports = async function (
try {
await redis.set(`latestJobStart_${queueName}`, Date.now())
result = await jobProcessor({ logger: jobLogger, ...jobData })
metricEndTimerFn({ uncaughtError: false, ...getLabels(queueName, result) })
metricEndTimerFn({ uncaughtError: false })
await redis.set(`latestJobSuccess_${queueName}`, Date.now())
} catch (error: any) {
jobLogger.error(`Error processing job: ${error}`)
Expand All @@ -61,19 +60,3 @@ module.exports = async function (

return result
}

/**
 * Creates prometheus label names and values that are specific to the given job type and its results.
 *
 * Only update-replica-set jobs carry extra labels; every other job type gets an empty object.
 * Label values are always returned as strings so the same label never mixes boolean and string values.
 *
 * @param {string} jobName the name of the job to generate metrics for
 * @param {Object} jobResult the result of the job to generate metrics for (may be null/undefined if the job returned nothing)
 * @returns {Object} mapping of label name => label value; empty when the job type has no specific labels
 */
const getLabels = (jobName: string, jobResult: any) => {
  if (jobName !== QUEUE_NAMES.UPDATE_REPLICA_SET) {
    return {}
  }

  // A job may resolve without a result; fall back to default labels instead of throwing on destructure
  const { issuedReconfig, newReplicaSet } = jobResult ?? {}

  return {
    // Stringify so a truthy flag is reported as 'true' rather than the boolean true
    issuedReconfig: issuedReconfig ? String(issuedReconfig) : 'false',
    reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null')
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,17 @@ export const FETCH_FILES_HASH_NUM_RETRIES = 3

// Seconds to hold the cache of healthy content nodes for update-replica-set jobs
export const HEALTHY_SERVICES_TTL_SEC = 60 /* 1 min */

/**
 * Outcome of a single update-replica-set job.
 *
 * The string values are what gets emitted at runtime (e.g. as a metric label value),
 * so renaming a value changes the emitted series.
 */
export enum UpdateReplicaSetJobResult {
// Default outcome when no failure or skip condition was recorded
Success = 'success',
// The job completed but did not issue a reconfig because issuing was not enabled
Success = 'success',
SuccessIssueReconfigDisabled = 'success_issue_reconfig_disabled',
// Selecting/caching healthy nodes threw
FailureFindHealthyNodes = 'failure_find_healthy_nodes',
// The update was skipped (presumably because the reconfig had already occurred — confirm against the job processor)
SkipUpdateReplicaSet = 'skip_update_replica_set',
// NOTE(review): no usage is visible from this file — confirm where this is emitted
FailureNoHealthyNodes = 'failure_no_healthy_nodes',
// A node selected for the new replica set has no valid service provider ID
FailureNoValidSP = 'failure_no_valid_sp',
// The replica set update itself threw
FailureToUpdateReplicaSet = 'failure_to_update_replica_set',
// Issuing the update-replica-set operation threw before it could report its own result
FailureIssueUpdateReplicaSet = 'failure_issue_update_replica_set',
// Determining the new replica set threw
FailureDetermineNewReplicaSet = 'failure_determine_new_replica_set',
// The user's current replica set could not be fetched or was incomplete
FailureGetCurrentReplicaSet = 'failure_get_current_replica_set',
// Initializing audius libs threw
FailureInitAudiusLibs = 'failure_init_audius_libs'
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ const retrieveClockValueForUserFromReplica = async (replica, wallet) => {
* makeHistogramToRecord('response_time', 1000, { code: '200' })
* @param {string} metricName the name of the metric from prometheus.constants
* @param {number} metricValue the value to observe
* @param {string} [metricLabels] the optional mapping of metric label name => metric label value
* @param {Record<string, string>} [metricLabels] the optional mapping of metric label name => metric label value
*/
const makeHistogramToRecord = (metricName, metricValue, metricLabels = {}) => {
return makeMetricToRecord(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type Logger from 'bunyan'
import type { DecoratedJobParams, DecoratedJobReturnValue } from '../types'
import { METRIC_NAMES } from '../../../services/prometheusMonitoring/prometheus.constants'
import {
getCachedHealthyNodes,
cacheHealthyNodes
Expand All @@ -11,6 +12,8 @@ import type {
UpdateReplicaSetJobParams,
UpdateReplicaSetJobReturnValue
} from './types'
import { makeHistogramToRecord } from '../stateMachineUtils'
import { UpdateReplicaSetJobResult } from '../stateMachineConstants'

const _ = require('lodash')

Expand Down Expand Up @@ -68,6 +71,9 @@ const updateReplicaSetJobProcessor = async function ({
issueReconfig: false,
reconfigType: null
}
let result: UpdateReplicaSetJobResult

const startTimeMs = Date.now()

/**
* Fetch all the healthy nodes while disabling sync checks to select nodes for new replica set
Expand All @@ -87,26 +93,68 @@ const updateReplicaSetJobProcessor = async function ({
enableIdentity: true,
logger
})
} catch (e: any) {
result = UpdateReplicaSetJobResult.FailureInitAudiusLibs
errorMsg = `Error initting libs and auto-selecting creator nodes: ${e.message}: ${e.stack}`
logger.error(`ERROR ${errorMsg} - ${(e as Error).message}`)

return {
errorMsg,
issuedReconfig,
newReplicaSet,
healthyNodes,
metricsToRecord: [
makeHistogramToRecord(
METRIC_NAMES[
`STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM`
],
(Date.now() - startTimeMs) / 1000, // Metric is in seconds
{
issuedReconfig: issuedReconfig?.toString() || 'false',
reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null'),
result
}
)
]
}
}

try {
const { services: healthyServicesMap } =
await audiusLibs.ServiceProvider.autoSelectCreatorNodes({
performSyncCheck: false,
whitelist: reconfigNodeWhitelist,
log: true
})
healthyNodes = Object.keys(healthyServicesMap || {})
if (healthyNodes.length === 0)
if (healthyNodes.length === 0) {
throw new Error(
'Auto-selecting Content Nodes returned an empty list of healthy nodes.'
)
}
await cacheHealthyNodes(healthyNodes)
} catch (e: any) {
const errorMsg = `Error initting libs and auto-selecting creator nodes: ${e.message}: ${e.stack}`
logger.error(errorMsg)
result = UpdateReplicaSetJobResult.FailureFindHealthyNodes
errorMsg = `Error finding healthy nodes to select - ${e.message}: ${e.stack}`

return {
errorMsg,
issuedReconfig,
newReplicaSet,
healthyNodes
healthyNodes,
metricsToRecord: [
makeHistogramToRecord(
METRIC_NAMES[
`STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM`
],
(Date.now() - startTimeMs) / 1000, // Metric is in seconds
{
issuedReconfig: issuedReconfig?.toString() || 'false',
reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null'),
result
}
)
]
}
}
}
Expand All @@ -123,20 +171,32 @@ const updateReplicaSetJobProcessor = async function ({
replicaToUserInfoMap,
enabledReconfigModes
})
;({ errorMsg, issuedReconfig, syncJobsToEnqueue } =
await _issueUpdateReplicaSetOp(
userId,
wallet,
primary,
secondary1,
secondary2,
newReplicaSet,
audiusLibs,
logger
))

try {
;({ errorMsg, issuedReconfig, syncJobsToEnqueue, result } =
await _issueUpdateReplicaSetOp(
userId,
wallet,
primary,
secondary1,
secondary2,
newReplicaSet,
audiusLibs,
logger
))
} catch (e: any) {
result = UpdateReplicaSetJobResult.FailureIssueUpdateReplicaSet
logger.error(
`ERROR issuing update replica set op: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${
e.stack
}`
)
errorMsg = e.toString()
}
} catch (e: any) {
result = UpdateReplicaSetJobResult.FailureDetermineNewReplicaSet
logger.error(
`ERROR issuing update replica set op: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${
`ERROR determining new replica set: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${
e.stack
}`
)
Expand All @@ -148,6 +208,19 @@ const updateReplicaSetJobProcessor = async function ({
issuedReconfig,
newReplicaSet,
healthyNodes,
metricsToRecord: [
makeHistogramToRecord(
METRIC_NAMES[
`STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM`
],
(Date.now() - startTimeMs) / 1000, // Metric is in seconds
{
result: result || UpdateReplicaSetJobResult.Success,
issuedReconfig: issuedReconfig?.toString() || 'false',
reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null')
}
)
],
jobsToEnqueue: syncJobsToEnqueue?.length
? {
[QUEUE_NAMES.RECURRING_SYNC]: syncJobsToEnqueue
Expand Down Expand Up @@ -437,6 +510,7 @@ const _selectRandomReplicaSetNodes = async (
}

type IssueUpdateReplicaSetResult = {
result: UpdateReplicaSetJobResult
errorMsg: string
issuedReconfig: boolean
syncJobsToEnqueue: IssueSyncRequestJobParams[]
Expand Down Expand Up @@ -464,6 +538,7 @@ const _issueUpdateReplicaSetOp = async (
logger: Logger
): Promise<IssueUpdateReplicaSetResult> => {
const response: IssueUpdateReplicaSetResult = {
result: UpdateReplicaSetJobResult.Success,
errorMsg: '',
issuedReconfig: false,
syncJobsToEnqueue: []
Expand All @@ -490,13 +565,17 @@ const _issueUpdateReplicaSetOp = async (
)}`
)

if (!issueReconfig) return response
if (!issueReconfig) {
response.result = UpdateReplicaSetJobResult.SuccessIssueReconfigDisabled
return response
}

// Create new array of replica set spIds and write to URSM
for (const endpt of newReplicaSetEndpoints) {
// If for some reason any node in the new replica set is not registered on chain as a valid SP and is
// selected as part of the new replica set, do not issue reconfig
if (!ContentNodeInfoManager.getCNodeEndpointToSpIdMap()[endpt]) {
response.result = UpdateReplicaSetJobResult.FailureNoValidSP
response.errorMsg = `[_issueUpdateReplicaSetOp] userId=${userId} wallet=${wallet} unable to find valid SPs from new replica set=[${newReplicaSetEndpoints}] | new replica set spIds=[${newReplicaSetSPIds}] | reconfig type=[${reconfigType}] | endpointToSPIdMap=${JSON.stringify(
ContentNodeInfoManager.getCNodeEndpointToSpIdMap()
)} | endpt=${endpt}. Skipping reconfig.`
Expand Down Expand Up @@ -527,7 +606,7 @@ const _issueUpdateReplicaSetOp = async (
[secondary2]: oldSecondary2SpId
} = ContentNodeInfoManager.getCNodeEndpointToSpIdMap()

const { canReconfig, chainPrimarySpId, chainSecondarySpIds } =
const { canReconfig, chainPrimarySpId, chainSecondarySpIds, error } =
await _canReconfig({
libs: audiusLibs,
oldPrimarySpId,
Expand All @@ -537,7 +616,12 @@ const _issueUpdateReplicaSetOp = async (
logger
})

if (error) {
response.result = error
}

if (!canReconfig) {
response.result = UpdateReplicaSetJobResult.SkipUpdateReplicaSet
logger.info(
`[_issueUpdateReplicaSetOp] skipping _updateReplicaSet as reconfig already occurred for userId=${userId} wallet=${wallet}`
)
Expand Down Expand Up @@ -610,9 +694,10 @@ const _issueUpdateReplicaSetOp = async (
`[_issueUpdateReplicaSetOp] Reconfig SUCCESS: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | new replica set=[${newReplicaSetEndpoints}] | reconfig type=[${reconfigType}]`
)
} catch (e: any) {
response.result = UpdateReplicaSetJobResult.FailureToUpdateReplicaSet

response.errorMsg = `[_issueUpdateReplicaSetOp] Reconfig ERROR: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | new replica set=[${newReplicaSetEndpoints}] | Error: ${e.toString()}`
logger.error(response.errorMsg)
return response
}

return response
Expand All @@ -637,11 +722,14 @@ type CanReconfigParams = {
userId: number
logger: Logger
}

/**
 * Result of checking whether a reconfig may be issued for a user.
 */
type CanReconfigReturnValue = {
// Whether the caller should proceed with the reconfig
canReconfig: boolean
// Failure reason recorded while performing the check, if any; may be set even when canReconfig is true
error?: UpdateReplicaSetJobResult
// Primary SP ID as read from chain; not populated on every return path
chainPrimarySpId?: number
// Secondary SP IDs as read from chain; not populated on every return path
chainSecondarySpIds?: number[]
}

const _canReconfig = async ({
libs,
oldPrimarySpId,
Expand All @@ -650,6 +738,7 @@ const _canReconfig = async ({
userId,
logger
}: CanReconfigParams): Promise<CanReconfigReturnValue> => {
let error
try {
const { primaryId: chainPrimarySpId, secondaryIds: chainSecondarySpIds } =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(userId)
Expand All @@ -659,6 +748,7 @@ const _canReconfig = async ({
!chainSecondarySpIds ||
chainSecondarySpIds.length < 2
) {
error = UpdateReplicaSetJobResult.FailureGetCurrentReplicaSet
throw new Error(
`Could not get current replica set: chainPrimarySpId=${chainPrimarySpId} chainSecondarySpIds=${JSON.stringify(
chainSecondarySpIds || []
Expand All @@ -671,6 +761,7 @@ const _canReconfig = async ({
!oldPrimarySpId || !oldSecondary1SpId || !oldSecondary2SpId
if (isAnyNodeInReplicaSetDeregistered) {
return {
error,
canReconfig: true,
chainPrimarySpId,
chainSecondarySpIds
Expand All @@ -683,6 +774,7 @@ const _canReconfig = async ({
chainSecondarySpIds[0] === oldSecondary1SpId &&
chainSecondarySpIds[1] === oldSecondary2SpId
return {
error,
canReconfig: isReplicaSetCurrent
}
} catch (e: any) {
Expand All @@ -694,6 +786,7 @@ const _canReconfig = async ({
// If any error occurs in determining if a reconfig event can happen, default to issuing
// a reconfig event anyway just to prevent users from keeping an unhealthy replica set
return {
error,
canReconfig: true
}
}
Expand Down
Loading

0 comments on commit ee36515

Please sign in to comment.