diff --git a/creator-node/package-lock.json b/creator-node/package-lock.json index 00f5a04daaa..810bfbc7685 100644 --- a/creator-node/package-lock.json +++ b/creator-node/package-lock.json @@ -2230,7 +2230,7 @@ "@protobufjs/aspromise": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", - "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==" + "integrity": "sha1-m4sMxmPWaafY9vXQiToU00jzD78=" }, "@protobufjs/base64": { "version": "1.1.2", @@ -2245,12 +2245,12 @@ "@protobufjs/eventemitter": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", - "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==" + "integrity": "sha1-NVy8mLr61ZePntCV85diHx0Ga3A=" }, "@protobufjs/fetch": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", - "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "integrity": "sha1-upn7WYYUr2VwDBYZ/wbUVLDYTEU=", "requires": { "@protobufjs/aspromise": "^1.1.1", "@protobufjs/inquire": "^1.1.0" @@ -2259,27 +2259,27 @@ "@protobufjs/float": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", - "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==" + "integrity": "sha1-Xp4avctz/Ap8uLKR33jIy9l7h9E=" }, "@protobufjs/inquire": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", - "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==" + "integrity": "sha1-/yAOPnzyQp4tyvwRQIKOjMY48Ik=" }, "@protobufjs/path": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", - "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==" + "integrity": "sha1-bMKyDFya1q0NzP0hynZz2Nf79o0=" }, "@protobufjs/pool": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", - "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==" + "integrity": "sha1-Cf0V8tbTq/qbZbw2ZQbWrXhG/1Q=" }, "@protobufjs/utf8": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", - "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" + "integrity": "sha1-p3c2C1s5oaLlEG+OhY8v0tBgxXA=" }, "@sindresorhus/is": { "version": "0.14.0", @@ -3112,7 +3112,7 @@ "strict-uri-encode": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/strict-uri-encode/-/strict-uri-encode-2.0.0.tgz", - "integrity": "sha512-QwiXZgpRcKkhTj2Scnn++4PKtWsH0kpzZ62L2R6c/LUVYv7hVnZqcg2+sMuT6R7Jusu1vviK/MFsu6kNJfWlEQ==" + "integrity": "sha1-ucczDHBChi9rFC3CdLvMWGbONUY=" } } }, @@ -3754,7 +3754,7 @@ "babel-plugin-syntax-jsx": { "version": "6.18.0", "resolved": "https://registry.npmjs.org/babel-plugin-syntax-jsx/-/babel-plugin-syntax-jsx-6.18.0.tgz", - "integrity": "sha512-qrPaCSo9c8RHNRHIotaufGbuOBN8rtdC4QrrFFc43vyWCCz7Kl7GL1PGaXtMGQZUXrkCjNEgxDfmAuAabr/rlw==" + "integrity": "sha1-CvMqmm4Tyno/1QaeYtew9Y0NiUY=" }, "balanced-match": { "version": "1.0.2", @@ -3952,7 +3952,7 @@ "bmp-js": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/bmp-js/-/bmp-js-0.1.0.tgz", - "integrity": "sha512-vHdS19CnY3hwiNdkaqk93DvjVLfbEcI8mys4UjuWrlX1haDmroo8o4xCzh4wD6DGV6HxRCyauwhHRqMTfERtjw==" + "integrity": "sha1-4Fpj95amwf8l9Hcex62twUjAcjM=" }, "bn.js": { "version": "4.12.0", @@ -4210,7 +4210,7 @@ "buffer-equal": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/buffer-equal/-/buffer-equal-0.0.1.tgz", - "integrity": "sha512-RgSV6InVQ9ODPdLWJ5UAqBqJBOg370Nz6ZQtRzpt6nUjc8v0St97uJ4PYC6NztqIScrAXafKM3mZPMygSe1ggA==" + "integrity": "sha1-kbx0sR6kBbyRa8aqkI+q+ltKrEs=" }, "buffer-layout": { "version": "1.2.2", @@ -4481,7 +4481,7 @@ "camelize": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/camelize/-/camelize-1.0.0.tgz", - "integrity": "sha512-W2lPwkBkMZwFlPCXhIlYgxu+7gC/NUlCtdK652DAJ1JdgV0sTrvuPFshNPrFa1TY2JOkLhgdeEBplB4ezEa+xg==" + "integrity": "sha1-FkpUg+Yw+kMh5a8HAg5TGDGyYJs=" }, "caniuse-lite": { "version": "1.0.30001241", @@ -5181,7 +5181,7 @@ "css-color-keywords": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/css-color-keywords/-/css-color-keywords-1.0.0.tgz", - "integrity": "sha512-FyyrDHZKEjXDpNJYvVsV960FiqQyXc/LlYmsxl2BcdMb2WPx0OGRVgTg55rPSyLSNMqP52R9r8geSp7apN3Ofg==" + "integrity": "sha1-/qJhbcZ2spYmhrOvjb2+GAskTgU=" }, "css-to-react-native": { "version": "3.0.0", @@ -7121,7 +7121,7 @@ "fast-stable-stringify": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/fast-stable-stringify/-/fast-stable-stringify-1.0.0.tgz", - "integrity": "sha512-wpYMUmFu5f00Sm0cj2pfivpmawLZ0NKdviQ4w9zJeR8JVtOpOxHmLaJuj0vxvGqMJQWyP/COUkF75/57OKyRag==" + "integrity": "sha1-XFVDRisiru79NtBbNOUceMuG0xM=" }, "fastq": { "version": "1.13.0", @@ -11513,7 +11513,7 @@ "qr.js": { "version": "0.0.0", "resolved": "https://registry.npmjs.org/qr.js/-/qr.js-0.0.0.tgz", - "integrity": "sha512-c4iYnWb+k2E+vYpRimHqSu575b1/wKl4XFeJGpFmrJQz5I88v9aY2czh7s0w36srfCM1sXgC/xpoJz5dJfq+OQ==" + "integrity": "sha1-ys6GOG9ZoNuAUPqQ2baw6IoeNk8=" }, "qrcode.react": { "version": "1.0.1", @@ -12679,7 +12679,7 @@ "slide": { "version": "1.1.6", "resolved": "https://registry.npmjs.org/slide/-/slide-1.1.6.tgz", - "integrity": "sha512-NwrtjCg+lZoqhFU8fOwl4ay2ei8PaqCBOUV3/ektPY9trO1yQ1oXEfmHAhKArUVUr/hOHvy5f6AdP17dCM0zMw==" + "integrity": "sha1-VusCfWW00tzmyy4tMsTUr8nh1wc=" }, "snake-case": { "version": "3.0.4", @@ -15031,7 +15031,7 @@ "wif": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/wif/-/wif-2.0.6.tgz", - "integrity": "sha512-HIanZn1zmduSF+BQhkE+YXIbEiH0xPr1012QbFEGB0xsKqJii0/SqJjyn8dFv6y36kOznMgMB+LGcbZTJ1xACQ==", + "integrity": "sha1-CNP1IFbGZnkplyb63g1DKudLRwQ=", "requires": { "bs58check": "<3.0.0" } @@ -15147,7 +15147,7 @@ "write-file-atomic": { "version": "1.3.4", "resolved": "https://registry.npmjs.org/write-file-atomic/-/write-file-atomic-1.3.4.tgz", - "integrity": "sha512-SdrHoC/yVBPpV0Xq/mUZQIpW2sWXAShb/V4pomcJXh92RuaO+f3UTWItiR3Px+pLnV2PvC2/bfn5cwr5X6Vfxw==", + "integrity": "sha1-+Aek8LHZ6ROuekgRLmzDrxmRtF8=", "requires": { "graceful-fs": "^4.1.11", "imurmurhash": "^0.1.4", @@ -15233,7 +15233,7 @@ "xmlhttprequest": { "version": "1.8.0", "resolved": "https://registry.npmjs.org/xmlhttprequest/-/xmlhttprequest-1.8.0.tgz", - "integrity": "sha512-58Im/U0mlVBLM38NdZjHyhuMtCqa61469k2YP/AaPbvCoV9aQGUpbJBj1QRm2ytRiVQBD/fsw7L2bJGDVQswBA==" + "integrity": "sha1-Z/4HXFwk/vOfnWX197f+dRcZaPw=" }, "xtend": { "version": "4.0.2", diff --git a/creator-node/src/middlewares.js b/creator-node/src/middlewares.js index c2ee072e395..42fd612912e 100644 --- a/creator-node/src/middlewares.js +++ b/creator-node/src/middlewares.js @@ -15,6 +15,9 @@ const { hasEnoughStorageSpace } = require('./fileManager') const { getMonitors, MONITORS } = require('./monitors/monitors') const { verifyRequesterIsValidSP } = require('./apiSigning') const BlacklistManager = require('./blacklistManager') +const { + issueSyncRequestsUntilSynced +} = require('./services/stateMachineManager/stateReconciliation/stateReconciliationUtils') /** * Ensure valid cnodeUser and session exist for provided session token @@ -269,7 +272,7 @@ async function ensureStorageMiddleware(req, res, next) { */ async function issueAndWaitForSecondarySyncRequests(req) { const serviceRegistry = req.app.get('serviceRegistry') - const { snapbackSM } = serviceRegistry + const { manualSyncQueue } = serviceRegistry // Parse request headers const pollingDurationMs = @@ -332,11 +335,13 @@ async function issueAndWaitForSecondarySyncRequests(req) { const replicationStart = Date.now() try { const secondaryPromises = secondaries.map((secondary) => { - return snapbackSM.issueSyncRequestsUntilSynced( + return issueSyncRequestsUntilSynced( + primary, secondary, wallet, primaryClockVal, - pollingDurationMs + pollingDurationMs, + manualSyncQueue ) }) diff --git a/creator-node/src/serviceRegistry.js b/creator-node/src/serviceRegistry.js index ef77f25ce30..9af830a4ac0 100644 --- a/creator-node/src/serviceRegistry.js +++ b/creator-node/src/serviceRegistry.js @@ -172,11 +172,11 @@ class ServiceRegistry { queues: [ stateMonitoringAdapter, stateReconciliationAdapter, + new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }), new BullAdapter(this.cNodeEndpointToSpIdMapQueue, { readOnlyMode: true }), new BullAdapter(this.stateMachineQueue, { readOnlyMode: true }), - new BullAdapter(this.manualSyncQueue, { readOnlyMode: true }), new BullAdapter(this.recurringSyncQueue, { readOnlyMode: true }), new BullAdapter(syncProcessingQueue, { readOnlyMode: true }), new BullAdapter(asyncProcessingQueue, { readOnlyMode: true }), @@ -289,7 +289,8 @@ class ServiceRegistry { const { stateMonitoringQueue, cNodeEndpointToSpIdMapQueue, - stateReconciliationQueue + stateReconciliationQueue, + manualSyncQueue } = await this.stateMachineManager.init(this.libs, this.prometheusRegistry) this.stateMonitoringQueue = stateMonitoringQueue this.cNodeEndpointToSpIdMapQueue = cNodeEndpointToSpIdMapQueue @@ -298,6 +299,7 @@ class ServiceRegistry { // SyncQueue construction (requires L1 identity) // Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround this.syncQueue = new SyncQueue(config, this.redis, this) + this.manualSyncQueue = manualSyncQueue // L2URSMRegistration (requires L1 identity) // Retries indefinitely @@ -441,10 +443,8 @@ class ServiceRegistry { */ async _initSnapbackSM() { this.snapbackSM = new SnapbackSM(config, this.libs) - const { stateMachineQueue, manualSyncQueue, recurringSyncQueue } = - this.snapbackSM + const { stateMachineQueue, recurringSyncQueue } = this.snapbackSM this.stateMachineQueue = stateMachineQueue - this.manualSyncQueue = manualSyncQueue this.recurringSyncQueue = recurringSyncQueue let isInitialized = false diff --git a/creator-node/src/services/stateMachineManager/index.js b/creator-node/src/services/stateMachineManager/index.js index e6a6b5e4a2e..90dd651d150 100644 --- a/creator-node/src/services/stateMachineManager/index.js +++ b/creator-node/src/services/stateMachineManager/index.js @@ -29,9 +29,8 @@ class StateMachineManager { audiusLibs.discoveryProvider.discoveryProviderEndpoint, prometheusRegistry ) - const stateReconciliationQueue = await stateReconciliationManager.init( - prometheusRegistry - ) + const { stateReconciliationQueue, manualSyncQueue } = + await stateReconciliationManager.init(prometheusRegistry) // Upon completion, make jobs record metrics and enqueue other jobs as necessary stateMonitoringQueue.on( @@ -60,7 +59,8 @@ class StateMachineManager { return { stateMonitoringQueue, cNodeEndpointToSpIdMapQueue, - stateReconciliationQueue + stateReconciliationQueue, + manualSyncQueue } } diff --git a/creator-node/src/services/stateMachineManager/stateMachineConstants.js b/creator-node/src/services/stateMachineManager/stateMachineConstants.js index 7f1ac572f24..37509292a40 100644 --- a/creator-node/src/services/stateMachineManager/stateMachineConstants.js +++ b/creator-node/src/services/stateMachineManager/stateMachineConstants.js @@ -2,9 +2,6 @@ module.exports = { // Max number of completed/failed jobs to keep in redis for the state monitoring queue MONITORING_QUEUE_HISTORY: 20, - // Max number of completed/failed jobs to keep in redis for the state monitoring queue - RECONCILIATION_QUEUE_HISTORY: 300, - // Max millis to run a fetch cNodeEndpoint->spId mapping job for before marking it as stalled (1 minute) C_NODE_ENDPOINT_TO_SP_ID_MAP_QUEUE_MAX_JOB_RUNTIME_MS: 1000 * 60, @@ -53,7 +50,9 @@ module.exports = { // Max number of completed/failed jobs to keep in redis for the cNodeEndpoint->spId map queue C_NODE_ENDPOINT_TO_SP_ID_MAP: 100, // Max number of completed/failed jobs to keep in redis for the state monitoring queue - STATE_RECONCILIATION: 300 + STATE_RECONCILIATION: 300, + // Max number of completed/failed jobs to keep in redis for the manual sync queue + MANUAL_SYNC: 300 }), QUEUE_NAMES: Object.freeze({ @@ -62,7 +61,9 @@ module.exports = { // Name of queue that only processes jobs to fetch the cNodeEndpoint->spId mapping, C_NODE_ENDPOINT_TO_SP_ID_MAP: 'c-node-to-endpoint-sp-id-map-queue', // Name of StateReconciliationQueue - STATE_RECONCILIATION: 'state-reconciliation-queue' + STATE_RECONCILIATION: 'state-reconciliation-queue', + // Name of ManualSyncQueue + MANUAL_SYNC: 'manual-sync-queue' }), JOB_NAMES: Object.freeze({ diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/index.js b/creator-node/src/services/stateMachineManager/stateReconciliation/index.js index 30385012823..5266d44b8e1 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/index.js +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/index.js @@ -5,17 +5,22 @@ const { QUEUE_HISTORY, QUEUE_NAMES, JOB_NAMES, - STATE_RECONCILIATION_QUEUE_MAX_JOB_RUNTIME_MS + STATE_RECONCILIATION_QUEUE_MAX_JOB_RUNTIME_MS, + MANUAL_SYNC_QUEUE_MAX_JOB_RUNTIME_MS } = require('../stateMachineConstants') const processJob = require('../processJob') const { logger: baseLogger, createChildLogger } = require('../../../logging') const handleSyncRequestJobProcessor = require('./issueSyncRequest.jobProcessor') const updateReplicaSetJobProcessor = require('./updateReplicaSet.jobProcessor') -const logger = createChildLogger(baseLogger, { +const reconciliationLogger = createChildLogger(baseLogger, { queue: QUEUE_NAMES.STATE_RECONCILIATION }) +const manualSyncLogger = createChildLogger(baseLogger, { + queue: QUEUE_NAMES.MANUAL_SYNC +}) + /** * Handles setup and job processing of the queue with jobs for: * - issuing sync requests to nodes (this can be other nodes or this node) @@ -23,12 +28,27 @@ const logger = createChildLogger(baseLogger, { */ class StateReconciliationManager { async init(prometheusRegistry) { - const queue = this.makeQueue( - config.get('redisHost'), - config.get('redisPort') - ) + const stateReconciliationQueue = this.makeQueue({ + redisHost: config.get('redisHost'), + redisPort: config.get('redisPort'), + name: QUEUE_NAMES.STATE_RECONCILIATION, + removeOnComplete: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY, + removeOnFail: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY, + lockDuration: STATE_RECONCILIATION_QUEUE_MAX_JOB_RUNTIME_MS + }) + + const manualSyncQueue = this.makeQueue({ + redisHost: config.get('redisHost'), + redisPort: config.get('redisPort'), + name: QUEUE_NAMES.MANUAL_SYNC, + removeOnComplete: QUEUE_HISTORY.MANUAL_SYNC, + removeOnFail: QUEUE_HISTORY.MANUAL_SYNC, + lockDuration: MANUAL_SYNC_QUEUE_MAX_JOB_RUNTIME_MS + }) + this.registerQueueEventHandlersAndJobProcessors({ - queue, + stateReconciliationQueue, + manualSyncQueue, processManualSync: this.makeProcessManualSyncJob(prometheusRegistry).bind(this), processRecurringSync: @@ -38,25 +58,36 @@ class StateReconciliationManager { }) // Clear any old state if redis was running but the rest of the server restarted - await queue.obliterate({ force: true }) + await stateReconciliationQueue.clean({ force: true }) + await manualSyncQueue.clean({ force: true }) - return queue + return { + stateReconciliationQueue, + manualSyncQueue + } } - makeQueue(redisHost, redisPort) { + makeQueue({ + redisHost, + redisPort, + name, + removeOnComplete, + removeOnFail, + lockDuration + }) { // Settings config from https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#advanced-settings - return new BullQueue(QUEUE_NAMES.STATE_RECONCILIATION, { + return new BullQueue(name, { redis: { host: redisHost, port: redisPort }, defaultJobOptions: { - removeOnComplete: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY, - removeOnFail: QUEUE_HISTORY.RECONCILIATION_QUEUE_HISTORY + removeOnComplete: removeOnComplete, + removeOnFail: removeOnFail }, settings: { // Should be sufficiently larger than expected job runtime - lockDuration: STATE_RECONCILIATION_QUEUE_MAX_JOB_RUNTIME_MS, + lockDuration: lockDuration, // We never want to re-process stalled jobs maxStalledCount: 0 } @@ -73,52 +104,58 @@ class StateReconciliationManager { * @param {Function} params.processUpdateReplicaSet the function to call when processing an update-replica-set job from the queue */ registerQueueEventHandlersAndJobProcessors({ - queue, + stateReconciliationQueue, + manualSyncQueue, processManualSync, processRecurringSync, processUpdateReplicaSet }) { // Add handlers for logging - queue.on('global:waiting', (jobId) => { - logger.info(`Queue Job Waiting - ID ${jobId}`) - }) - queue.on('global:active', (jobId, jobPromise) => { - logger.info(`Queue Job Active - ID ${jobId}`) + stateReconciliationQueue.on('global:waiting', (jobId) => { + reconciliationLogger.info(`Queue Job Waiting - ID ${jobId}`) }) - queue.on('global:lock-extension-failed', (jobId, err) => { - logger.error( - `Queue Job Lock Extension Failed - ID ${jobId} - Error ${err}` - ) + stateReconciliationQueue.on('global:active', (jobId, jobPromise) => { + reconciliationLogger.info(`Queue Job Active - ID ${jobId}`) }) - queue.on('global:stalled', (jobId) => { - logger.error(`stateMachineQueue Job Stalled - ID ${jobId}`) + stateReconciliationQueue.on( + 'global:lock-extension-failed', + (jobId, err) => { + reconciliationLogger.error( + `Queue Job Lock Extension Failed - ID ${jobId} - Error ${err}` + ) + } + ) + stateReconciliationQueue.on('global:stalled', (jobId) => { + reconciliationLogger.error(`stateMachineQueue Job Stalled - ID ${jobId}`) }) - queue.on('global:error', (error) => { - logger.error(`Queue Job Error - ${error}`) + stateReconciliationQueue.on('global:error', (error) => { + reconciliationLogger.error(`Queue Job Error - ${error}`) }) // Add handlers for when a job fails to complete (or completes with an error) or successfully completes - queue.on('completed', (job, result) => { - logger.info( + stateReconciliationQueue.on('completed', (job, result) => { + reconciliationLogger.info( `Queue Job Completed - ID ${job?.id} - Result ${JSON.stringify(result)}` ) }) - queue.on('failed', (job, err) => { - logger.error(`Queue Job Failed - ID ${job?.id} - Error ${err}`) + stateReconciliationQueue.on('failed', (job, err) => { + reconciliationLogger.error( + `Queue Job Failed - ID ${job?.id} - Error ${err}` + ) }) // Register the logic that gets executed to process each new job from the queue - queue.process( + manualSyncQueue.process( JOB_NAMES.ISSUE_MANUAL_SYNC_REQUEST, config.get('maxManualRequestSyncJobConcurrency'), processManualSync ) - queue.process( + stateReconciliationQueue.process( JOB_NAMES.ISSUE_RECURRING_SYNC_REQUEST, config.get('maxRecurringRequestSyncJobConcurrency'), processRecurringSync ) - queue.process( + stateReconciliationQueue.process( JOB_NAMES.UPDATE_REPLICA_SET, 1 /** concurrency */, processUpdateReplicaSet @@ -135,7 +172,7 @@ class StateReconciliationManager { JOB_NAMES.ISSUE_MANUAL_SYNC_REQUEST, job, handleSyncRequestJobProcessor, - logger, + manualSyncLogger, prometheusRegistry ) } @@ -146,7 +183,7 @@ class StateReconciliationManager { JOB_NAMES.ISSUE_RECURRING_SYNC_REQUEST, job, handleSyncRequestJobProcessor, - logger, + reconciliationLogger, prometheusRegistry ) } @@ -157,7 +194,7 @@ class StateReconciliationManager { JOB_NAMES.UPDATE_REPLICA_SET, job, updateReplicaSetJobProcessor, - logger, + reconciliationLogger, prometheusRegistry ) } diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js index 5f887664e70..104546154ac 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/stateReconciliationUtils.js @@ -1,4 +1,7 @@ +const _ = require('lodash') +const axios = require('axios') const { logger } = require('../../../logging') +const Utils = require('../../../utils') const { SyncType, JOB_NAMES } = require('../stateMachineConstants') const SyncRequestDeDuplicator = require('./SyncRequestDeDuplicator') @@ -73,6 +76,97 @@ const getNewOrExistingSyncReq = ({ return { syncReqToEnqueue } } +/** + * Issues syncRequest for user against secondary, and polls for replication up to primary + * If secondary fails to sync within specified timeoutMs, will error + */ +const issueSyncRequestsUntilSynced = async ( + primaryUrl, + secondaryUrl, + wallet, + primaryClockVal, + timeoutMs, + queue +) => { + // Issue syncRequest before polling secondary for replication + const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({ + userWallet: wallet, + secondaryEndpoint: secondaryUrl, + primaryEndpoint: primaryUrl, + syncType: SyncType.Manual, + immediate: true + }) + if (!_.isEmpty(duplicateSyncReq)) { + // Log duplicate and return + logger.warn(`Duplicate sync request: ${duplicateSyncReq}`) + return + } else if (!_.isEmpty(syncReqToEnqueue)) { + const { jobName, jobData } = syncReqToEnqueue + await queue.add(jobName, jobData) + } else { + // Log error that the sync request couldn't be created and return + logger.error(`Failed to create manual sync request: ${duplicateSyncReq}`) + return + } + + // Poll clock status and issue syncRequests until secondary is caught up or until timeoutMs + const start = Date.now() + while (Date.now() - start < timeoutMs) { + try { + // Retrieve secondary clock status for user + const secondaryClockStatusResp = await axios({ + method: 'get', + baseURL: secondaryUrl, + url: `/users/clock_status/${wallet}`, + responseType: 'json', + timeout: 1000 // 1000ms = 1s + }) + const { clockValue: secondaryClockVal, syncInProgress } = + secondaryClockStatusResp.data.data + + // If secondary is synced, return successfully + if (secondaryClockVal >= primaryClockVal) { + return + + // Else, if a sync is not already in progress on the secondary, issue a new SyncRequest + } else if (!syncInProgress) { + const { duplicateSyncReq, syncReqToEnqueue } = getNewOrExistingSyncReq({ + userWallet: wallet, + secondaryEndpoint: secondaryUrl, + primaryEndpoint: primaryUrl, + syncType: SyncType.Manual + }) + if (!_.isEmpty(duplicateSyncReq)) { + // Log duplicate and return + logger.warn(`Duplicate sync request: ${duplicateSyncReq}`) + return + } else if (!_.isEmpty(syncReqToEnqueue)) { + const { jobName, jobData } = syncReqToEnqueue + await queue.add(jobName, jobData) + } else { + // Log error that the sync request couldn't be created and return + logger.error( + `Failed to create manual sync request: ${duplicateSyncReq}` + ) + return + } + } + + // Give secondary some time to process ongoing or newly enqueued sync + // NOTE - we might want to make this timeout longer + await Utils.timeout(500) + } catch (e) { + // do nothing and let while loop continue + } + } + + // This condition will only be hit if the secondary has failed to sync within timeoutMs + throw new Error( + `Secondary ${secondaryUrl} did not sync up to primary for user ${wallet} within ${timeoutMs}ms` + ) +} + module.exports = { - getNewOrExistingSyncReq + getNewOrExistingSyncReq, + issueSyncRequestsUntilSynced } diff --git a/creator-node/test/StateReconciliationManager.test.js b/creator-node/test/StateReconciliationManager.test.js index 2d833b51b67..9a92255552b 100644 --- a/creator-node/test/StateReconciliationManager.test.js +++ b/creator-node/test/StateReconciliationManager.test.js @@ -70,17 +70,19 @@ describe('test StateReconciliationManager initialization, events, and job proces return { processJobMock, loggerStub } } - it('creates the queue and registers its event handlers', async function () { + it('creates the stateReconciliationQueue and registers its event handlers', async function () { // Initialize StateReconciliationManager and spy on its registerQueueEventHandlersAndJobProcessors function const stateReconciliationManager = new StateReconciliationManager() sandbox.spy( stateReconciliationManager, 'registerQueueEventHandlersAndJobProcessors' ) - const queue = await stateReconciliationManager.init(getPrometheusRegistry()) + const { stateReconciliationQueue } = await stateReconciliationManager.init( + getPrometheusRegistry() + ) - // Verify that the queue was successfully initialized and that its event listeners were registered - expect(queue).to.exist.and.to.be.instanceOf(BullQueue) + // Verify that the stateReconciliationQueue was successfully initialized and that its event listeners were registered + expect(stateReconciliationQueue).to.exist.and.to.be.instanceOf(BullQueue) expect( stateReconciliationManager.registerQueueEventHandlersAndJobProcessors ).to.have.been.calledOnce @@ -89,7 +91,7 @@ describe('test StateReconciliationManager initialization, events, and job proces 0 ).args[0] ) - .to.have.property('queue') + .to.have.property('stateReconciliationQueue') .that.has.deep.property('name', QUEUE_NAMES.STATE_RECONCILIATION) })