diff --git a/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts b/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts index bf76bd488ed..578d3650839 100644 --- a/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts +++ b/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts @@ -1,11 +1,12 @@ -import { Gauge, Histogram, Summary } from 'prom-client' +import { Gauge, Histogram } from 'prom-client' import { snakeCase, mapValues } from 'lodash' // eslint-disable-next-line import/no-unresolved import { exponentialBucketsRange } from './prometheusUtils' import { QUEUE_NAMES as STATE_MACHINE_JOB_NAMES, SyncType, - SYNC_MODES + SYNC_MODES, + UpdateReplicaSetJobResult // eslint-disable-next-line import/no-unresolved } from '../stateMachineManager/stateMachineConstants' import * as config from '../../config' @@ -118,7 +119,11 @@ export const METRIC_LABELS = Object.freeze({ 'multiple_secondaries', // Both secondaries were replaced in the user's replica set 'primary_and_or_secondaries', // A secondary gets promoted to new primary and one or both secondaries get replaced with new random nodes 'null' // No change was made to the user's replica set because the job short-circuited before selecting or was unable to select new node(s) - ] + ], + // https://stackoverflow.com/questions/18111657/how-to-get-names-of-enum-entries + result: Object.values(UpdateReplicaSetJobResult).filter( + (value) => typeof value === 'string' + ) as string[] }, [METRIC_NAMES.FIND_SYNC_REQUEST_COUNTS_GAUGE]: { diff --git a/creator-node/src/services/stateMachineManager/processJob.ts b/creator-node/src/services/stateMachineManager/processJob.ts index cce128687dc..eb41d0953e2 100644 --- a/creator-node/src/services/stateMachineManager/processJob.ts +++ b/creator-node/src/services/stateMachineManager/processJob.ts @@ -9,7 +9,6 @@ const _ = require('lodash') const { createChildLogger } = require('../../logging') const redis = require('../../redis') -const { QUEUE_NAMES } = require('./stateMachineConstants') /** * Higher order function to wrap a job processor with a logger and a try-catch. @@ -50,7 +49,7 @@ module.exports = async function ( try { await redis.set(`latestJobStart_${queueName}`, Date.now()) result = await jobProcessor({ logger: jobLogger, ...jobData }) - metricEndTimerFn({ uncaughtError: false, ...getLabels(queueName, result) }) + metricEndTimerFn({ uncaughtError: false }) await redis.set(`latestJobSuccess_${queueName}`, Date.now()) } catch (error: any) { jobLogger.error(`Error processing job: ${error}`) @@ -61,19 +60,3 @@ module.exports = async function ( return result } - -/** - * Creates prometheus label names and values that are specific to the given job type and its results. - * @param {string} jobName the name of the job to generate metrics for - * @param {Object} jobResult the result of the job to generate metrics for - */ -const getLabels = (jobName: string, jobResult: any) => { - if (jobName === QUEUE_NAMES.UPDATE_REPLICA_SET) { - const { issuedReconfig, newReplicaSet } = jobResult - return { - issuedReconfig: issuedReconfig || 'false', - reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null') - } - } - return {} -} diff --git a/creator-node/src/services/stateMachineManager/stateMachineConstants.ts b/creator-node/src/services/stateMachineManager/stateMachineConstants.ts index 2833305924d..53025ef0d04 100644 --- a/creator-node/src/services/stateMachineManager/stateMachineConstants.ts +++ b/creator-node/src/services/stateMachineManager/stateMachineConstants.ts @@ -158,3 +158,17 @@ export const FETCH_FILES_HASH_NUM_RETRIES = 3 // Seconds to hold the cache of healthy content nodes for update-replica-set jobs export const HEALTHY_SERVICES_TTL_SEC = 60 /* 1 min */ + +export enum UpdateReplicaSetJobResult { + Success = 'success', + SuccessIssueReconfigDisabled = 'success_issue_reconfig_disabled', + FailureFindHealthyNodes = 'failure_find_healthy_nodes', + SkipUpdateReplicaSet = 'skip_update_replica_set', + FailureNoHealthyNodes = 'failure_no_healthy_nodes', + FailureNoValidSP = 'failure_no_valid_sp', + FailureToUpdateReplicaSet = 'failure_to_update_replica_set', + FailureIssueUpdateReplicaSet = 'failure_issue_update_replica_set', + FailureDetermineNewReplicaSet = 'failure_determine_new_replica_set', + FailureGetCurrentReplicaSet = 'failure_get_current_replica_set', + FailureInitAudiusLibs = 'failure_init_audius_libs' +} diff --git a/creator-node/src/services/stateMachineManager/stateMachineUtils.js b/creator-node/src/services/stateMachineManager/stateMachineUtils.js index b013254caf2..5f44746c8d7 100644 --- a/creator-node/src/services/stateMachineManager/stateMachineUtils.js +++ b/creator-node/src/services/stateMachineManager/stateMachineUtils.js @@ -147,7 +147,7 @@ const retrieveClockValueForUserFromReplica = async (replica, wallet) => { * makeHistogramToRecord('response_time', 1000, { code: '200' }) * @param {string} metricName the name of the metric from prometheus.constants * @param {number} metricValue the value to observe - * @param {string} [metricLabels] the optional mapping of metric label name => metric label value + * @param {Record} [metricLabels] the optional mapping of metric label name => metric label value */ const makeHistogramToRecord = (metricName, metricValue, metricLabels = {}) => { return makeMetricToRecord( diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts index 5e5a02551f5..1c62c8f575a 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts @@ -1,5 +1,6 @@ import type Logger from 'bunyan' import type { DecoratedJobParams, DecoratedJobReturnValue } from '../types' +import { METRIC_NAMES } from '../../../services/prometheusMonitoring/prometheus.constants' import { getCachedHealthyNodes, cacheHealthyNodes @@ -11,6 +12,8 @@ import type { UpdateReplicaSetJobParams, UpdateReplicaSetJobReturnValue } from './types' +import { makeHistogramToRecord } from '../stateMachineUtils' +import { UpdateReplicaSetJobResult } from '../stateMachineConstants' const _ = require('lodash') @@ -68,6 +71,9 @@ const updateReplicaSetJobProcessor = async function ({ issueReconfig: false, reconfigType: null } + let result: UpdateReplicaSetJobResult + + const startTimeMs = Date.now() /** * Fetch all the healthy nodes while disabling sync checks to select nodes for new replica set @@ -87,6 +93,33 @@ const updateReplicaSetJobProcessor = async function ({ enableIdentity: true, logger }) + } catch (e: any) { + result = UpdateReplicaSetJobResult.FailureInitAudiusLibs + errorMsg = `Error initting libs and auto-selecting creator nodes: ${e.message}: ${e.stack}` + logger.error(`ERROR ${errorMsg} - ${(e as Error).message}`) + + return { + errorMsg, + issuedReconfig, + newReplicaSet, + healthyNodes, + metricsToRecord: [ + makeHistogramToRecord( + METRIC_NAMES[ + `STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM` + ], + (Date.now() - startTimeMs) / 1000, // Metric is in seconds + { + issuedReconfig: issuedReconfig?.toString() || 'false', + reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null'), + result + } + ) + ] + } + } + + try { const { services: healthyServicesMap } = await audiusLibs.ServiceProvider.autoSelectCreatorNodes({ performSyncCheck: false, @@ -94,19 +127,34 @@ const updateReplicaSetJobProcessor = async function ({ log: true }) healthyNodes = Object.keys(healthyServicesMap || {}) - if (healthyNodes.length === 0) + if (healthyNodes.length === 0) { throw new Error( 'Auto-selecting Content Nodes returned an empty list of healthy nodes.' ) + } await cacheHealthyNodes(healthyNodes) } catch (e: any) { - const errorMsg = `Error initting libs and auto-selecting creator nodes: ${e.message}: ${e.stack}` - logger.error(errorMsg) + result = UpdateReplicaSetJobResult.FailureFindHealthyNodes + errorMsg = `Error finding healthy nodes to select - ${e.message}: ${e.stack}` + return { errorMsg, issuedReconfig, newReplicaSet, - healthyNodes + healthyNodes, + metricsToRecord: [ + makeHistogramToRecord( + METRIC_NAMES[ + `STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM` + ], + (Date.now() - startTimeMs) / 1000, // Metric is in seconds + { + issuedReconfig: issuedReconfig?.toString() || 'false', + reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null'), + result + } + ) + ] } } } @@ -123,20 +171,32 @@ const updateReplicaSetJobProcessor = async function ({ replicaToUserInfoMap, enabledReconfigModes }) - ;({ errorMsg, issuedReconfig, syncJobsToEnqueue } = - await _issueUpdateReplicaSetOp( - userId, - wallet, - primary, - secondary1, - secondary2, - newReplicaSet, - audiusLibs, - logger - )) + + try { + ;({ errorMsg, issuedReconfig, syncJobsToEnqueue, result } = + await _issueUpdateReplicaSetOp( + userId, + wallet, + primary, + secondary1, + secondary2, + newReplicaSet, + audiusLibs, + logger + )) + } catch (e: any) { + result = UpdateReplicaSetJobResult.FailureIssueUpdateReplicaSet + logger.error( + `ERROR issuing update replica set op: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${ + e.stack + }` + ) + errorMsg = e.toString() + } } catch (e: any) { + result = UpdateReplicaSetJobResult.FailureDetermineNewReplicaSet logger.error( - `ERROR issuing update replica set op: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${ + `ERROR determining new replica set: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | Error: ${e.toString()}: ${ e.stack }` ) @@ -148,6 +208,19 @@ const updateReplicaSetJobProcessor = async function ({ issuedReconfig, newReplicaSet, healthyNodes, + metricsToRecord: [ + makeHistogramToRecord( + METRIC_NAMES[ + `STATE_MACHINE_${QUEUE_NAMES.UPDATE_REPLICA_SET}_JOB_DURATION_SECONDS_HISTOGRAM` + ], + (Date.now() - startTimeMs) / 1000, // Metric is in seconds + { + result: result || UpdateReplicaSetJobResult.Success, + issuedReconfig: issuedReconfig?.toString() || 'false', + reconfigType: _.snakeCase(newReplicaSet?.reconfigType || 'null') + } + ) + ], jobsToEnqueue: syncJobsToEnqueue?.length ? { [QUEUE_NAMES.RECURRING_SYNC]: syncJobsToEnqueue @@ -437,6 +510,7 @@ const _selectRandomReplicaSetNodes = async ( } type IssueUpdateReplicaSetResult = { + result: UpdateReplicaSetJobResult errorMsg: string issuedReconfig: boolean syncJobsToEnqueue: IssueSyncRequestJobParams[] @@ -464,6 +538,7 @@ const _issueUpdateReplicaSetOp = async ( logger: Logger ): Promise => { const response: IssueUpdateReplicaSetResult = { + result: UpdateReplicaSetJobResult.Success, errorMsg: '', issuedReconfig: false, syncJobsToEnqueue: [] @@ -490,13 +565,17 @@ const _issueUpdateReplicaSetOp = async ( )}` ) - if (!issueReconfig) return response + if (!issueReconfig) { + response.result = UpdateReplicaSetJobResult.SuccessIssueReconfigDisabled + return response + } // Create new array of replica set spIds and write to URSM for (const endpt of newReplicaSetEndpoints) { // If for some reason any node in the new replica set is not registered on chain as a valid SP and is // selected as part of the new replica set, do not issue reconfig if (!ContentNodeInfoManager.getCNodeEndpointToSpIdMap()[endpt]) { + response.result = UpdateReplicaSetJobResult.FailureNoValidSP response.errorMsg = `[_issueUpdateReplicaSetOp] userId=${userId} wallet=${wallet} unable to find valid SPs from new replica set=[${newReplicaSetEndpoints}] | new replica set spIds=[${newReplicaSetSPIds}] | reconfig type=[${reconfigType}] | endpointToSPIdMap=${JSON.stringify( ContentNodeInfoManager.getCNodeEndpointToSpIdMap() )} | endpt=${endpt}. Skipping reconfig.` @@ -527,7 +606,7 @@ const _issueUpdateReplicaSetOp = async ( [secondary2]: oldSecondary2SpId } = ContentNodeInfoManager.getCNodeEndpointToSpIdMap() - const { canReconfig, chainPrimarySpId, chainSecondarySpIds } = + const { canReconfig, chainPrimarySpId, chainSecondarySpIds, error } = await _canReconfig({ libs: audiusLibs, oldPrimarySpId, @@ -537,7 +616,12 @@ const _issueUpdateReplicaSetOp = async ( logger }) + if (error) { + response.result = error + } + if (!canReconfig) { + response.result = UpdateReplicaSetJobResult.SkipUpdateReplicaSet logger.info( `[_issueUpdateReplicaSetOp] skipping _updateReplicaSet as reconfig already occurred for userId=${userId} wallet=${wallet}` ) @@ -610,9 +694,10 @@ const _issueUpdateReplicaSetOp = async ( `[_issueUpdateReplicaSetOp] Reconfig SUCCESS: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | new replica set=[${newReplicaSetEndpoints}] | reconfig type=[${reconfigType}]` ) } catch (e: any) { + response.result = UpdateReplicaSetJobResult.FailureToUpdateReplicaSet + response.errorMsg = `[_issueUpdateReplicaSetOp] Reconfig ERROR: userId=${userId} wallet=${wallet} old replica set=[${primary},${secondary1},${secondary2}] | new replica set=[${newReplicaSetEndpoints}] | Error: ${e.toString()}` logger.error(response.errorMsg) - return response } return response @@ -637,11 +722,14 @@ type CanReconfigParams = { userId: number logger: Logger } + type CanReconfigReturnValue = { canReconfig: boolean + error?: UpdateReplicaSetJobResult chainPrimarySpId?: number chainSecondarySpIds?: number[] } + const _canReconfig = async ({ libs, oldPrimarySpId, @@ -650,6 +738,7 @@ const _canReconfig = async ({ userId, logger }: CanReconfigParams): Promise => { + let error try { const { primaryId: chainPrimarySpId, secondaryIds: chainSecondarySpIds } = await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(userId) @@ -659,6 +748,7 @@ const _canReconfig = async ({ !chainSecondarySpIds || chainSecondarySpIds.length < 2 ) { + error = UpdateReplicaSetJobResult.FailureGetCurrentReplicaSet throw new Error( `Could not get current replica set: chainPrimarySpId=${chainPrimarySpId} chainSecondarySpIds=${JSON.stringify( chainSecondarySpIds || [] @@ -671,6 +761,7 @@ const _canReconfig = async ({ !oldPrimarySpId || !oldSecondary1SpId || !oldSecondary2SpId if (isAnyNodeInReplicaSetDeregistered) { return { + error, canReconfig: true, chainPrimarySpId, chainSecondarySpIds @@ -683,6 +774,7 @@ const _canReconfig = async ({ chainSecondarySpIds[0] === oldSecondary1SpId && chainSecondarySpIds[1] === oldSecondary2SpId return { + error, canReconfig: isReplicaSetCurrent } } catch (e: any) { @@ -694,6 +786,7 @@ const _canReconfig = async ({ // If any error occurs in determining if a reconfig event can happen, default to issuing // a reconfig event anyway just to prevent users from keeping an unhealthy replica set return { + error, canReconfig: true } } diff --git a/creator-node/test/updateReplicaSet.jobProcessor.test.js b/creator-node/test/updateReplicaSet.jobProcessor.test.js index 65bb341ff16..7f1f03ede26 100644 --- a/creator-node/test/updateReplicaSet.jobProcessor.test.js +++ b/creator-node/test/updateReplicaSet.jobProcessor.test.js @@ -144,20 +144,28 @@ describe('test updateReplicaSet job processor', function () { retrieveClockValueForUserFromReplicaStub }) + const output = await updateReplicaSetJobProcessor({ + logger, + wallet, + userId, + primary, + secondary1, + secondary2, + unhealthyReplicas, + replicaToUserInfoMap, + enabledReconfigModes: [RECONFIG_MODES.ONE_SECONDARY.key] + }) + + const { metricsToRecord, ...rest } = output + + expect(metricsToRecord[0].metricLabels.result).to.equal('success') + expect(metricsToRecord[0].metricName).to.equal( + 'audius_cn_state_machine_update_replica_set_queue_job_duration_seconds' + ) + expect(metricsToRecord[0].metricType).to.equal('HISTOGRAM_OBSERVE') + // Verify job outputs the correct results: update secondary1 to randomHealthyNode - return expect( - updateReplicaSetJobProcessor({ - logger, - wallet, - userId, - primary, - secondary1, - secondary2, - unhealthyReplicas, - replicaToUserInfoMap, - enabledReconfigModes: [RECONFIG_MODES.ONE_SECONDARY.key] - }) - ).to.eventually.be.fulfilled.and.deep.equal({ + expect(rest).to.be.deep.equal({ errorMsg: '', issuedReconfig: true, newReplicaSet: { @@ -218,20 +226,30 @@ describe('test updateReplicaSet job processor', function () { retrieveClockValueForUserFromReplicaStub }) + const output = await updateReplicaSetJobProcessor({ + logger, + wallet, + userId, + primary, + secondary1, + secondary2, + unhealthyReplicas, + replicaToUserInfoMap, + enabledReconfigModes: [RECONFIG_MODES.RECONFIG_DISABLED.key] // Disable reconfigs + }) + + const { metricsToRecord, ...rest } = output + + expect(metricsToRecord[0].metricLabels.result).to.equal( + 'success_issue_reconfig_disabled' + ) + expect(metricsToRecord[0].metricName).to.equal( + 'audius_cn_state_machine_update_replica_set_queue_job_duration_seconds' + ) + expect(metricsToRecord[0].metricType).to.equal('HISTOGRAM_OBSERVE') + // Verify job outputs the correct results: find update from secondary1 to randomHealthyNode but don't issue it - return expect( - updateReplicaSetJobProcessor({ - logger, - wallet, - userId, - primary, - secondary1, - secondary2, - unhealthyReplicas, - replicaToUserInfoMap, - enabledReconfigModes: [RECONFIG_MODES.RECONFIG_DISABLED.key] // Disable reconfigs - }) - ).to.eventually.be.fulfilled.and.deep.equal({ + expect(rest).to.be.deep.equal({ errorMsg: '', issuedReconfig: false, newReplicaSet: { @@ -275,20 +293,30 @@ describe('test updateReplicaSet job processor', function () { retrieveClockValueForUserFromReplicaStub }) + const output = await updateReplicaSetJobProcessor({ + logger, + wallet, + userId, + primary, + secondary1, + secondary2, + unhealthyReplicas, + replicaToUserInfoMap, + enabledReconfigModes: [RECONFIG_MODES.ENTIRE_REPLICA_SET.key] + }) + + const { metricsToRecord, ...rest } = output + + expect(metricsToRecord[0].metricLabels.result).to.equal( + 'failure_determine_new_replica_set' + ) + expect(metricsToRecord[0].metricName).to.equal( + 'audius_cn_state_machine_update_replica_set_queue_job_duration_seconds' + ) + expect(metricsToRecord[0].metricType).to.equal('HISTOGRAM_OBSERVE') + // Verify job outputs the correct results: entire replica set is falsy because we can't sync if all nodes in the RS are unhealthy - return expect( - updateReplicaSetJobProcessor({ - logger, - wallet, - userId, - primary, - secondary1, - secondary2, - unhealthyReplicas, - replicaToUserInfoMap, - enabledReconfigModes: [RECONFIG_MODES.ENTIRE_REPLICA_SET.key] - }) - ).to.eventually.be.fulfilled.and.deep.equal({ + expect(rest).to.be.deep.equal({ errorMsg: `Error: [_selectRandomReplicaSetNodes] wallet=${wallet} healthyReplicaSet=[] numberOfUnhealthyReplicas=3 healthyNodes=${primary},${secondary2},${fourthHealthyNode} || Not enough healthy nodes found to issue new replica set after 100 attempts`, issuedReconfig: false, newReplicaSet: {