From 0bec2b445b36f22e2b2d1ff02c810a230f3953ae Mon Sep 17 00:00:00 2001
From: Johannes Naylor
Date: Fri, 22 Jul 2022 18:40:11 -0400
Subject: [PATCH] Add prometheus metrics for `secondarySyncFromPrimary` (#3532)

---
 .../replicaSet/replicaSetController.js        |  19 +-
 .../prometheus.constants.js                   |  33 ++-
 .../services/sync/secondarySyncFromPrimary.js | 203 ++++++++++++------
 creator-node/test/lib/app.js                  |   3 +-
 creator-node/test/sync.test.js                |  18 +-
 5 files changed, 194 insertions(+), 82 deletions(-)

diff --git a/creator-node/src/components/replicaSet/replicaSetController.js b/creator-node/src/components/replicaSet/replicaSetController.js
index 8a63529a5c7..7449ae2dd24 100644
--- a/creator-node/src/components/replicaSet/replicaSetController.js
+++ b/creator-node/src/components/replicaSet/replicaSetController.js
@@ -90,15 +90,16 @@ const syncRouteController = async (req, res) => {
    * Else, debounce + add sync to queue
    */
   if (immediate) {
-    const errorObj = await secondarySyncFromPrimary(
-      serviceRegistry,
-      walletPublicKeys,
-      creatorNodeEndpoint,
-      blockNumber,
-      forceResync
-    )
-    if (errorObj) {
-      return errorResponseServerError(errorObj)
+    try {
+      await secondarySyncFromPrimary(
+        serviceRegistry,
+        walletPublicKeys,
+        creatorNodeEndpoint,
+        blockNumber,
+        forceResync
+      )
+    } catch (e) {
+      return errorResponseServerError(e)
     }
   } else {
     const debounceTime = nodeConfig.get('debounceTime')
diff --git a/creator-node/src/services/prometheusMonitoring/prometheus.constants.js b/creator-node/src/services/prometheusMonitoring/prometheus.constants.js
index f9299f09f1e..1cf7bf90eee 100644
--- a/creator-node/src/services/prometheusMonitoring/prometheus.constants.js
+++ b/creator-node/src/services/prometheusMonitoring/prometheus.constants.js
@@ -38,7 +38,9 @@ const metricNames = {
   ISSUE_SYNC_REQUEST_DURATION_SECONDS_HISTOGRAM:
     'issue_sync_request_duration_seconds',
   FIND_SYNC_REQUEST_COUNTS_GAUGE: 'find_sync_request_counts',
-  WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM: 'write_quorum_duration_seconds'
+  WRITE_QUORUM_DURATION_SECONDS_HISTOGRAM: 'write_quorum_duration_seconds',
+  SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM:
+    'secondary_sync_from_primary_duration_seconds'
 }
 // Add a histogram for each job in the state machine queues.
 // Some have custom labels below, and all of them use the label: uncaughtError=true/false
@@ -52,6 +54,24 @@ const METRIC_NAMES = Object.freeze(
 )
 
 const METRIC_LABELS = Object.freeze({
+  [METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM]: {
+    sync_type: Object.values(SyncType).map(_.snakeCase),
+    sync_mode: Object.values(SYNC_MODES).map(_.snakeCase),
+    result: [
+      'success',
+      'failure_sync_secondary_from_primary',
+      'failure_malformed_export',
+      'failure_db_transaction',
+      'failure_sync_in_progress',
+      'failure_export_wallet',
+      'failure_skip_threshold_not_reached',
+      'failure_import_not_consistent',
+      'failure_import_not_contiguous',
+      'failure_inconsistent_clock'
+    ],
+    // 10 buckets in the range of 0.1 seconds to 60 seconds
+    buckets: exponentialBucketsRange(0.1, 60, 10)
+  },
   [METRIC_NAMES.ISSUE_SYNC_REQUEST_DURATION_SECONDS_HISTOGRAM]: {
     sync_type: Object.values(SyncType).map(_.snakeCase),
     sync_mode: Object.values(SYNC_MODES).map(_.snakeCase),
@@ -135,6 +155,17 @@ const METRIC_LABEL_NAMES = Object.freeze(
 )
 
 const METRICS = Object.freeze({
+  [METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM]: {
+    metricType: METRIC_TYPES.HISTOGRAM,
+    metricConfig: {
+      name: METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM,
+      help: 'Time spent to sync a secondary from a primary (seconds)',
+      labelNames:
+        MetricLabelNames[
+          METRIC_NAMES.SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM
+        ]
+    }
+  },
   [METRIC_NAMES.SYNC_QUEUE_JOBS_TOTAL_GAUGE]: {
     metricType: METRIC_TYPES.GAUGE,
     metricConfig: {
diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js
index edc6ccc063a..ac1190eaddb 100644
--- a/creator-node/src/services/sync/secondarySyncFromPrimary.js
+++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js
@@ -1,3 +1,4 @@
+const _ = require('lodash')
 const axios = require('axios')
 
 const { logger } = require('../../logging')
@@ -8,27 +9,14 @@ const SyncHistoryAggregator = require('../../snapbackSM/syncHistoryAggregator')
 const DBManager = require('../../dbManager')
 const UserSyncFailureCountManager = require('./UserSyncFailureCountManager')
 
-/**
- * This function is only run on secondaries, to export and sync data from a user's primary.
- *
- * @notice - By design, will reject any syncs with non-contiguous clock values. For now,
- * any data corruption from primary needs to be handled separately and should not be replicated.
- *
- * @notice - There is a maxExportClockValueRange enforced in export, meaning that some syncs will
- * only replicate partial data state. This is by design, and Snapback will trigger repeated syncs
- * with progressively increasing clock values until secondaries have completely synced up.
- * Secondaries have no knowledge of the current data state on primary, they simply replicate
- * what they receive in each export.
- */
-module.exports = async function (
+const handleSyncFromPrimary = async (
   serviceRegistry,
   walletPublicKeys,
   creatorNodeEndpoint,
   blockNumber = null,
   forceResync = false
-) {
+) => {
   const { nodeConfig, redis, libs } = serviceRegistry
-
   const FileSaveMaxConcurrency = nodeConfig.get(
     'nodeSyncFileSaveMaxConcurrency'
   )
@@ -40,9 +28,6 @@ module.exports = async function (
 
   logger.info('begin nodesync for ', walletPublicKeys, 'time', start)
 
-  // object to track if the function errored, returned at the end of the function
-  let errorObj = null
-
   /**
    * Ensure access to each wallet, then acquire redis lock for duration of sync
    * @notice - there's a bug where logPrefix is set to the last element of `walletPublicKeys` - this code only works when `walletPublicKeys.length === 1` 🤦‍♂️
@@ -56,10 +41,12 @@ module.exports = async function (
         redis.WalletWriteLock.VALID_ACQUIRERS.SecondarySyncFromPrimary
       )
     } catch (e) {
-      errorObj = new Error(
-        `Cannot change state of wallet ${wallet}. Node sync currently in progress.`
-      )
-      return errorObj
+      return {
+        error: new Error(
+          `Cannot change state of wallet ${wallet}. Node sync currently in progress.`
+        ),
+        result: 'failure_sync_in_progress'
+      }
     }
   }
 
@@ -115,19 +102,30 @@ module.exports = async function (
         `Failed to retrieve export from ${creatorNodeEndpoint} for wallets`,
         walletPublicKeys
       )
-      throw new Error(resp.data.error)
+      return {
+        error: new Error(resp.data.error),
+        result: 'failure_export_wallet'
+      }
     }
 
     // TODO - explain patch
     if (!resp.data) {
       if (resp.request && resp.request.responseText) {
         resp.data = JSON.parse(resp.request.responseText)
-      } else throw new Error(`Malformed response from ${creatorNodeEndpoint}.`)
+      } else {
+        return {
+          error: new Error(`Malformed response from ${creatorNodeEndpoint}.`),
+          result: 'failure_malformed_export'
+        }
+      }
     }
 
     const { data: body } = resp
     if (!body.data.hasOwnProperty('cnodeUsers')) {
-      throw new Error(`Malformed response from ${creatorNodeEndpoint}.`)
+      return {
+        error: new Error(`Malformed response from ${creatorNodeEndpoint}.`),
+        result: 'failure_malformed_export'
+      }
     }
 
     logger.info(
@@ -145,9 +143,12 @@ module.exports = async function (
       // Since different nodes may assign different cnodeUserUUIDs to a given walletPublicKey,
       // retrieve local cnodeUserUUID from fetched walletPublicKey and delete all associated data.
       if (!fetchedCNodeUser.hasOwnProperty('walletPublicKey')) {
-        throw new Error(
-          `Malformed response received from ${creatorNodeEndpoint}. "walletPublicKey" property not found on CNodeUser in response object`
-        )
+        return {
+          error: new Error(
+            `Malformed response received from ${creatorNodeEndpoint}. "walletPublicKey" property not found on CNodeUser in response object`
+          ),
+          result: 'failure_malformed_export'
+        }
       }
 
       const fetchedWalletPublicKey = fetchedCNodeUser.walletPublicKey
@@ -181,9 +182,12 @@ module.exports = async function (
       }
 
       if (!walletPublicKeys.includes(fetchedWalletPublicKey)) {
-        throw new Error(
-          `Malformed response from ${creatorNodeEndpoint}. Returned data for walletPublicKey that was not requested.`
-        )
+        return {
+          error: new Error(
+            `Malformed response from ${creatorNodeEndpoint}. Returned data for walletPublicKey that was not requested.`
+          ),
+          result: 'failure_malformed_export'
+        }
       }
 
       /**
@@ -203,9 +207,12 @@ module.exports = async function (
 
       // Error if returned data is not within requested range
       if (fetchedLatestClockVal < localMaxClockVal) {
-        throw new Error(
-          `Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}`
-        )
+        return {
+          error: new Error(
+            `Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}`
+          ),
+          result: 'failure_inconsistent_clock'
+        }
       } else if (fetchedLatestClockVal === localMaxClockVal) {
         // Already up to date, no sync necessary
         logger.info(
@@ -218,13 +225,22 @@ module.exports = async function (
         fetchedClockRecords[0] &&
         fetchedClockRecords[0].clock !== localMaxClockVal + 1
       ) {
-        throw new Error(
-          `Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}`
-        )
-      } else if (maxClockRecordId !== fetchedLatestClockVal) {
-        throw new Error(
-          `Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}`
-        )
+        return {
+          error: new Error(
+            `Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}`
+          ),
+          result: 'failure_import_not_contiguous'
+        }
+      } else if (
+        !_.isEmpty(fetchedCNodeUser.clockRecords) &&
+        maxClockRecordId !== fetchedLatestClockVal
+      ) {
+        return {
+          error: new Error(
+            `Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}`
+          ),
+          result: 'failure_import_not_consistent'
+        }
       }
 
       // All DB updates must happen in single atomic tx - partial state updates will lead to data loss
@@ -282,9 +298,12 @@ module.exports = async function (
 
         // Error if update failed
         if (numRowsUpdated !== 1 || respObj.length !== 1) {
-          throw new Error(
-            `Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}`
-          )
+          return {
+            error: new Error(
+              `Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}`
+            ),
+            result: 'failure_db_transaction'
+          }
         }
         cnodeUser = respObj[0]
       } else {
@@ -441,7 +460,10 @@ module.exports = async function (
         if (userSyncFailureCount < SyncRequestMaxUserFailureCountBeforeSkip) {
           const errorMsg = `User Sync failed due to ${numCIDsThatFailedSaveFileOp} failing saveFileForMultihashToFS op. userSyncFailureCount = ${userSyncFailureCount} // SyncRequestMaxUserFailureCountBeforeSkip = ${SyncRequestMaxUserFailureCountBeforeSkip}`
           logger.error(logPrefix, errorMsg)
-          throw new Error(errorMsg)
+          return {
+            error: new Error(errorMsg),
+            result: 'failure_skip_threshold_not_reached'
+          }
 
           // If max failure threshold reached, continue with sync and reset failure count
         } else {
@@ -538,15 +560,29 @@ module.exports = async function (
 
         await transaction.rollback()
 
-        throw new Error(e)
+        return {
+          error: new Error(e),
+          result: 'failure_db_transaction'
+        }
       }
     }
   } catch (e) {
-    errorObj = e
-
     for (const wallet of walletPublicKeys) {
       await SyncHistoryAggregator.recordSyncFail(wallet)
     }
+    logger.error(
+      logPrefix,
+      `Sync complete for wallets: ${walletPublicKeys.join(
+        ','
+      )}. Status: Error, message: ${e.message}. Duration sync: ${
+        Date.now() - start
+      }. From endpoint ${creatorNodeEndpoint}.`
+    )
+
+    return {
+      error: new Error(e),
+      result: 'failure_sync_secondary_from_primary'
+    }
   } finally {
     // Release all redis locks
     for (const wallet of walletPublicKeys) {
@@ -559,27 +595,58 @@ module.exports = async function (
         )
       }
     }
+  }
 
-    if (errorObj) {
-      logger.error(
-        logPrefix,
-        `Sync complete for wallets: ${walletPublicKeys.join(
-          ','
-        )}. Status: Error, message: ${errorObj.message}. Duration sync: ${
-          Date.now() - start
-        }. From endpoint ${creatorNodeEndpoint}.`
-      )
-    } else {
-      logger.info(
-        logPrefix,
-        `Sync complete for wallets: ${walletPublicKeys.join(
-          ','
-        )}. Status: Success. Duration sync: ${
-          Date.now() - start
-        }. From endpoint ${creatorNodeEndpoint}.`
-      )
-    }
+  logger.info(
+    logPrefix,
+    `Sync complete for wallets: ${walletPublicKeys.join(
+      ','
+    )}. Status: Success. Duration sync: ${
+      Date.now() - start
+    }. From endpoint ${creatorNodeEndpoint}.`
+  )
+
+  return { result: 'success' }
+}
+
+/**
+ * This function is only run on secondaries, to export and sync data from a user's primary.
+ *
+ * @notice - By design, will reject any syncs with non-contiguous clock values. For now,
+ * any data corruption from primary needs to be handled separately and should not be replicated.
+ *
+ * @notice - There is a maxExportClockValueRange enforced in export, meaning that some syncs will
+ * only replicate partial data state. This is by design, and Snapback will trigger repeated syncs
+ * with progressively increasing clock values until secondaries have completely synced up.
+ * Secondaries have no knowledge of the current data state on primary, they simply replicate
+ * what they receive in each export.
+ */
+module.exports = async function (
+  serviceRegistry,
+  walletPublicKeys,
+  creatorNodeEndpoint,
+  blockNumber = null,
+  forceResync = false
+) {
+  const { prometheusRegistry } = serviceRegistry
+  const secondarySyncFromPrimaryMetric = prometheusRegistry.getMetric(
+    prometheusRegistry.metricNames
+      .SECONDARY_SYNC_FROM_PRIMARY_DURATION_SECONDS_HISTOGRAM
+  )
+  const metricEndTimerFn = secondarySyncFromPrimaryMetric.startTimer()
+
+  const { error, ...labels } = await handleSyncFromPrimary(
+    serviceRegistry,
+    walletPublicKeys,
+    creatorNodeEndpoint,
+    blockNumber,
+    forceResync
+  )
+  metricEndTimerFn(labels)
+
+  if (error) {
+    throw new Error(error)
   }
 
-  return errorObj
+  return labels
 }
diff --git a/creator-node/test/lib/app.js b/creator-node/test/lib/app.js
index fb68243c4e6..7ebc6877f83 100644
--- a/creator-node/test/lib/app.js
+++ b/creator-node/test/lib/app.js
@@ -58,7 +58,8 @@ function getServiceRegistryMock(libsClient, blacklistManager) {
     monitoringQueue: new MonitoringQueueMock(),
     syncQueue: new SyncQueue(nodeConfig, redisClient),
     nodeConfig,
-    initLibs: async function () {}
+    initLibs: async function () {},
+    prometheusRegistry: new PrometheusRegistry()
   }
 }
 
diff --git a/creator-node/test/sync.test.js b/creator-node/test/sync.test.js
index 5a9ed60e85e..54fd4576e2e 100644
--- a/creator-node/test/sync.test.js
+++ b/creator-node/test/sync.test.js
@@ -1010,12 +1010,16 @@ describe('test nodesync', async function () {
     assert.strictEqual(initialCNodeUserCount, 0)
 
     // Call secondarySyncFromPrimary
-    await secondarySyncFromPrimary(
+    const result = await secondarySyncFromPrimary(
       serviceRegistryMock,
       userWallets,
       TEST_ENDPOINT
     )
 
+    assert.deepStrictEqual(result, {
+      result: 'success'
+    })
+
     const newCNodeUserUUID = await verifyLocalCNodeUserStateForUser(
       exportedCnodeUser
     )
@@ -1055,12 +1059,16 @@ describe('test nodesync', async function () {
     assert.strictEqual(localCNodeUserCount, 1)
 
     // Call secondarySyncFromPrimary
-    await secondarySyncFromPrimary(
+    const result = await secondarySyncFromPrimary(
       serviceRegistryMock,
       userWallets,
       TEST_ENDPOINT
     )
 
+    assert.deepStrictEqual(result, {
+      result: 'success'
+    })
+
     await verifyLocalCNodeUserStateForUser(exportedCnodeUser)
 
     await verifyLocalStateForUser({
@@ -1098,7 +1106,7 @@ describe('test nodesync', async function () {
     assert.strictEqual(localCNodeUserCount, 1)
 
     // Call secondarySyncFromPrimary with `forceResync` = true
-    await secondarySyncFromPrimary(
+    const result = await secondarySyncFromPrimary(
       serviceRegistryMock,
       userWallets,
       TEST_ENDPOINT,
@@ -1106,6 +1114,10 @@ describe('test nodesync', async function () {
       /* forceResync */ true
     )
 
+    assert.deepStrictEqual(result, {
+      result: 'success'
+    })
+
     const newCNodeUserUUID = await verifyLocalCNodeUserStateForUser(
       exportedCnodeUser
     )
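
For reviewers who want the timing pattern from this patch in isolation: prom-client histograms expose `startTimer()`, which returns a function that records the elapsed seconds when it is later called with a set of labels. The sketch below mirrors the metric added here (same metric name, help text, and label names), but it is illustrative only — the standalone `promClient.Histogram` setup, the `runSync` callback, and the use of `promClient.exponentialBuckets` in place of the repo's `exponentialBucketsRange` helper are assumptions, not part of this diff.

```js
// Illustrative sketch (not part of the patch): prom-client histogram +
// startTimer() pattern used by the new secondary_sync_from_primary metric.
const promClient = require('prom-client')

const secondarySyncDurationSeconds = new promClient.Histogram({
  name: 'secondary_sync_from_primary_duration_seconds',
  help: 'Time spent to sync a secondary from a primary (seconds)',
  labelNames: ['sync_type', 'sync_mode', 'result'],
  // Assumed bucket layout; the patch uses the repo's exponentialBucketsRange(0.1, 60, 10)
  buckets: promClient.exponentialBuckets(0.1, 2, 10)
})

// Mirrors module.exports in secondarySyncFromPrimary.js: time the sync and
// record the outcome label whether it succeeds or fails.
async function timedSync(runSync) {
  // startTimer() returns an "end" function; calling it observes the elapsed
  // seconds into the histogram, tagged with whatever labels are passed in.
  const endTimer = secondarySyncDurationSeconds.startTimer()
  const { error, ...labels } = await runSync() // e.g. { result: 'success' }
  endTimer(labels)
  if (error) {
    throw error
  }
  return labels
}
```

Because `handleSyncFromPrimary` reports failures as a `result` value instead of throwing, the end-timer call runs on every path, so failed syncs still land in the histogram under their specific failure label.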