Skip to content

Commit

Permalink
Misc state machine improvements / bug fixes (#3487)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Jul 19, 2022
1 parent 7f27e3d commit c3e886a
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ module.exports = function (
// Bull serializes the job result into redis, so we have to deserialize it into JSON
let jobResult = {}
try {
logger.info(`Job successfully completed. Parsing result: ${resultString}`)
jobResult = JSON.parse(resultString) || {}
} catch (e) {
logger.warn(`Failed to parse job result string: ${resultString}`)
logger.warn(`Failed to parse job result string`)
return
}

Expand All @@ -85,9 +86,7 @@ module.exports = function (
logger
)
} else {
logger.info(
`No jobs to enqueue after successful completion. Result: ${resultString}`
)
logger.info('No jobs to enqueue after successful completion.')
}

recordMetrics(prometheusRegistry, logger, metricsToRecord)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,33 +138,31 @@ class StateMonitoringManager {
}) {
// Add handlers for logging
monitoringQueue.on('global:waiting', (jobId) => {
monitoringQueueLogger.info(`Queue Job Waiting - ID ${jobId}`)
const logger = createChildLogger(monitoringQueueLogger, { jobId })
logger.info('Job active')
})
monitoringQueue.on('global:active', (jobId, jobPromise) => {
monitoringQueueLogger.info(`Queue Job Active - ID ${jobId}`)
const logger = createChildLogger(monitoringQueueLogger, { jobId })
logger.info('Job active')
})
monitoringQueue.on('global:lock-extension-failed', (jobId, err) => {
monitoringQueueLogger.error(
`Queue Job Lock Extension Failed - ID ${jobId} - Error ${err}`
)
const logger = createChildLogger(monitoringQueueLogger, { jobId })
logger.error(`Job lock extension failed. Error: ${err}`)
})
monitoringQueue.on('global:stalled', (jobId) => {
monitoringQueueLogger.error(`Queue Job Stalled - ID ${jobId}`)
const logger = createChildLogger(monitoringQueueLogger, { jobId })
logger.error('Job stalled')
})
monitoringQueue.on('global:error', (error) => {
monitoringQueueLogger.error(`Queue Job Error - ${error}`)
})

// Add handlers for when a job fails to complete (or completes with an error) or successfully completes
monitoringQueue.on('completed', (job, result) => {
monitoringQueueLogger.info(
`Queue Job Completed - ID ${job?.id} - Result ${JSON.stringify(result)}`
)
})
// Log when a job fails to complete and re-enqueue another monitoring job
monitoringQueue.on('failed', (job, err) => {
monitoringQueueLogger.error(
`Queue Job Failed - ID ${job?.id} - Error ${err}`
)
const logger = createChildLogger(monitoringQueueLogger, {
jobId: job?.id || 'unknown'
})
logger.error(`Job failed to complete. ID=${job?.id}. Error=${err}`)
if (job?.name === JOB_NAMES.MONITOR_STATE) {
monitorStateJobFailureCallback(monitoringQueue, job)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ module.exports = async function ({
// Make the next job try again instead of looping back to userId 0
users = [{ user_id: lastProcessedUserId }]

logger.error(e.stack)
_addToDecisionTree(decisionTree, 'getNodeUsers Error', logger, {
error: e.message
})
Expand All @@ -78,6 +79,7 @@ module.exports = async function ({
unhealthyPeers: Array.from(unhealthyPeers)
})
} catch (e) {
logger.error(e.stack)
_addToDecisionTree(
decisionTree,
'monitor-state job processor getUnhealthyPeers Error',
Expand Down Expand Up @@ -120,6 +122,7 @@ module.exports = async function ({
logger
)
} catch (e) {
logger.error(e.stack)
_addToDecisionTree(
decisionTree,
'retrieveUserInfoFromReplicaSet Error',
Expand All @@ -146,6 +149,7 @@ module.exports = async function ({
}
)
} catch (e) {
logger.error(e.stack)
_addToDecisionTree(
decisionTree,
'computeUserSecondarySyncSuccessRatesMap Error',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ const buildReplicaSetNodesToUserWalletsMap = (nodeUsers) => {
return replicaSetNodesToUserWalletsMap
}

const computeUserSecondarySyncSuccessRatesMap = async (nodeUsers) => {
// Map each nodeUser to truthy secondaries (ignore empty secondaries that result from incomplete replica sets)
const computeUserSecondarySyncSuccessRatesMap = async (users = []) => {
// Map each user to truthy secondaries (ignore empty secondaries that result from incomplete replica sets)
const walletsToSecondariesMapping = {}
for (const nodeUser of nodeUsers) {
const { wallet, secondary1, secondary2 } = nodeUser
for (const user of users) {
const { wallet, secondary1, secondary2 } = user
const secondaries = [secondary1, secondary2].filter(Boolean)
walletsToSecondariesMapping[wallet] = secondaries
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class StateReconciliationManager {
redisHost: config.get('redisHost'),
redisPort: config.get('redisPort'),
name: QUEUE_NAMES.STATE_RECONCILIATION,
removeOnComplete: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY,
removeOnFail: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY,
removeOnComplete: QUEUE_HISTORY.STATE_RECONCILIATION,
removeOnFail: QUEUE_HISTORY.STATE_RECONCILIATION,
lockDuration: STATE_RECONCILIATION_QUEUE_MAX_JOB_RUNTIME_MS
})

Expand Down Expand Up @@ -112,36 +112,40 @@ class StateReconciliationManager {
}) {
// Add handlers for logging
stateReconciliationQueue.on('global:waiting', (jobId) => {
reconciliationLogger.info(`Queue Job Waiting - ID ${jobId}`)
const logger = createChildLogger(reconciliationLogger, { jobId })
logger.info('Job waiting')
})
stateReconciliationQueue.on('global:active', (jobId, jobPromise) => {
reconciliationLogger.info(`Queue Job Active - ID ${jobId}`)
const logger = createChildLogger(reconciliationLogger, { jobId })
logger.info('Job active')
})
stateReconciliationQueue.on(
'global:lock-extension-failed',
(jobId, err) => {
reconciliationLogger.error(
`Queue Job Lock Extension Failed - ID ${jobId} - Error ${err}`
)
const logger = createChildLogger(stateReconciliationQueue, { jobId })
logger.error(`Job lock extension failed. Error: ${err}`)
}
)
stateReconciliationQueue.on('global:stalled', (jobId) => {
reconciliationLogger.error(`stateMachineQueue Job Stalled - ID ${jobId}`)
const logger = createChildLogger(stateReconciliationQueue, { jobId })
logger.error('Job stalled')
})
stateReconciliationQueue.on('global:error', (error) => {
reconciliationLogger.error(`Queue Job Error - ${error}`)
})

// Add handlers for when a job fails to complete (or completes with an error) or successfully completes
stateReconciliationQueue.on('completed', (job, result) => {
reconciliationLogger.info(
`Queue Job Completed - ID ${job?.id} - Result ${JSON.stringify(result)}`
)
})
// Log when a job fails to complete
stateReconciliationQueue.on('failed', (job, err) => {
reconciliationLogger.error(
`Queue Job Failed - ID ${job?.id} - Error ${err}`
)
const logger = createChildLogger(reconciliationLogger, {
jobId: job?.id || 'unknown'
})
logger.error(`Job failed to complete. ID=${job?.id}. Error=${err}`)
})
manualSyncQueue.on('failed', (job, err) => {
const logger = createChildLogger(manualSyncLogger, {
jobId: job?.id || 'unknown'
})
logger.error(`Job failed to complete. ID=${job?.id}. Error=${err}`)
})

// Register the logic that gets executed to process each new job from the queue
Expand Down
18 changes: 11 additions & 7 deletions creator-node/src/snapbackSM/secondarySyncHealthTracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ const Getters = {
* @returns {Object} { '0x...': { 'https://secondary1...': { 'successCount' : _, 'failureCount': _, 'successRate': _ }, ... } ... }
*/
async computeUsersSecondarySyncSuccessRatesForToday(
walletsToSecondariesMapping
walletsToSecondariesMapping = {}
) {
// Initialize sync success and failure counts for every secondary to 0
const secondarySyncMetricsMap = {}
Expand Down Expand Up @@ -187,12 +187,16 @@ const Getters = {

// For each secondary, compute and store successRate
for (const wallet of wallets) {
Object.keys(secondarySyncMetricsMap[wallet]).forEach((secondary) => {
const { successCount, failureCount } =
secondarySyncMetricsMap[wallet][secondary]
secondarySyncMetricsMap[wallet][secondary].successRate =
failureCount === 0 ? 1 : successCount / (successCount + failureCount)
})
Object.keys(secondarySyncMetricsMap[wallet] || {}).forEach(
(secondary) => {
const { successCount, failureCount } =
secondarySyncMetricsMap[wallet][secondary]
secondarySyncMetricsMap[wallet][secondary].successRate =
failureCount === 0
? 1
: successCount / (successCount + failureCount)
}
)
}

return secondarySyncMetricsMap
Expand Down

0 comments on commit c3e886a

Please sign in to comment.