Skip to content

Commit

Permalink
Add prometheus metrics for secondarySyncFromPrimary (#3532)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaylor89 authored Jul 22, 2022
1 parent 8dfd837 commit 0bec2b4
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 82 deletions.
19 changes: 10 additions & 9 deletions creator-node/src/components/replicaSet/replicaSetController.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ const syncRouteController = async (req, res) => {
* Else, debounce + add sync to queue
*/
if (immediate) {
const errorObj = await secondarySyncFromPrimary(
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber,
forceResync
)
if (errorObj) {
return errorResponseServerError(errorObj)
try {
await secondarySyncFromPrimary(
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber,
forceResync
)
} catch (e) {
return errorResponseServerError(e)
}
} else {
const debounceTime = nodeConfig.get('debounceTime')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ const metricNames = {
ISSUE_SYNC_REQUEST_DURATION_SECONDS_HISTOGRAM:
'issue_sync_request_duration_seconds',
FIND_SYNC_REQUEST_COUNTS_GAUGE: 'find_sync_request_counts',
WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM: 'write_quorum_duration_seconds'
WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM: 'write_quorum_duration_seconds',
SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM:
'secondary_sync_from_primary_duration_seconds'
}
// Add a histogram for each job in the state machine queues.
// Some have custom labels below, and all of them use the label: uncaughtError=true/false
Expand All @@ -52,6 +54,24 @@ const METRIC_NAMES = Object.freeze(
)

const METRIC_LABELS = Object.freeze({
[METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM]: {
sync_type: Object.values(SyncType).map(_.snakeCase),
sync_mode: Object.values(SYNC_MODES).map(_.snakeCase),
result: [
'success',
'failure_sync_secondary_from_primary',
'failure_malformed_export',
'failure_db_transaction',
'failure_sync_in_progress',
'failure_export_wallet',
'failure_skip_threshold_not_reached',
'failure_import_not_consistent',
'failure_import_not_contiguous',
'failure_inconsistent_clock'
],
    // 10 exponential buckets in the range of 0.1 seconds to 60 seconds
buckets: exponentialBucketsRange(0.1, 60, 10)
},
[METRIC_NAMES.ISSUE_SYNC_REQUEST_DURATION_SECONDS_HISTOGRAM]: {
sync_type: Object.values(SyncType).map(_.snakeCase),
sync_mode: Object.values(SYNC_MODES).map(_.snakeCase),
Expand Down Expand Up @@ -135,6 +155,17 @@ const METRIC_LABEL_NAMES = Object.freeze(
)

const METRICS = Object.freeze({
[METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM]: {
metricType: METRIC_TYPES.HISTOGRAM,
metricConfig: {
name: METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM,
help: 'Time spent to sync a secondary from a primary (seconds)',
labelNames:
MetricLabelNames[
METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM
]
}
},
[METRIC_NAMES.SYNC_QUEUE_JOBS_TOTAL_GAUGE]: {
metricType: METRIC_TYPES.GAUGE,
metricConfig: {
Expand Down
203 changes: 135 additions & 68 deletions creator-node/src/services/sync/secondarySyncFromPrimary.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const _ = require('lodash')
const axios = require('axios')

const { logger } = require('../../logging')
Expand All @@ -8,27 +9,14 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator')
const DBManager = require('../../dbManager')
const UserSyncFailureCountManager = require('./UserSyncFailureCountManager')

/**
* This function is only run on secondaries, to export and sync data from a user's primary.
*
* @notice - By design, will reject any syncs with non-contiguous clock values. For now,
* any data corruption from primary needs to be handled separately and should not be replicated.
*
* @notice - There is a maxExportClockValueRange enforced in export, meaning that some syncs will
* only replicate partial data state. This is by design, and Snapback will trigger repeated syncs
* with progressively increasing clock values until secondaries have completely synced up.
* Secondaries have no knowledge of the current data state on primary, they simply replicate
* what they receive in each export.
*/
module.exports = async function (
const handleSyncFromPrimary = async (
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber = null,
forceResync = false
) {
) => {
const { nodeConfig, redis, libs } = serviceRegistry

const FileSaveMaxConcurrency = nodeConfig.get(
'nodeSyncFileSaveMaxConcurrency'
)
Expand All @@ -40,9 +28,6 @@ module.exports = async function (

logger.info('begin nodesync for ', walletPublicKeys, 'time', start)

// object to track if the function errored, returned at the end of the function
let errorObj = null

/**
* Ensure access to each wallet, then acquire redis lock for duration of sync
* @notice - there's a bug where logPrefix is set to the last element of `walletPublicKeys` - this code only works when `walletPublicKeys.length === 1` 🤦‍♂️
Expand All @@ -56,10 +41,12 @@ module.exports = async function (
redis.WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary
)
} catch (e) {
errorObj = new Error(
`Cannot change state of wallet ${wallet}. Node sync currently in progress.`
)
return errorObj
return {
error: new Error(
`Cannot change state of wallet ${wallet}. Node sync currently in progress.`
),
result: 'failure_sync_in_progress'
}
}
}

Expand Down Expand Up @@ -115,19 +102,30 @@ module.exports = async function (
`Failed to retrieve export from ${creatorNodeEndpoint} for wallets`,
walletPublicKeys
)
throw new Error(resp.data.error)
return {
error: new Error(resp.data.error),
result: 'failure_export_wallet'
}
}

// TODO - explain patch
if (!resp.data) {
if (resp.request && resp.request.responseText) {
resp.data = JSON.parse(resp.request.responseText)
} else throw new Error(`Malformed response from ${creatorNodeEndpoint}.`)
} else {
return {
error: new Error(`Malformed response from ${creatorNodeEndpoint}.`),
result: 'failure_malformed_export'
}
}
}

const { data: body } = resp
if (!body.data.hasOwnProperty('cnodeUsers')) {
throw new Error(`Malformed response from ${creatorNodeEndpoint}.`)
return {
error: new Error(`Malformed response from ${creatorNodeEndpoint}.`),
result: 'failure_malformed_export'
}
}

logger.info(
Expand All @@ -145,9 +143,12 @@ module.exports = async function (
// Since different nodes may assign different cnodeUserUUIDs to a given walletPublicKey,
// retrieve local cnodeUserUUID from fetched walletPublicKey and delete all associated data.
if (!fetchedCNodeUser.hasOwnProperty('walletPublicKey')) {
throw new Error(
`Malformed response received from ${creatorNodeEndpoint}. "walletPublicKey" property not found on CNodeUser in response object`
)
return {
error: new Error(
`Malformed response received from ${creatorNodeEndpoint}. "walletPublicKey" property not found on CNodeUser in response object`
),
result: 'failure_malformed_export'
}
}
const fetchedWalletPublicKey = fetchedCNodeUser.walletPublicKey

Expand Down Expand Up @@ -181,9 +182,12 @@ module.exports = async function (
}

if (!walletPublicKeys.includes(fetchedWalletPublicKey)) {
throw new Error(
`Malformed response from ${creatorNodeEndpoint}. Returned data for walletPublicKey that was not requested.`
)
return {
error: new Error(
`Malformed response from ${creatorNodeEndpoint}. Returned data for walletPublicKey that was not requested.`
),
result: 'failure_malformed_export'
}
}

/**
Expand All @@ -203,9 +207,12 @@ module.exports = async function (

// Error if returned data is not within requested range
if (fetchedLatestClockVal < localMaxClockVal) {
throw new Error(
`Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}`
)
return {
error: new Error(
`Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}`
),
result: 'failure_inconsistent_clock'
}
} else if (fetchedLatestClockVal === localMaxClockVal) {
// Already up to date, no sync necessary
logger.info(
Expand All @@ -218,13 +225,22 @@ module.exports = async function (
fetchedClockRecords[0] &&
fetchedClockRecords[0].clock !== localMaxClockVal + 1
) {
throw new Error(
`Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}`
)
} else if (maxClockRecordId !== fetchedLatestClockVal) {
throw new Error(
`Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}`
)
return {
error: new Error(
`Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}`
),
result: 'failure_import_not_contiguous'
}
} else if (
!_.isEmpty(fetchedCNodeUser.clockRecords) &&
maxClockRecordId !== fetchedLatestClockVal
) {
return {
error: new Error(
`Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}`
),
result: 'failure_import_not_consistent'
}
}

// All DB updates must happen in single atomic tx - partial state updates will lead to data loss
Expand Down Expand Up @@ -282,9 +298,12 @@ module.exports = async function (

// Error if update failed
if (numRowsUpdated !== 1 || respObj.length !== 1) {
throw new Error(
`Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}`
)
return {
error: new Error(
`Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}`
),
result: 'failure_db_transaction'
}
}
cnodeUser = respObj[0]
} else {
Expand Down Expand Up @@ -441,7 +460,10 @@ module.exports = async function (
if (userSyncFailureCount < SyncRequestMaxUserFailureCountBeforeSkip) {
const errorMsg = `User Sync failed due to ${numCIDsThatFailedSaveFileOp} failing saveFileForMultihashToFS op. userSyncFailureCount = ${userSyncFailureCount} // SyncRequestMaxUserFailureCountBeforeSkip = ${SyncRequestMaxUserFailureCountBeforeSkip}`
logger.error(logPrefix, errorMsg)
throw new Error(errorMsg)
return {
error: new Error(errorMsg),
result: 'failure_skip_threshold_not_reached'
}

// If max failure threshold reached, continue with sync and reset failure count
} else {
Expand Down Expand Up @@ -538,15 +560,29 @@ module.exports = async function (

await transaction.rollback()

throw new Error(e)
return {
error: new Error(e),
result: 'failure_db_transaction'
}
}
}
} catch (e) {
errorObj = e

for (const wallet of walletPublicKeys) {
await SyncHistoryAggregator.recordSyncFail(wallet)
}
logger.error(
logPrefix,
`Sync complete for wallets: ${walletPublicKeys.join(
','
)}. Status: Error, message: ${e.message}. Duration sync: ${
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}.`
)

return {
error: new Error(e),
result: 'failure_sync_secondary_from_primary'
}
} finally {
// Release all redis locks
for (const wallet of walletPublicKeys) {
Expand All @@ -559,27 +595,58 @@ module.exports = async function (
)
}
}
}

if (errorObj) {
logger.error(
logPrefix,
`Sync complete for wallets: ${walletPublicKeys.join(
','
)}. Status: Error, message: ${errorObj.message}. Duration sync: ${
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}.`
)
} else {
logger.info(
logPrefix,
`Sync complete for wallets: ${walletPublicKeys.join(
','
)}. Status: Success. Duration sync: ${
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}.`
)
}
logger.info(
logPrefix,
`Sync complete for wallets: ${walletPublicKeys.join(
','
)}. Status: Success. Duration sync: ${
Date.now() - start
}. From endpoint ${creatorNodeEndpoint}.`
)

return { result: 'success' }
}

/**
* This function is only run on secondaries, to export and sync data from a user's primary.
*
* @notice - By design, will reject any syncs with non-contiguous clock values. For now,
* any data corruption from primary needs to be handled separately and should not be replicated.
*
* @notice - There is a maxExportClockValueRange enforced in export, meaning that some syncs will
* only replicate partial data state. This is by design, and Snapback will trigger repeated syncs
* with progressively increasing clock values until secondaries have completely synced up.
* Secondaries have no knowledge of the current data state on primary, they simply replicate
* what they receive in each export.
*/
module.exports = async function (
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber = null,
forceResync = false
) {
const { prometheusRegistry } = serviceRegistry
const secondarySyncFromPrimaryMetric = prometheusRegistry.getMetric(
prometheusRegistry.metricNames
.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM
)
const metricEndTimerFn = secondarySyncFromPrimaryMetric.startTimer()

const { error, ...labels } = await handleSyncFromPrimary(
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber,
forceResync
)
metricEndTimerFn(labels)

if (error) {
throw new Error(error)
}

return errorObj
return labels
}
Loading

0 comments on commit 0bec2b4

Please sign in to comment.