Skip to content

Commit

Permalink
Remove extra done() callback and add sync job result (#3634)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Aug 4, 2022
1 parent be35ce8 commit 27433f5
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
14 changes: 11 additions & 3 deletions creator-node/src/services/sync/secondarySyncFromPrimary.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,15 @@ const handleSyncFromPrimary = async (
* Every subsequent sync will enter the if case and update the existing local cnodeUserRecord.
*/
if (cnodeUserRecord) {
logger.info(
logPrefix,
`cNodeUserRecord was non-empty -- updating CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}. Clock value: ${fetchedLatestClockVal}`
)
const [numRowsUpdated, respObj] = await models.CNodeUser.update(
{
lastLogin: fetchedCNodeUser.lastLogin,
latestBlockNumber: fetchedLatestBlockNumber,
clock: fetchedCNodeUser.clock,
clock: fetchedLatestClockVal,
createdAt: fetchedCNodeUser.createdAt
},
{
Expand Down Expand Up @@ -307,13 +311,17 @@ const handleSyncFromPrimary = async (
}
cnodeUser = respObj[0]
} else {
logger.info(
logPrefix,
`cNodeUserRecord was empty -- inserting CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}. Clock value: ${fetchedLatestClockVal}`
)
// Will throw error if creation fails
cnodeUser = await models.CNodeUser.create(
{
walletPublicKey: fetchedWalletPublicKey,
lastLogin: fetchedCNodeUser.lastLogin,
latestBlockNumber: fetchedLatestBlockNumber,
clock: fetchedCNodeUser.clock,
clock: fetchedLatestClockVal,
createdAt: fetchedCNodeUser.createdAt
},
{
Expand All @@ -326,7 +334,7 @@ const handleSyncFromPrimary = async (
const cnodeUserUUID = cnodeUser.cnodeUserUUID
logger.info(
logPrefix,
`Inserted CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}: cnodeUserUUID: ${cnodeUserUUID}`
`Upserted CNodeUser for cnodeUser wallet ${fetchedWalletPublicKey}: cnodeUserUUID: ${cnodeUserUUID}. Clock value: ${fetchedLatestClockVal}`
)

/**
Expand Down
4 changes: 3 additions & 1 deletion creator-node/src/services/sync/syncImmediateQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class SyncImmediateQueue {
removeOnFail: SYNC_QUEUE_HISTORY
},
settings: {
lockDuration: LOCK_DURATION
lockDuration: LOCK_DURATION,
// We never want to re-process stalled jobs
maxStalledCount: 0
}
})

Expand Down
12 changes: 8 additions & 4 deletions creator-node/src/services/sync/syncQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ class SyncQueue {
removeOnFail: SYNC_QUEUE_HISTORY
},
settings: {
lockDuration: LOCK_DURATION
lockDuration: LOCK_DURATION,
// We never want to re-process stalled jobs
maxStalledCount: 0
}
})

Expand All @@ -46,11 +48,12 @@ class SyncQueue {
const jobProcessorConcurrency = this.nodeConfig.get(
'syncQueueMaxConcurrency'
)
this.queue.process(jobProcessorConcurrency, async (job, done) => {
this.queue.process(jobProcessorConcurrency, async (job) => {
const { walletPublicKeys, creatorNodeEndpoint, forceResync } = job.data

let result = {}
try {
await secondarySyncFromPrimary(
result = await secondarySyncFromPrimary(
this.serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
Expand All @@ -62,9 +65,10 @@ class SyncQueue {
`secondarySyncFromPrimary failure for wallets ${walletPublicKeys} against ${creatorNodeEndpoint}`,
e.message
)
result = { error: e.message }
}

done()
return result
})
}

Expand Down

0 comments on commit 27433f5

Please sign in to comment.