diff --git a/creator-node/default-config.json b/creator-node/default-config.json index 0eec1ea1163..060b32d946f 100644 --- a/creator-node/default-config.json +++ b/creator-node/default-config.json @@ -40,6 +40,7 @@ "nodeSyncFileSaveMaxConcurrency": 10, "syncQueueMaxConcurrency": 50, "disableSnapback": true, + "mergePrimaryAndSecondaryEnabled": false, "stateMonitoringQueueRateLimitInterval": 60000, "stateMonitoringQueueRateLimitJobsPerInterval": 3 } \ No newline at end of file diff --git a/creator-node/src/config.js b/creator-node/src/config.js index b5bc615af29..5a6c04bac38 100644 --- a/creator-node/src/config.js +++ b/creator-node/src/config.js @@ -695,6 +695,12 @@ const config = convict({ format: Boolean, env: 'disableSnapback', default: true + }, + mergePrimaryAndSecondaryEnabled: { + doc: 'True to enable issuing sync requests with sync mode = mergePrimaryAndSecondary', + format: Boolean, + env: 'mergePrimaryAndSecondaryEnabled', + default: false } /** * unsupported options at the moment diff --git a/creator-node/src/services/prometheusMonitoring/prometheus.constants.js b/creator-node/src/services/prometheusMonitoring/prometheus.constants.js index cb4718bdcee..5d8456275a4 100644 --- a/creator-node/src/services/prometheusMonitoring/prometheus.constants.js +++ b/creator-node/src/services/prometheusMonitoring/prometheus.constants.js @@ -4,7 +4,8 @@ const config = require('../../config') const { exponentialBucketsRange } = require('./prometheusUtils') const { JOB_NAMES: STATE_MACHINE_JOB_NAMES, - SyncType + SyncType, + SYNC_MODES } = require('../stateMachineManager/stateMachineConstants') /** @@ -77,15 +78,16 @@ const MetricLabels = Object.freeze({ ] }, [MetricNames.FIND_SYNC_REQUEST_COUNTS_GAUGE]: { + sync_mode: Object.values(SYNC_MODES).map(_.snakeCase), result: [ 'not_checked', // Default value -- means the logic short-circuited before checking if the primary should sync to the secondary. This can be expected if this node wasn't the user's primary 'no_sync_already_marked_unhealthy', // Sync not found because the secondary was marked unhealthy before being passed to the find-sync-requests job 'no_sync_sp_id_mismatch', // Sync not found because the secondary's spID mismatched what the chain reported 'no_sync_success_rate_too_low', // Sync not found because the success rate of syncing to this secondary is below the acceptable threshold + 'no_sync_error_computing_sync_mode', // Sync not found because of failure to compute sync mode 'no_sync_secondary_data_matches_primary', // Sync not found because the secondary's clock value and filesHash match primary's 'no_sync_unexpected_error', // Sync not found because some uncaught error was thrown - 'new_sync_request_enqueued_primary_to_secondary', // Sync was found from primary->secondary because all other conditions were met and primary clock value was greater than secondary - 'new_sync_request_enqueued_secondary_to_primary', // Sync was found from secondary->primary because all other conditions were met and secondary clock value was greater than primary + 'new_sync_request_enqueued', // Sync found because all other conditions were met 'sync_request_already_enqueued', // Sync was found but a duplicate request has already been enqueued so no need to enqueue another 'new_sync_request_unable_to_enqueue' // Sync was found but something prevented a new request from being created ] diff --git a/creator-node/src/services/stateMachineManager/stateMachineUtils.js b/creator-node/src/services/stateMachineManager/stateMachineUtils.js index 8cee454ab4b..15cf950eb0f 100644 --- a/creator-node/src/services/stateMachineManager/stateMachineUtils.js +++ b/creator-node/src/services/stateMachineManager/stateMachineUtils.js @@ -176,7 +176,7 @@ const makeGaugeIncToRecord = (metricName, incBy, metricLabels = {}) => { /** * Returns an object that can be returned from any state machine job to record a change in a metric. * Validates the params to make sure the metric is valid. - * @param {string} metricType the type of metric being recorded -- HISTOGRAM or GAUGE_INC + * @param {string} metricType the type of metric being recorded -- HISTOGRAM_OBSERVE or GAUGE_INC * @param {string} metricName the name of the metric from prometheus.constants * @param {number} metricValue the value to observe * @param {string} [metricLabels] the optional mapping of metric label name => metric label value diff --git a/creator-node/src/services/stateMachineManager/stateMonitoring/findSyncRequests.jobProcessor.js b/creator-node/src/services/stateMachineManager/stateMonitoring/findSyncRequests.jobProcessor.js index d1e3ea69f15..4a411a090fd 100644 --- a/creator-node/src/services/stateMachineManager/stateMonitoring/findSyncRequests.jobProcessor.js +++ b/creator-node/src/services/stateMachineManager/stateMonitoring/findSyncRequests.jobProcessor.js @@ -51,100 +51,87 @@ module.exports = async function ({ const unhealthyPeersSet = new Set(unhealthyPeers || []) const metricsToRecord = [] - // Mapping of primary -> (secondary -> { result: numTimesSeenResult }) - const primaryToSecondaryToResultCountsMap = {} + // mapping ( syncMode => mapping ( result => count ) ) + const outcomeCountsMap = {} // Find any syncs that should be performed from each user to any of their secondaries let syncReqsToEnqueue = [] let duplicateSyncReqs = [] let errors = [] for (const user of users) { - const userSecondarySyncMetrics = userSecondarySyncMetricsMap[ - user.wallet - ] || { - [user.secondary1]: { successRate: 1, failureCount: 0 }, - [user.secondary2]: { successRate: 1, failureCount: 0 } + const { wallet, primary, secondary1, secondary2 } = user + + const userSecondarySyncMetrics = userSecondarySyncMetricsMap[wallet] || { + [secondary1]: { successRate: 1, failureCount: 0 }, + [secondary2]: { successRate: 1, failureCount: 0 } } const { syncReqsToEnqueue: userSyncReqsToEnqueue, duplicateSyncReqs: userDuplicateSyncReqs, errors: userErrors, - resultBySecondary: userResultsBySecondary + outcomesBySecondary: userOutcomesBySecondary } = await _findSyncsForUser( user, unhealthyPeersSet, userSecondarySyncMetrics, minSecondaryUserSyncSuccessPercent, minFailedSyncRequestsBeforeReconfig, - replicaToUserInfoMap, - logger + replicaToUserInfoMap ) if (userSyncReqsToEnqueue?.length) { syncReqsToEnqueue = syncReqsToEnqueue.concat(userSyncReqsToEnqueue) - } else if (userDuplicateSyncReqs?.length) { + } + if (userDuplicateSyncReqs?.length) { duplicateSyncReqs = duplicateSyncReqs.concat(userDuplicateSyncReqs) - } else if (userErrors?.length) errors = errors.concat(userErrors) - - // Increment total counters for the user's 2 secondaries so we can report an aggregate total - const { primary } = user - if (!primaryToSecondaryToResultCountsMap[primary]) { - primaryToSecondaryToResultCountsMap[primary] = {} } - for (const [secondary, resultForSecondary] of Object.entries( - userResultsBySecondary + if (userErrors?.length) { + errors = errors.concat(userErrors) + } + + // Emit a log for every result except for default + for (const [secondary, outcome] of Object.entries( + userOutcomesBySecondary )) { - if (!primaryToSecondaryToResultCountsMap[primary][secondary]) { - primaryToSecondaryToResultCountsMap[primary][secondary] = {} + if (outcome.result !== 'not_checked') { + logger.info( + `Recorded findSyncRequests from primary=${primary} to secondary=${secondary} for wallet ${wallet} with syncMode=${outcome.syncMode} and result=${outcome.result}` + ) + } + } + + // Update aggregate outcome counts for metric reporting + for (const outcome of Object.values(userOutcomesBySecondary)) { + const { syncMode, result } = outcome + if (!outcomeCountsMap[syncMode]) { + outcomeCountsMap[syncMode] = {} } - if ( - !primaryToSecondaryToResultCountsMap[primary][secondary][ - resultForSecondary - ] - ) { - primaryToSecondaryToResultCountsMap[primary][secondary][ - resultForSecondary - ] = 0 + if (!outcomeCountsMap[syncMode][result]) { + outcomeCountsMap[syncMode][result] = 0 } - primaryToSecondaryToResultCountsMap[primary][secondary][ - resultForSecondary - ]++ + outcomeCountsMap[syncMode][result] += 1 } } - // Map the result of each findSyncs call to metrics for the reason a sync was found / not found - for (const [primary, secondaryToResultCountMap] of Object.entries( - primaryToSecondaryToResultCountsMap - )) { - for (const [secondary, resultCountMap] of Object.entries( - secondaryToResultCountMap - )) { - for (const [labelValue, metricValue] of Object.entries(resultCountMap)) { - metricsToRecord.push( - makeGaugeIncToRecord( - MetricNames.FIND_SYNC_REQUEST_COUNTS_GAUGE, - metricValue, - { result: labelValue } - ) + // Report aggregate metrics + for (const [syncMode, resultCountsMap] of Object.entries(outcomeCountsMap)) { + for (const [result, count] of Object.entries(resultCountsMap)) { + metricsToRecord.push( + makeGaugeIncToRecord( + MetricNames.FIND_SYNC_REQUEST_COUNTS_GAUGE, + count, + { sync_mode: _.snakeCase(syncMode), result } ) - - // Log so we can find the primary+secondary for each result, but don't spam logs with the default result - if (labelValue !== 'not_checked') { - logger.info( - `Recorded findSyncRequests from primary=${primary} to secondary=${secondary} with result=${labelValue}` - ) - } - } + ) } } + return { duplicateSyncReqs, errors, jobsToEnqueue: syncReqsToEnqueue?.length - ? { - [QUEUE_NAMES.STATE_RECONCILIATION]: syncReqsToEnqueue - } + ? { [QUEUE_NAMES.STATE_RECONCILIATION]: syncReqsToEnqueue } : {}, metricsToRecord } @@ -200,19 +187,14 @@ const _validateJobData = ( * @param {number} minFailedSyncRequestsBeforeReconfig minimum number of failed sync requests to a secondary before the user's replica set gets updated to not include the secondary * @param {Object} replicaToUserInfoMap map(secondary endpoint => map(user wallet => { clock value, filesHash })) */ -const _findSyncsForUser = async ( +async function _findSyncsForUser( user, unhealthyPeers, userSecondarySyncMetricsMap, minSecondaryUserSyncSuccessPercent, minFailedSyncRequestsBeforeReconfig, - replicaToUserInfoMap, - logger -) => { - const syncReqsToEnqueue = [] - const duplicateSyncReqs = [] - const errors = [] - + replicaToUserInfoMap +) { const { wallet, primary, @@ -222,16 +204,14 @@ const _findSyncsForUser = async ( secondary2SpID } = user - const resultBySecondary = { - [secondary1]: 'not_checked', - [secondary2]: 'not_checked' + const outcomesBySecondary = { + [secondary1]: { syncMode: SYNC_MODES.None, result: 'not_checked' }, + [secondary2]: { syncMode: SYNC_MODES.None, result: 'not_checked' } } // Only sync from this node to other nodes if this node is the user's primary if (primary !== thisContentNodeEndpoint) { - return { - resultBySecondary - } + return { outcomesBySecondary } } const replicaSetNodesToObserve = [ @@ -244,6 +224,10 @@ const _findSyncsForUser = async ( (entry) => entry.endpoint ) + const syncReqsToEnqueue = [] + const duplicateSyncReqs = [] + const errors = [] + // For each secondary, add a potential sync request if healthy for (const secondaryInfo of secondariesInfo) { const secondary = secondaryInfo.endpoint @@ -252,7 +236,7 @@ const _findSyncsForUser = async ( // Secondary is unhealthy if we already marked it as unhealthy previously -- don't sync to it if (unhealthyPeers.has(secondary)) { - resultBySecondary[secondary] = 'no_sync_already_marked_unhealthy' + outcomesBySecondary[secondary].result = 'no_sync_already_marked_unhealthy' continue } @@ -261,7 +245,7 @@ const _findSyncsForUser = async ( CNodeToSpIdMapManager.getCNodeEndpointToSpIdMap()[secondary] !== secondaryInfo.spId ) { - resultBySecondary[secondary] = 'no_sync_sp_id_mismatch' + outcomesBySecondary[secondary].result = 'no_sync_sp_id_mismatch' continue } @@ -270,17 +254,16 @@ const _findSyncsForUser = async ( failureCount >= minFailedSyncRequestsBeforeReconfig && successRate < minSecondaryUserSyncSuccessPercent ) { - resultBySecondary[secondary] = 'no_sync_success_rate_too_low' + outcomesBySecondary[secondary].result = 'no_sync_success_rate_too_low' continue } // Determine if secondary requires a sync by comparing its user data against primary (this node) + let syncMode const { clock: primaryClock, filesHash: primaryFilesHash } = replicaToUserInfoMap[primary][wallet] const { clock: secondaryClock, filesHash: secondaryFilesHash } = replicaToUserInfoMap[secondary][wallet] - - let syncMode try { syncMode = await computeSyncModeForUserAndReplica({ wallet, @@ -290,53 +273,50 @@ const _findSyncsForUser = async ( secondaryFilesHash }) } catch (e) { + outcomesBySecondary[secondary].result = + 'no_sync_error_computing_sync_mode' errors.push( `Error computing sync mode for user ${wallet} and secondary ${secondary} - ${e.message}` ) continue } - if (syncMode === SYNC_MODES.SyncSecondaryFromPrimary) { + let result + if (syncMode === SYNC_MODES.None) { + result = 'no_sync_secondary_data_matches_primary' + } else if ( + syncMode === SYNC_MODES.SyncSecondaryFromPrimary || + syncMode === SYNC_MODES.MergePrimaryAndSecondary + ) { try { const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({ userWallet: wallet, primaryEndpoint: thisContentNodeEndpoint, secondaryEndpoint: secondary, - syncType: SyncType.Recurring + syncType: SyncType.Recurring, + syncMode }) if (!_.isEmpty(syncReqToEnqueue)) { - resultBySecondary[secondary] = - 'new_sync_request_enqueued_primary_to_secondary' + result = 'new_sync_request_enqueued' syncReqsToEnqueue.push(syncReqToEnqueue) } else if (!_.isEmpty(duplicateSyncReq)) { - resultBySecondary[secondary] = 'sync_request_already_enqueued' + result = 'sync_request_already_enqueued' duplicateSyncReqs.push(duplicateSyncReq) } else { - resultBySecondary[secondary] = 'new_sync_request_unable_to_enqueue' + result = 'new_sync_request_unable_to_enqueue' } } catch (e) { - resultBySecondary[secondary] = 'no_sync_unexpected_error' + result = 'no_sync_unexpected_error' errors.push( - `Error getting new or existing sync request for user ${wallet} and secondary ${secondary} - ${e.message}` + `Error getting new or existing sync request for syncMode ${syncMode}, user ${wallet} and secondary ${secondary} - ${e.message}` ) - continue } - } else if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) { - /** - * TODO - currently just logs as placeholder - * 1. Primary will sync all content from secondary - * 2. Primary will force secondary to wipe its local state and resync all content - */ - logger.info( - `[findSyncRequests][_findSyncsForUser][MergePrimaryAndSecondary = true][SyncType = ${SyncType.Recurring}] wallet ${wallet} secondary ${secondary} Clocks: [${primaryClock},${secondaryClock}] Files hashes: [${primaryFilesHash},${secondaryFilesHash}]` - ) + } - // Note that this metric says a sync was enqueued, but once implemented it could be a duplicate so will need to be changed to be more like the log in the if block above this - resultBySecondary[secondary] = - 'new_sync_request_enqueued_secondary_to_primary' - } else if (syncMode === SYNC_MODES.None) { - resultBySecondary[secondary] = 'no_sync_secondary_data_matches_primary' + outcomesBySecondary[secondary] = { + syncMode, + result } } @@ -344,6 +324,6 @@ const _findSyncsForUser = async ( syncReqsToEnqueue, duplicateSyncReqs, errors, - resultBySecondary + outcomesBySecondary } } diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor.js b/creator-node/src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor.js index 8d4f1b95da2..6c7298b17c9 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor.js +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor.js @@ -16,8 +16,10 @@ const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator') const SecondarySyncHealthTracker = require('./SecondarySyncHealthTracker') const { SYNC_MONITORING_RETRY_DELAY_MS, - QUEUE_NAMES + QUEUE_NAMES, + SYNC_MODES } = require('../stateMachineConstants') +const primarySyncFromSecondary = require('../../sync/primarySyncFromSecondary') const thisContentNodeEndpoint = config.get('creatorNodeEndpoint') const secondaryUserSyncDailyFailureCountThreshold = config.get( @@ -26,31 +28,43 @@ const secondaryUserSyncDailyFailureCountThreshold = config.get( const maxSyncMonitoringDurationInMs = config.get( 'maxSyncMonitoringDurationInMs' ) +const mergePrimaryAndSecondaryEnabled = config.get( + 'mergePrimaryAndSecondaryEnabled' +) /** - * Processes a job to issue a request to perform a manual or recurring sync (determined by syncType param). - * The sync request syncs a user's data from this node (the user's primary) - * to another node (one of the user's secondaries). + * Processes a job to issue a sync request from a user's primary (this node) to a user's secondary with syncType and syncMode + * Secondary is specified in param.syncRequestParameters * * @param {Object} param job data * @param {Object} param.logger the logger that can be filtered by jobName and jobId * @param {string} param.syncType the type of sync (manual or recurring) + * * @param {string} param.syncMode from SYNC_MODES object * @param {Object} param.syncRequestParameters axios params to make the sync request. Shape: { baseURL, url, method, data } */ -module.exports = async function ({ logger, syncType, syncRequestParameters }) { - _validateJobData(logger, syncType, syncRequestParameters) +module.exports = async function ({ + logger, + syncType, + syncMode, + syncRequestParameters +}) { + _validateJobData(logger, syncType, syncMode, syncRequestParameters) + const isValidSyncJobData = 'baseURL' in syncRequestParameters && 'url' in syncRequestParameters && 'method' in syncRequestParameters && 'data' in syncRequestParameters if (!isValidSyncJobData) { - const errorMsg = `Invalid sync data found: ${syncRequestParameters}` + const errorMsg = `Invalid sync data found: ${JSON.stringify( + syncRequestParameters + )}` logger.error(errorMsg) return { error: { message: errorMsg - } + }, + jobsToEnqueue: {} } } if ( @@ -62,14 +76,21 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { return { error: { message: errorMsg - } + }, + jobsToEnqueue: {} + } + } + if (syncMode === SYNC_MODES.None) { + return { + error: {}, + jobsToEnqueue: {} } } const userWallet = syncRequestParameters.data.wallet[0] const secondaryEndpoint = syncRequestParameters.baseURL - const logMsgString = `(${syncType}) User ${userWallet} | Secondary: ${secondaryEndpoint}` + const logMsgString = `(${syncType})(${syncMode}) User ${userWallet} | Secondary: ${secondaryEndpoint}` /** * Remove sync from SyncRequestDeDuplicator once it moves to Active status, before processing. @@ -95,8 +116,43 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { return { error: { message: errorMsg + }, + jobsToEnqueue: {} + } + } + + if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) { + // Short-circuit if this syncMode is disabled + if (!mergePrimaryAndSecondaryEnabled) { + logger.info( + `${logMsgString} || Sync mode is disabled - Will not issue sync request` + ) + return { + error: {}, + jobsToEnqueue: {} } } + + /** + * For now, if primarySyncFromSecondary fails, we just log & error without any retries + * Eventually should make this more robust, but proceeding with caution + */ + + // Sync primary content from secondary and set secondary sync flag to forceResync before proceeding + const error = await primarySyncFromSecondary({ + wallet: userWallet, + secondary: secondaryEndpoint + }) + + if (error) { + return { + error: { + message: `primarySyncFromSecondary failed with error: ${error.message}` + }, + jobsToEnqueue: {} + } + } else { + } } // primaryClockValue is used in additionalSyncIsRequired() call below @@ -108,16 +164,27 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { `------------------Process SYNC | ${logMsgString} | Primary clock value ${primaryClockValue}------------------` ) - // Issue sync request to secondary + /** + * Issue sync request to secondary + * - If SyncMode = MergePrimaryAndSecondary - issue sync request with forceResync = true + * - above call to primarySyncFromSecondary must have succeeded to get here + * - Only apply forceResync flag to this initial sync request, any future syncs proceed as usual + */ try { - await axios(syncRequestParameters) + if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) { + const syncRequestParametersForceResync = { + ...syncRequestParameters, + data: { ...syncRequestParameters.data, forceResync: true } + } + await axios(syncRequestParametersForceResync) + } else { + await axios(syncRequestParameters) + } } catch (e) { // Axios request will throw on non-200 response -> swallow error to ensure below logic is executed logger.error(`${logMsgString} || Error issuing sync request: ${e.message}`) } - const metricsToRecord = [] - // Wait until has sync has completed (within time threshold) const startWaitingForCompletion = Date.now() const { additionalSyncIsRequired, reasonForAdditionalSync } = @@ -126,9 +193,11 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { primaryClockValue, secondaryEndpoint, syncType, + syncMode, logger ) - metricsToRecord.push( + + const metricsToRecord = [ makeHistogramToRecord( MetricNames.ISSUE_SYNC_REQUEST_MONITORING_DURATION_SECONDS_HISTOGRAM, (Date.now() - startWaitingForCompletion) / 1000, // Metric is in seconds @@ -137,7 +206,7 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { reason_for_additional_sync: reasonForAdditionalSync } ) - ) + ] // Re-enqueue sync if required let error = {} @@ -147,7 +216,8 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { userWallet, secondaryEndpoint, primaryEndpoint: thisContentNodeEndpoint, - syncType + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary }) if (duplicateSyncReq && !_.isEmpty(duplicateSyncReq)) { error = { @@ -163,6 +233,7 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { logger.info( `------------------END Process SYNC | ${logMsgString}------------------` ) + return { error, jobsToEnqueue: _.isEmpty(additionalSyncReq) @@ -174,7 +245,12 @@ module.exports = async function ({ logger, syncType, syncRequestParameters }) { } } -const _validateJobData = (logger, syncType, syncRequestParameters) => { +const _validateJobData = ( + logger, + syncType, + syncMode, + syncRequestParameters +) => { if (typeof logger !== 'object') { throw new Error( `Invalid type ("${typeof logger}") or value ("${logger}") of logger param` @@ -185,6 +261,11 @@ const _validateJobData = (logger, syncType, syncRequestParameters) => { `Invalid type ("${typeof syncType}") or value ("${syncType}") of syncType param` ) } + if (typeof syncMode !== 'string') { + throw new Error( + `Invalid type ("${typeof syncMode}") or value ("${syncMode}") of syncMode param` + ) + } if ( typeof syncRequestParameters !== 'object' || syncRequestParameters instanceof Array @@ -235,6 +316,7 @@ const _additionalSyncIsRequired = async ( primaryClockValue = -1, secondaryUrl, syncType, + syncMode, logger ) => { const logMsgString = `additionalSyncIsRequired() (${syncType}): wallet ${userWallet} secondary ${secondaryUrl} primaryClock ${primaryClockValue}` @@ -259,7 +341,9 @@ const _additionalSyncIsRequired = async ( logger.info(`${logMsgString} secondaryClock ${secondaryClockValue}`) // Record starting and current clock values for secondary to determine future action - if (initialSecondaryClock === null) { + if (syncMode === SYNC_MODES.MergePrimaryAndSecondary) { + initialSecondaryClock = -1 + } else if (initialSecondaryClock === null) { initialSecondaryClock = secondaryClockValue } finalSecondaryClock = secondaryClockValue diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js index 9270aa3f553..5d99f54442f 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js @@ -18,9 +18,13 @@ const getNewOrExistingSyncReq = ({ primaryEndpoint, secondaryEndpoint, syncType, + syncMode, immediate = false }) => { - // If duplicate sync already exists, do not add and instead return existing sync job info + /** + * If duplicate sync already exists, do not add and instead return existing sync job info + * Ignore syncMode when checking for duplicates, since it doesn't matter + */ const duplicateSyncJobInfo = SyncRequestDeDuplicator.getDuplicateSyncJobInfo( syncType, userWallet, @@ -58,6 +62,7 @@ const getNewOrExistingSyncReq = ({ : JOB_NAMES.ISSUE_RECURRING_SYNC_REQUEST const jobData = { syncType, + syncMode, syncRequestParameters } const syncReqToEnqueue = { diff --git a/creator-node/test/findSyncRequests.jobProcessor.test.js b/creator-node/test/findSyncRequests.jobProcessor.test.js index cc6147f0928..d3dc71e1605 100644 --- a/creator-node/test/findSyncRequests.jobProcessor.test.js +++ b/creator-node/test/findSyncRequests.jobProcessor.test.js @@ -1,8 +1,10 @@ /* eslint-disable no-unused-expressions */ +const _ = require('lodash') const chai = require('chai') const sinon = require('sinon') const { expect } = chai const proxyquire = require('proxyquire') +chai.use(require('sinon-chai')) const { getApp } = require('./lib/app') const { getLibsMock } = require('./lib/libsMock') @@ -14,8 +16,6 @@ const { SYNC_MODES } = require('../src/services/stateMachineManager/stateMachineConstants') -chai.use(require('sinon-chai')) - describe('test findSyncRequests job processor', function () { let server, sandbox, originalContentNodeEndpoint, logger @@ -41,15 +41,15 @@ describe('test findSyncRequests job processor', function () { config.set('creatorNodeEndpoint', originalContentNodeEndpoint) }) - const primary = 'http://primary_cn.co' - const secondary1 = 'http://secondary_to_sync_to.co' - const secondary2 = 'http://secondary_already_synced.co' - const primarySpID = 1 - const secondary1SpID = 2 - const secondary2SpID = 3 - const user_id = 1 - const wallet = '0x123456789' - const users = [ + let primary = 'http://primary_cn.co' + let secondary1 = 'http://secondary_to_sync_to.co' + let secondary2 = 'http://secondary_already_synced.co' + let primarySpID = 1 + let secondary1SpID = 2 + let secondary2SpID = 3 + let user_id = 1 + let wallet = '0x123456789' + let users = [ { user_id, wallet, @@ -61,6 +61,9 @@ describe('test findSyncRequests job processor', function () { secondary2SpID } ] + let syncType = SyncType.Recurring + let metricName = 'audius_cn_find_sync_request_counts' + let metricType = 'GAUGE_INC' function getJobProcessorStub( getNewOrExistingSyncReqStub, @@ -156,21 +159,20 @@ describe('test findSyncRequests job processor', function () { */ const expectedSyncReqToEnqueue = 'expectedSyncReqToEnqueue' - const getNewOrExistingSyncReqExpectedConditionsArr = [ - { - input: { - userWallet: wallet, - primaryEndpoint: primary, - secondaryEndpoint: secondary1, - syncType: SyncType.Recurring - }, - /** - * note - this value can be anything as it's outside scope of this integration test suite - * TODO - should prob change this to reflect real object - */ - output: { syncReqToEnqueue: expectedSyncReqToEnqueue } - } - ] + const getNewOrExistingSyncReqExpectedConditionsArr = [{ + input: { + userWallet: wallet, + primaryEndpoint: primary, + secondaryEndpoint: secondary1, + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary + }, + /** + * note - this value can be anything as it's outside scope of this integration test suite + * TODO - should prob change this to reflect real object + */ + output: { syncReqToEnqueue: expectedSyncReqToEnqueue } + }] const getNewOrExistingSyncReqStub = getConditionalStub( 'getNewOrExistingSyncReq', getNewOrExistingSyncReqExpectedConditionsArr @@ -226,18 +228,20 @@ describe('test findSyncRequests job processor', function () { metricsToRecord: [ { metricLabels: { - result: 'new_sync_request_enqueued_primary_to_secondary' + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), + result: 'new_sync_request_enqueued' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -300,17 +304,16 @@ describe('test findSyncRequests job processor', function () { // Stub having a duplicate sync so that no new sync will be enqueued const expectedDuplicateSyncReq = 'expectedDuplicateSyncReq' - const getNewOrExistingSyncReqExpectedConditionsArr = [ - { - input: { - userWallet: wallet, - primaryEndpoint: primary, - secondaryEndpoint: secondary1, - syncType: SyncType.Recurring - }, - output: { duplicateSyncReq: expectedDuplicateSyncReq } - } - ] + const getNewOrExistingSyncReqExpectedConditionsArr = [{ + input: { + userWallet: wallet, + primaryEndpoint: primary, + secondaryEndpoint: secondary1, + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary + }, + output: { duplicateSyncReq: expectedDuplicateSyncReq } + }] const getNewOrExistingSyncReqStub = getConditionalStub( 'getNewOrExistingSyncReq', getNewOrExistingSyncReqExpectedConditionsArr @@ -364,18 +367,20 @@ describe('test findSyncRequests job processor', function () { metricsToRecord: [ { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), result: 'sync_request_already_enqueued' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -482,18 +487,20 @@ describe('test findSyncRequests job processor', function () { metricsToRecord: [ { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_already_marked_unhealthy' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -596,18 +603,20 @@ describe('test findSyncRequests job processor', function () { metricsToRecord: [ { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_sp_id_mismatch' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -715,18 +724,20 @@ describe('test findSyncRequests job processor', function () { metricsToRecord: [ { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_success_rate_too_low' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -789,7 +800,8 @@ describe('test findSyncRequests job processor', function () { userWallet: wallet, primaryEndpoint: primary, secondaryEndpoint: secondary1, - syncType: SyncType.Recurring + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary } const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { throw new Error(expectedErrorMsg) @@ -839,24 +851,26 @@ describe('test findSyncRequests job processor', function () { const expectedOutput = { duplicateSyncReqs: [], errors: [ - `Error getting new or existing sync request for user ${wallet} and secondary ${secondary1} - ${expectedErrorMsg}` + `Error getting new or existing sync request for syncMode ${SYNC_MODES.SyncSecondaryFromPrimary}, user ${wallet} and secondary ${secondary1} - ${expectedErrorMsg}` ], jobsToEnqueue: {}, metricsToRecord: [ { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), result: 'no_sync_unexpected_error' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 }, { metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), result: 'no_sync_secondary_data_matches_primary' }, - metricName: 'audius_cn_find_sync_request_counts', - metricType: 'GAUGE_INC', + metricName, + metricType, metricValue: 1 } ] @@ -880,4 +894,393 @@ describe('test findSyncRequests job processor', function () { computeSyncModeForUserAndReplicaExpectedConditionsArr[1].input ) }) + + it('test for when _findSyncsforUser returns syncReqsToEnqueue and duplicateSyncReqs', async function () { + /** + * Define all input variables + */ + + // spIds in mapping must match those in the `users` variable + const cNodeEndpointToSpIdMap = { + [primary]: primarySpID, + [secondary1]: secondary1SpID, + [secondary2]: secondary2SpID + } + + const unhealthyPeers = [] + + // Both secondaries are behind -> will enqueue syncs to both + const replicaToUserInfoMap = { + [primary]: { + [wallet]: { clock: 10, filesHash: '0xabc' } + }, + [secondary1]: { + [wallet]: { clock: 9, filesHash: '0xnotabc' } + }, + [secondary2]: { + [wallet]: { clock: 9, filesHash: '0xnotabc' } + } + } + + const userSecondarySyncMetricsMap = {} + + // This node must be the primary in order to sync + config.set('creatorNodeEndpoint', primary) + + /** + * Create all stubs for jobProcessor + */ + + // Mock `getNewOrExistingSyncReq()` to return expectedSyncReq for secondary1 and duplicateSyncReq for secondary2 + const expectedSyncReqToEnqueue = 'expectedSyncReqToEnqueue' + const expectedDuplicateSyncReq = 'expectedDuplicateSyncReq' + const getNewOrExistingSyncReqExpectedConditionsArr = [{ + input: { + userWallet: wallet, + primaryEndpoint: primary, + secondaryEndpoint: secondary1, + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary + }, + output: { syncReqToEnqueue: expectedSyncReqToEnqueue } + }, { + input: { + userWallet: wallet, + primaryEndpoint: primary, + secondaryEndpoint: secondary2, + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary + }, + output: { duplicateSyncReq: expectedDuplicateSyncReq } + }] + const getNewOrExistingSyncReqStub = getConditionalStub( + 'getNewOrExistingSyncReq', + getNewOrExistingSyncReqExpectedConditionsArr + ) + + const getCNodeEndpointToSpIdMapStub = getGetCNodeEndpointToSpIdMapStub( + cNodeEndpointToSpIdMap + ) + + const computeSyncModeForUserAndReplicaExpectedConditionsArr = [ + { + input: { + wallet, + primaryClock: replicaToUserInfoMap[primary][wallet].clock, + secondaryClock: replicaToUserInfoMap[secondary1][wallet].clock, + primaryFilesHash: replicaToUserInfoMap[primary][wallet].filesHash, + secondaryFilesHash: replicaToUserInfoMap[secondary1][wallet].filesHash + }, + output: SYNC_MODES.SyncSecondaryFromPrimary + }, + { + input: { + wallet, + primaryClock: replicaToUserInfoMap[primary][wallet].clock, + secondaryClock: replicaToUserInfoMap[secondary2][wallet].clock, + primaryFilesHash: replicaToUserInfoMap[primary][wallet].filesHash, + secondaryFilesHash: replicaToUserInfoMap[secondary2][wallet].filesHash + }, + output: SYNC_MODES.SyncSecondaryFromPrimary + } + ] + const computeSyncModeForUserAndReplicaStub = getConditionalStub( + 'computeSyncModeForUserAndReplica', + computeSyncModeForUserAndReplicaExpectedConditionsArr + ) + + const findSyncRequestsJobProcessor = getJobProcessorStub( + getNewOrExistingSyncReqStub, + getCNodeEndpointToSpIdMapStub, + computeSyncModeForUserAndReplicaStub + ) + + /** + * Verify job outputs the correct results: sync to user1 to secondary1 because its clock value is behind + */ + + const expectedOutput = { + duplicateSyncReqs: [expectedDuplicateSyncReq], + errors: [], + jobsToEnqueue: { + [QUEUE_NAMES.STATE_RECONCILIATION]: [expectedSyncReqToEnqueue] + }, + metricsToRecord: [ + { + metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), + result: 'new_sync_request_enqueued' + }, + metricName, + metricType, + metricValue: 1 + }, + { + metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), + result: 'sync_request_already_enqueued' + }, + metricName, + metricType, + metricValue: 1 + } + ] + } + const actualOutput = await findSyncRequestsJobProcessor({ + users, + unhealthyPeers, + replicaToUserInfoMap, + userSecondarySyncMetricsMap, + logger + }) + expect(actualOutput).to.deep.equal(expectedOutput) + expect(getNewOrExistingSyncReqStub).to.have.been.calledWithExactly( + getNewOrExistingSyncReqExpectedConditionsArr[0].input + ).and.to.have.been.calledWithExactly( + getNewOrExistingSyncReqExpectedConditionsArr[1].input + ) + expect(computeSyncModeForUserAndReplicaStub) + .to.have.been.calledTwice.and.to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[0].input + ) + .and.to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[1].input + ) + }) + + it('Test with multiple users and outcomes, including MergePrimaryAndSecondary', async function () { + /** + * Define all input variables + */ + + const CN1 = 'http://cn1.co' + const CN2 = 'http://cn2.co' + const CN3 = 'http://cn3.co' + const CN1SpID = 1 + const CN2SpID = 2 + const CN3SpID = 3 + const userID1 = 1 + const userID2 = 2 + const wallet1 = '0xwallet1' + const wallet2 = '0xwallet2' + + users = [ + { + user_id: userID1, + wallet: wallet1, + primary: CN1, + secondary1: CN2, + secondary2: CN3, + primarySpID: CN1SpID, + secondary1SpID: CN2SpID, + secondary2SpID: CN3SpID + }, + { + user_id: userID2, + wallet: wallet2, + primary: CN1, + secondary1: CN3, + secondary2: CN2, + primarySpID: CN1SpID, + secondary1SpID: CN3SpID, + secondary2SpID: CN2SpID + }, + ] + + // spIds in mapping must match those in the `users` variable + const cNodeEndpointToSpIdMap = { + [CN1]: CN1SpID, + [CN2]: CN2SpID, + [CN3]: CN3SpID + } + + const unhealthyPeers = [] + + // wallet1 - primary (CN1) clock value > secondary1 (CN2) clock value + // wallet2 - primary (CN1) files hash != secondary1 (CN3) files hash with same clock val + const replicaToUserInfoMap = { + [CN1]: { + [wallet1]: { clock: 10, filesHash: '0xW1C10FH' }, // primary + [wallet2]: { clock: 10, filesHash: '0xW2C10FH' } // primary + }, + [CN2]: { + [wallet1]: { clock: 9, filesHash: '0xW1C9FH' }, // secondary1 + [wallet2]: { clock: 10, filesHash: '0xW2C10FH' } // secondary2 + }, + [CN3]: { + [wallet1]: { clock: 10, filesHash: '0xW1C10FH' }, // secondary2 + [wallet2]: { clock: 10, filesHash: '0xW2C10BadFH' } // secondary1 + } + } + + const userSecondarySyncMetricsMap = {} + + // This node must be the primary in order to sync + config.set('creatorNodeEndpoint', CN1) + + /** + * Create all stubs for jobProcessor + */ + + const getCNodeEndpointToSpIdMapStub = getGetCNodeEndpointToSpIdMapStub( + cNodeEndpointToSpIdMap + ) + + const computeSyncModeForUserAndReplicaExpectedConditionsArr = [ + // wallet1 - (primary, secondary1) = (CN1, CN2) -> SyncSecondaryFromPrimary + { + input: { + wallet: wallet1, + primaryClock: replicaToUserInfoMap[CN1][wallet1].clock, + secondaryClock: replicaToUserInfoMap[CN2][wallet1].clock, + primaryFilesHash: replicaToUserInfoMap[CN1][wallet1].filesHash, + secondaryFilesHash: replicaToUserInfoMap[CN2][wallet1].filesHash + }, + output: SYNC_MODES.SyncSecondaryFromPrimary + }, + // wallet1 - (primary, secondary2) = (CN1, CN3) -> None + { + input: { + wallet: wallet1, + primaryClock: replicaToUserInfoMap[CN1][wallet1].clock, + secondaryClock: replicaToUserInfoMap[CN3][wallet1].clock, + primaryFilesHash: replicaToUserInfoMap[CN1][wallet1].filesHash, + secondaryFilesHash: replicaToUserInfoMap[CN3][wallet1].filesHash + }, + output: SYNC_MODES.None + }, + // wallet2 - (primary, secondary1) = (CN1, CN3) -> MergePrimaryAndSecondary + { + input: { + wallet: wallet2, + primaryClock: replicaToUserInfoMap[CN1][wallet2].clock, + secondaryClock: replicaToUserInfoMap[CN3][wallet2].clock, + primaryFilesHash: replicaToUserInfoMap[CN1][wallet2].filesHash, + secondaryFilesHash: replicaToUserInfoMap[CN3][wallet2].filesHash + }, + output: SYNC_MODES.MergePrimaryAndSecondary + }, + // wallet2 - (primary, secondary2) = (CN1, CN2) -> None + { + input: { + wallet: wallet2, + primaryClock: replicaToUserInfoMap[CN1][wallet2].clock, + secondaryClock: replicaToUserInfoMap[CN2][wallet2].clock, + primaryFilesHash: replicaToUserInfoMap[CN1][wallet2].filesHash, + secondaryFilesHash: replicaToUserInfoMap[CN2][wallet2].filesHash + }, + output: SYNC_MODES.None + }, + ] + const computeSyncModeForUserAndReplicaStub = getConditionalStub( + 'computeSyncModeForUserAndReplica', + computeSyncModeForUserAndReplicaExpectedConditionsArr + ) + + const expectedSyncReqToEnqueueWallet1 = 'expectedSyncReqToEnqueueWallet1' + const expectedSyncReqToEnqueueWallet2 = 'expectedSyncReqToEnqueueWallet2' + const getNewOrExistingSyncReqExpectedConditionsArr = [ + // wallet1 + { + input: { + userWallet: wallet1, + primaryEndpoint: CN1, + secondaryEndpoint: CN2, + syncType, + syncMode: SYNC_MODES.SyncSecondaryFromPrimary + }, + output: { syncReqToEnqueue: expectedSyncReqToEnqueueWallet1 } + }, // wallet2 + { + input: { + userWallet: wallet2, + primaryEndpoint: CN1, + secondaryEndpoint: CN3, + syncType, + syncMode: SYNC_MODES.MergePrimaryAndSecondary + }, + output: { syncReqToEnqueue: expectedSyncReqToEnqueueWallet2 } + } + ] + const getNewOrExistingSyncReqStub = getConditionalStub( + 'getNewOrExistingSyncReq', + getNewOrExistingSyncReqExpectedConditionsArr + ) + + const findSyncRequestsJobProcessor = getJobProcessorStub( + getNewOrExistingSyncReqStub, + getCNodeEndpointToSpIdMapStub, + computeSyncModeForUserAndReplicaStub + ) + + /** + * Verify job outputs the correct results: sync to user1 to secondary1 because its clock value is behind + */ + + const expectedOutput = { + duplicateSyncReqs: [], + errors: [], + jobsToEnqueue: { + [QUEUE_NAMES.STATE_RECONCILIATION]: [ + expectedSyncReqToEnqueueWallet1, expectedSyncReqToEnqueueWallet2 + ] + }, + metricsToRecord: [ + { + metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.SyncSecondaryFromPrimary), + result: 'new_sync_request_enqueued' + }, + metricName, + metricType, + metricValue: 1 + }, + { + metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.None), + result: 'no_sync_secondary_data_matches_primary' + }, + metricName, + metricType, + metricValue: 2 + }, + { + metricLabels: { + sync_mode: _.snakeCase(SYNC_MODES.MergePrimaryAndSecondary), + result: 'new_sync_request_enqueued' + }, + metricName, + metricType, + metricValue: 1 + } + ] + } + const actualOutput = await findSyncRequestsJobProcessor({ + users, + unhealthyPeers, + replicaToUserInfoMap, + userSecondarySyncMetricsMap, + logger + }) + expect(actualOutput).to.deep.equal(expectedOutput) + expect(computeSyncModeForUserAndReplicaStub) + .to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[0].input + ) + .and.to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[1].input + ) + .and.to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[2].input + ) + .and.to.have.been.calledWithExactly( + computeSyncModeForUserAndReplicaExpectedConditionsArr[3].input + ) + expect(getNewOrExistingSyncReqStub) + .to.have.been.calledWithExactly( + getNewOrExistingSyncReqExpectedConditionsArr[0].input + ).and.to.have.been.calledWithExactly( + getNewOrExistingSyncReqExpectedConditionsArr[1].input + ) + }) }) diff --git a/creator-node/test/issueSyncRequest.jobProcessor.test.js b/creator-node/test/issueSyncRequest.jobProcessor.test.js index fa7e7119280..317a0bab7d4 100644 --- a/creator-node/test/issueSyncRequest.jobProcessor.test.js +++ b/creator-node/test/issueSyncRequest.jobProcessor.test.js @@ -9,10 +9,12 @@ const { getLibsMock } = require('./lib/libsMock') const models = require('../src/models') const config = require('../src/config') +const stateMachineConstants = require('../src/services/stateMachineManager/stateMachineConstants') const { SyncType, - QUEUE_NAMES -} = require('../src/services/stateMachineManager/stateMachineConstants') + QUEUE_NAMES, + SYNC_MODES +} = stateMachineConstants const issueSyncRequestJobProcessor = require('../src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor') chai.use(require('sinon-chai')) @@ -21,6 +23,9 @@ const { expect } = chai describe('test issueSyncRequest job processor param validation', function () { let server, sandbox, originalContentNodeEndpoint, logger + + let syncMode = SYNC_MODES.SyncSecondaryFromPrimary + beforeEach(async function () { const appInfo = await getApp(getLibsMock()) await appInfo.app.get('redisClient').flushdb() @@ -51,25 +56,27 @@ describe('test issueSyncRequest job processor param validation', function () { const method = 'post' const data = { wallet: [wallet] } const syncRequestParameters = { - // Missing baseURL + // Missing secondary url, method, data } - const expectedErrorMessage = `Invalid sync data found: ${syncRequestParameters}` + const expectedErrorMessage = `Invalid sync data found: ${JSON.stringify(syncRequestParameters)}` // Verify job outputs the correct results: sync to user1 to secondary1 because its clock value is behind await expect( issueSyncRequestJobProcessor({ logger, syncType: 'anyDummyType', + syncMode, syncRequestParameters }) ).to.eventually.be.fulfilled.and.deep.equal({ error: { message: expectedErrorMessage - } + }, + "jobsToEnqueue": {} }) expect(logger.error).to.have.been.calledOnceWithExactly( expectedErrorMessage @@ -78,12 +85,12 @@ describe('test issueSyncRequest job processor param validation', function () { it('catches bad wallet in data', async function () { const wallet = '0x123456789' - const baseURL = 'http://some_cn.co' + const secondary = 'http://some_cn.co' const url = '/sync' const method = 'post' const data = { wallet } // Bad wallet -- should be an array const syncRequestParameters = { - baseURL, + baseURL: secondary, url, method, data @@ -96,12 +103,14 @@ describe('test issueSyncRequest job processor param validation', function () { issueSyncRequestJobProcessor({ logger, syncType: 'anyDummyType', + syncMode, syncRequestParameters }) ).to.eventually.be.fulfilled.and.deep.equal({ error: { message: expectedErrorMessage - } + }, + "jobsToEnqueue": {} }) expect(logger.error).to.have.been.calledOnceWithExactly( expectedErrorMessage @@ -115,8 +124,30 @@ describe('test issueSyncRequest job processor', function () { originalContentNodeEndpoint, logger, recordSuccessStub, - recordFailureStub + recordFailureStub, + syncType, + syncMode, + primary, + secondary, + wallet, + data, + syncRequestParameters + beforeEach(async function () { + syncType = SyncType.Manual + syncMode = SYNC_MODES.SyncSecondaryFromPrimary + primary = 'http://primary_cn.co' + wallet = '0x123456789' + + secondary = 'http://some_cn.co' + data = { wallet: [wallet] } + syncRequestParameters = { + baseURL: secondary, + url: '/sync', + method: 'post', + data + } + const appInfo = await getApp(getLibsMock()) await appInfo.app.get('redisClient').flushdb() server = appInfo.server @@ -143,47 +174,41 @@ describe('test issueSyncRequest job processor', function () { nock.enableNetConnect() }) - const syncType = SyncType.Manual - const primary = 'http://primary_cn.co' - const wallet = '0x123456789' - - const baseURL = 'http://some_cn.co' - const url = '/sync' - const method = 'post' - const data = { wallet: [wallet] } - const syncRequestParameters = { - baseURL, - url, - method, - data - } - function getJobProcessorStub({ getNewOrExistingSyncReqStub, getSecondaryUserSyncFailureCountForTodayStub, - retrieveClockValueForUserFromReplicaStub + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub }) { + + const stubs = { + '../../../config': config, + './stateReconciliationUtils': { + getNewOrExistingSyncReq: getNewOrExistingSyncReqStub + }, + './SecondarySyncHealthTracker': { + getSecondaryUserSyncFailureCountForToday: + getSecondaryUserSyncFailureCountForTodayStub, + recordSuccess: recordSuccessStub, + recordFailure: recordFailureStub + }, + '../stateMachineUtils': { + retrieveClockValueForUserFromReplica: + retrieveClockValueForUserFromReplicaStub + }, + '../stateMachineConstants': { + ...stateMachineConstants, + SYNC_MONITORING_RETRY_DELAY_MS: 1, + } + } + + if (primarySyncFromSecondaryStub) { + stubs['../../sync/primarySyncFromSecondary'] = primarySyncFromSecondaryStub + } + return proxyquire( '../src/services/stateMachineManager/stateReconciliation/issueSyncRequest.jobProcessor.js', - { - '../../../config': config, - './stateReconciliationUtils': { - getNewOrExistingSyncReq: getNewOrExistingSyncReqStub - }, - './SecondarySyncHealthTracker': { - getSecondaryUserSyncFailureCountForToday: - getSecondaryUserSyncFailureCountForTodayStub, - recordSuccess: recordSuccessStub, - recordFailure: recordFailureStub - }, - '../stateMachineUtils': { - retrieveClockValueForUserFromReplica: - retrieveClockValueForUserFromReplicaStub - }, - '../stateMachineConstants': { - SYNC_MONITORING_RETRY_DELAY_MS: 1 - } - } + stubs ) } @@ -198,19 +223,23 @@ describe('test issueSyncRequest job processor', function () { const retrieveClockValueForUserFromReplicaStub = sandbox.stub().resolves(1) + const primarySyncFromSecondaryStub = sandbox.stub().returns(null) + + // Make the axios request succeed + nock(secondary).post('/sync', data).reply(200) + const issueSyncRequestJobProcessor = getJobProcessorStub({ getNewOrExistingSyncReqStub, getSecondaryUserSyncFailureCountForTodayStub, - retrieveClockValueForUserFromReplicaStub + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub }) - // Make the axios request succeed - nock(baseURL).post('/sync', data).reply(200) - // Verify job outputs the correct results: no sync issued (nock will error if the wrong network req was made) const result = await issueSyncRequestJobProcessor({ logger, syncType, + syncMode, syncRequestParameters }) expect(result).to.have.deep.property('error', {}) @@ -253,12 +282,13 @@ describe('test issueSyncRequest job processor', function () { retrieveClockValueForUserFromReplicaStub }) - const expectedErrorMessage = `(${syncType}) User ${wallet} | Secondary: ${baseURL} || Secondary has already met SecondaryUserSyncDailyFailureCountThreshold (${failureThreshold}). Will not issue further syncRequests today.` + const expectedErrorMessage = `(${syncType})(${syncMode}) User ${wallet} | Secondary: ${secondary} || Secondary has already met SecondaryUserSyncDailyFailureCountThreshold (${failureThreshold}). Will not issue further syncRequests today.` // Verify job outputs the correct results: error and no sync issued (nock will error if a network req was made) const result = await issueSyncRequestJobProcessor({ logger, syncType, + syncMode, syncRequestParameters }) expect(result).to.have.deep.property('error', { @@ -279,11 +309,12 @@ describe('test issueSyncRequest job processor', function () { const expectedSyncReqToEnqueue = 'expectedSyncReqToEnqueue' const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { - const { userWallet, secondaryEndpoint, syncType: syncTypeArg } = args + const { userWallet, secondaryEndpoint, syncType: syncTypeArg, syncMode: syncModeArg } = args if ( userWallet === wallet && - secondaryEndpoint === baseURL && - syncTypeArg === syncType + secondaryEndpoint === secondary && + syncTypeArg === syncType && + syncModeArg === syncMode ) { return { syncReqToEnqueue: expectedSyncReqToEnqueue } } @@ -314,12 +345,13 @@ describe('test issueSyncRequest job processor', function () { .resolves([{ walletPublicKey: wallet, clock: primaryClockValue }]) // Make the axios request succeed - nock(baseURL).post('/sync', data).reply(200) + nock(secondary).post('/sync', data).reply(200) // Verify job outputs the correct results: an additional sync const result = await issueSyncRequestJobProcessor({ logger, syncType, + syncMode, syncRequestParameters }) expect(result).to.have.deep.property('error', {}) @@ -342,15 +374,16 @@ describe('test issueSyncRequest job processor', function () { expect(result.metricsToRecord[0].metricValue).to.be.a('number') expect(getNewOrExistingSyncReqStub).to.have.been.calledOnceWithExactly({ userWallet: wallet, - secondaryEndpoint: baseURL, + secondaryEndpoint: secondary, primaryEndpoint: primary, - syncType + syncType, + syncMode }) expect( retrieveClockValueForUserFromReplicaStub.callCount ).to.be.greaterThanOrEqual(2) expect(recordSuccessStub).to.have.been.calledOnceWithExactly( - baseURL, + secondary, wallet, syncType ) @@ -365,11 +398,12 @@ describe('test issueSyncRequest job processor', function () { const expectedSyncReqToEnqueue = 'expectedSyncReqToEnqueue' const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { - const { userWallet, secondaryEndpoint, syncType: syncTypeArg } = args + const { userWallet, secondaryEndpoint, syncType: syncTypeArg, syncMode: syncModeArg } = args if ( userWallet === wallet && - secondaryEndpoint === baseURL && - syncTypeArg === syncType + secondaryEndpoint === secondary && + syncTypeArg === syncType && + syncModeArg === syncMode ) { return { syncReqToEnqueue: expectedSyncReqToEnqueue } } @@ -397,12 +431,13 @@ describe('test issueSyncRequest job processor', function () { .resolves([{ walletPublicKey: wallet, clock: primaryClockValue }]) // Make the axios request succeed - nock(baseURL).post('/sync', data).reply(200) + nock(secondary).post('/sync', data).reply(200) // Verify job outputs the correct results: an additional sync const result = await issueSyncRequestJobProcessor({ logger, syncType, + syncMode, syncRequestParameters }) expect(result).to.have.deep.property('error', {}) @@ -425,18 +460,213 @@ describe('test issueSyncRequest job processor', function () { expect(result.metricsToRecord[0].metricValue).to.be.a('number') expect(getNewOrExistingSyncReqStub).to.have.been.calledOnceWithExactly({ userWallet: wallet, - secondaryEndpoint: baseURL, + secondaryEndpoint: secondary, primaryEndpoint: primary, - syncType + syncType, + syncMode }) expect( retrieveClockValueForUserFromReplicaStub.callCount ).to.be.greaterThanOrEqual(2) expect(recordFailureStub).to.have.been.calledOnceWithExactly( - baseURL, + secondary, wallet, syncType ) expect(recordSuccessStub).to.have.not.been.called }) + + it('SyncMode.None', async function () { + syncMode = SYNC_MODES.None + + const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { + throw new Error('getNewOrExistingSyncReq was not expected to be called') + }) + + const getSecondaryUserSyncFailureCountForTodayStub = sandbox + .stub() + .returns(0) + + const retrieveClockValueForUserFromReplicaStub = sandbox.stub().resolves(1) + + const primarySyncFromSecondaryStub = sandbox.stub().returns(null) + + // Make the axios request succeed + nock(secondary).post('/sync', data).reply(200) + + const issueSyncRequestJobProcessor = getJobProcessorStub({ + getNewOrExistingSyncReqStub, + getSecondaryUserSyncFailureCountForTodayStub, + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub + }) + + // Verify job outputs the correct results: no sync issued (nock will error if the wrong network req was made) + const result = await issueSyncRequestJobProcessor({ + logger, + syncType, + syncMode, + syncRequestParameters + }) + expect(result).to.have.deep.property('error', {}) + expect(result).to.have.deep.property('jobsToEnqueue', {}) + expect(result).to.not.have.deep.property('metricsToRecord') + expect(getNewOrExistingSyncReqStub).to.not.have.been.called + }) + + describe('test SYNC_MODES.MergePrimaryAndSecondary', function () { + beforeEach(async function () { + syncMode = SYNC_MODES.MergePrimaryAndSecondary + }) + + it('Issues correct sync when primarySyncFromSecondary() succeeds and no additional sync is required', async function () { + const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { + throw new Error('getNewOrExistingSyncReq was not expected to be called') + }) + + const getSecondaryUserSyncFailureCountForTodayStub = sandbox + .stub() + .returns(0) + + const retrieveClockValueForUserFromReplicaStub = sandbox.stub().resolves(1) + + config.set('mergePrimaryAndSecondaryEnabled', true) + + const primarySyncFromSecondaryStub = sandbox.stub().callsFake((args) => { + const { wallet: walletParam, secondary: secondaryParam } = args + if (walletParam === wallet && secondaryParam === secondary) { + return + } + throw new Error(`primarySyncFromSecondary was not expected to be called with the given args`) + }) + + const issueSyncRequestJobProcessor = getJobProcessorStub({ + getNewOrExistingSyncReqStub, + getSecondaryUserSyncFailureCountForTodayStub, + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub + }) + + // Make the axios request succeed + nock(secondary).post('/sync', data).reply(200) + + // Verify job outputs the correct results: no sync issued (nock will error if the wrong network req was made) + const result = await issueSyncRequestJobProcessor({ + logger, + syncType, + syncMode, + syncRequestParameters + }) + expect(result).to.have.deep.property('error', {}) + expect(result).to.have.deep.property('jobsToEnqueue', {}) + expect(result.metricsToRecord).to.have.lengthOf(1) + expect(result.metricsToRecord[0]).to.have.deep.property( + 'metricName', + 'audius_cn_issue_sync_request_monitoring_duration_seconds' + ) + expect(result.metricsToRecord[0]).to.have.deep.property('metricLabels', { + syncType: 'manual', + reason_for_additional_sync: 'none' + }) + expect(result.metricsToRecord[0]).to.have.deep.property( + 'metricType', + 'HISTOGRAM_OBSERVE' + ) + expect(result.metricsToRecord[0].metricValue).to.be.a('number') + expect(getNewOrExistingSyncReqStub).to.not.have.been.called + expect(primarySyncFromSecondaryStub).to.have.been.calledOnceWithExactly({ + wallet, + secondary + }) + }) + + it('primarySyncFromSecondary errors', async function () { + const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { + throw new Error('getNewOrExistingSyncReq was not expected to be called') + }) + + const getSecondaryUserSyncFailureCountForTodayStub = sandbox + .stub() + .returns(0) + + const retrieveClockValueForUserFromReplicaStub = sandbox.stub().resolves(1) + + config.set('mergePrimaryAndSecondaryEnabled', true) + + const primarySyncFromSecondaryError = new Error('Sync failure') + const primarySyncFromSecondaryStub = sandbox.stub().callsFake((args) => { + const { wallet: walletParam, secondary: secondaryParam } = args + if (walletParam === wallet && secondaryParam === secondary) { + return primarySyncFromSecondaryError + } + throw new Error(`primarySyncFromSecondary was not expected to be called with the given args`) + }) + + const issueSyncRequestJobProcessor = getJobProcessorStub({ + getNewOrExistingSyncReqStub, + getSecondaryUserSyncFailureCountForTodayStub, + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub, + }) + + // Make the axios request succeed + nock(secondary).post('/sync', data).reply(200) + + // Verify job outputs the correct results: no sync issued (nock will error if the wrong network req was made) + const result = await issueSyncRequestJobProcessor({ + logger, + syncType, + syncMode, + syncRequestParameters + }) + expect(result).to.have.deep.property('error', { + message: `primarySyncFromSecondary failed with error: ${primarySyncFromSecondaryError.message}` + }) + expect(result).to.have.deep.property('jobsToEnqueue', {}) + expect(result).to.not.have.deep.property('metricsToRecord') + expect(getNewOrExistingSyncReqStub).to.not.have.been.called + }) + + it('mergePrimaryAndSecondaryEnabled = false', async function () { + const getNewOrExistingSyncReqStub = sandbox.stub().callsFake((args) => { + throw new Error('getNewOrExistingSyncReq was not expected to be called') + }) + + const getSecondaryUserSyncFailureCountForTodayStub = sandbox + .stub() + .returns(0) + + const retrieveClockValueForUserFromReplicaStub = sandbox.stub().resolves(1) + + config.set('mergePrimaryAndSecondaryEnabled', false) + + const primarySyncFromSecondaryStub = sandbox.stub().callsFake((args) => { + throw new Error(`primarySyncFromSecondary was not expected to be called with the given args`) + }) + + const issueSyncRequestJobProcessor = getJobProcessorStub({ + getNewOrExistingSyncReqStub, + getSecondaryUserSyncFailureCountForTodayStub, + retrieveClockValueForUserFromReplicaStub, + primarySyncFromSecondaryStub + }) + + // Make the axios request succeed + nock(secondary).post('/sync', data).reply(200) + + // Verify job outputs the correct results: no sync issued (nock will error if the wrong network req was made) + const result = await issueSyncRequestJobProcessor({ + logger, + syncType, + syncMode, + syncRequestParameters + }) + expect(result).to.have.deep.property('error', {}) + expect(result).to.have.deep.property('jobsToEnqueue', {}) + expect(result).to.not.have.deep.property('metricsToRecord') + expect(getNewOrExistingSyncReqStub).to.not.have.been.called + }) + + it.skip('requires additional sync when secondary updates clock value but is still behind primary') + }) })