diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index 0995cfa0a48..cb77fa37b39 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -275,11 +275,15 @@ const handleSyncFromPrimary = async ( * Every subsequent sync will enter the if case and update the existing local cnodeUserRecord. */ if (cnodeUserRecord) { + logger.info( + logPrefix, + `cNodeUserRecord was non-empty -- updating CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}. Clock value: ${fetchedLatestClockVal}` + ) const [numRowsUpdated, respObj] = await models.CNodeUser.update( { lastLogin: fetchedCNodeUser.lastLogin, latestBlockNumber: fetchedLatestBlockNumber, - clock: fetchedCNodeUser.clock, + clock: fetchedLatestClockVal, createdAt: fetchedCNodeUser.createdAt }, { @@ -307,13 +311,17 @@ const handleSyncFromPrimary = async ( } cnodeUser = respObj[0] } else { + logger.info( + logPrefix, + `cNodeUserRecord was empty -- inserting CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}. Clock value: ${fetchedLatestClockVal}` + ) // Will throw error if creation fails cnodeUser = await models.CNodeUser.create( { walletPublicKey: fetchedWalletPublicKey, lastLogin: fetchedCNodeUser.lastLogin, latestBlockNumber: fetchedLatestBlockNumber, - clock: fetchedCNodeUser.clock, + clock: fetchedLatestClockVal, createdAt: fetchedCNodeUser.createdAt }, { @@ -326,7 +334,7 @@ const handleSyncFromPrimary = async ( const cnodeUserUUID = cnodeUser.cnodeUserUUID logger.info( logPrefix, - `Inserted CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}: cnodeUserUUID: ${cnodeUserUUID}` + `Upserted CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}: cnodeUserUUID: ${cnodeUserUUID}. Clock value: ${fetchedLatestClockVal}` ) /** diff --git a/creator-node/src/services/sync/syncImmediateQueue.js b/creator-node/src/services/sync/syncImmediateQueue.js index 6c596d3fb7d..475493bbedb 100644 --- a/creator-node/src/services/sync/syncImmediateQueue.js +++ b/creator-node/src/services/sync/syncImmediateQueue.js @@ -33,7 +33,9 @@ class SyncImmediateQueue { removeOnFail: SYNC_QUEUE_HISTORY }, settings: { - lockDuration: LOCK_DURATION + lockDuration: LOCK_DURATION, + // We never want to re-process stalled jobs + maxStalledCount: 0 } }) diff --git a/creator-node/src/services/sync/syncQueue.js b/creator-node/src/services/sync/syncQueue.js index 1709a1b6098..e4c33070f6d 100644 --- a/creator-node/src/services/sync/syncQueue.js +++ b/creator-node/src/services/sync/syncQueue.js @@ -31,7 +31,9 @@ class SyncQueue { removeOnFail: SYNC_QUEUE_HISTORY }, settings: { - lockDuration: LOCK_DURATION + lockDuration: LOCK_DURATION, + // We never want to re-process stalled jobs + maxStalledCount: 0 } }) @@ -46,11 +48,12 @@ class SyncQueue { const jobProcessorConcurrency = this.nodeConfig.get( 'syncQueueMaxConcurrency' ) - this.queue.process(jobProcessorConcurrency, async (job, done) => { + this.queue.process(jobProcessorConcurrency, async (job) => { const { walletPublicKeys, creatorNodeEndpoint, forceResync } = job.data + let result = {} try { - await secondarySyncFromPrimary( + result = await secondarySyncFromPrimary( this.serviceRegistry, walletPublicKeys, creatorNodeEndpoint, @@ -62,9 +65,10 @@ class SyncQueue { `secondarySyncFromPrimary failure for wallets ${walletPublicKeys} against ${creatorNodeEndpoint}`, e.message ) + result = { error: e.message } } - done() + return result }) }