Skip to content

Commit

Permalink
New queue for immediate sync (#3619)
Browse files Browse the repository at this point in the history
* Add new sync immediate queue for manual syncs

* lint

* extend check to syncImmediatequeue

* bugfix

* lint

* Remove done
  • Loading branch information
dmanjunath authored Aug 3, 2022
1 parent 5cef789 commit 9b5f4c9
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 9 deletions.
17 changes: 10 additions & 7 deletions creator-node/src/components/replicaSet/replicaSetController.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const {
respondToURSMRequestForSignature
} = require('./URSMRegistrationComponentService')
const { ensureStorageMiddleware } = require('../../middlewares')
const { enqueueSync } = require('./syncQueueComponentService')
const secondarySyncFromPrimary = require('../../services/sync/secondarySyncFromPrimary')
const {
enqueueSync,
processImmediateSync
} = require('./syncQueueComponentService')

const router = express.Router()

Expand Down Expand Up @@ -60,7 +62,10 @@ const respondToURSMRequestForProposalController = async (req) => {
*/
const syncRouteController = async (req, res) => {
const serviceRegistry = req.app.get('serviceRegistry')
if (_.isEmpty(serviceRegistry?.syncQueue)) {
if (
_.isEmpty(serviceRegistry?.syncQueue) ||
_.isEmpty(serviceRegistry?.syncImmediateQueue)
) {
return errorResponseServerError('Sync Queue is not up and running yet')
}
const nodeConfig = serviceRegistry.nodeConfig
Expand Down Expand Up @@ -95,13 +100,12 @@ const syncRouteController = async (req, res) => {
*/
if (immediate) {
try {
await secondarySyncFromPrimary(
await processImmediateSync({
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
blockNumber,
forceResync
)
})
} catch (e) {
return errorResponseServerError(e)
}
Expand All @@ -120,7 +124,6 @@ const syncRouteController = async (req, res) => {
serviceRegistry,
walletPublicKeys: [wallet],
creatorNodeEndpoint,
blockNumber,
forceResync
})
delete syncDebounceQueue[wallet]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ const enqueueSync = async ({
})
}

const processImmediateSync = async ({
serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
forceResync
}) => {
await serviceRegistry.syncImmediateQueue.processImmediateSync({
walletPublicKeys,
creatorNodeEndpoint,
forceResync
})
}

module.exports = {
enqueueSync
enqueueSync,
processImmediateSync
}
2 changes: 1 addition & 1 deletion creator-node/src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ const config = convict({
doc: 'Max concurrency of SyncQueue',
format: 'nat',
env: 'syncQueueMaxConcurrency',
default: 50
default: 30
},
issueAndWaitForSecondarySyncRequestsPollingDurationMs: {
doc: 'Duration for which to poll secondaries for content replication in `issueAndWaitForSecondarySyncRequests` function',
Expand Down
5 changes: 5 additions & 0 deletions creator-node/src/serviceRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const utils = require('./utils')
const config = require('./config')
const MonitoringQueue = require('./monitors/MonitoringQueue')
const SyncQueue = require('./services/sync/syncQueue')
const SyncImmediateQueue = require('./services/sync/syncImmediateQueue')
const SkippedCIDsRetryQueue = require('./services/sync/skippedCIDsRetryService')
const SessionExpirationQueue = require('./services/SessionExpirationQueue')
const AsyncProcessingQueue = require('./AsyncProcessingQueue')
Expand Down Expand Up @@ -54,6 +55,7 @@ class ServiceRegistry {
this.transcodingQueue = TranscodingQueue // Transcodes and segments all tracks
this.skippedCIDsRetryQueue = null // Retries syncing CIDs that were unable to sync on first try
this.syncQueue = null // Handles syncing data to users' replica sets
this.syncImmediateQueue = null // Handles syncing manual immediate jobs
this.asyncProcessingQueue = null // Handles all jobs that should be performed asynchronously. Currently handles track upload and track hand off
this.monitorStateQueue = null // Handles jobs for slicing batches of users and gathering data about them
this.findSyncRequestsQueue = null // Handles jobs for finding sync requests
Expand Down Expand Up @@ -136,6 +138,7 @@ class ServiceRegistry {
this.logInfo('Setting up Bull queue monitoring...')

const { queue: syncProcessingQueue } = this.syncQueue
const { queue: syncImmediateProcessingQueue } = this.syncImmediateQueue
const { queue: asyncProcessingQueue } = this.asyncProcessingQueue
const { queue: imageProcessingQueue } = this.imageProcessingQueue
const { queue: transcodingQueue } = this.transcodingQueue
Expand Down Expand Up @@ -193,6 +196,7 @@ class ServiceRegistry {
new BullAdapter(this.updateReplicaSetQueue, { readOnlyMode: true }),
new BullAdapter(this.stateMachineQueue, { readOnlyMode: true }),
new BullAdapter(syncProcessingQueue, { readOnlyMode: true }),
new BullAdapter(syncImmediateProcessingQueue, { readOnlyMode: true }),
new BullAdapter(asyncProcessingQueue, { readOnlyMode: true }),
new BullAdapter(imageProcessingQueue, { readOnlyMode: true }),
new BullAdapter(transcodingQueue, { readOnlyMode: true }),
Expand Down Expand Up @@ -322,6 +326,7 @@ class ServiceRegistry {
// SyncQueue construction (requires L1 identity)
// Note - passes in reference to instance of self (serviceRegistry), a very sub-optimal workaround
this.syncQueue = new SyncQueue(config, this.redis, this)
this.syncImmediateQueue = new SyncImmediateQueue(config, this.redis, this)

// L2URSMRegistration (requires L1 identity)
// Retries indefinitely
Expand Down
75 changes: 75 additions & 0 deletions creator-node/src/services/sync/syncImmediateQueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const Bull = require('bull')

const { logger } = require('../../logging')
const secondarySyncFromPrimary = require('./secondarySyncFromPrimary')

const SYNC_QUEUE_HISTORY = 500
const LOCK_DURATION = 1000 * 60 * 5 // 5 minutes

/**
* SyncImmediateQueue - handles enqueuing and processing of immediate manual Sync jobs on secondary
* sync job = this node (secondary) will sync data for a user from their primary
* this queue is only for manual immediate syncs which are awaited until they're finished, for regular
* syncs look at SyncQueue
*/
class SyncImmediateQueue {
/**
* Construct bull queue and define job processor
* @notice - accepts `serviceRegistry` instance, even though this class is initialized
* in that serviceRegistry instance. A sub-optimal workaround for now.
*/
constructor(nodeConfig, redis, serviceRegistry) {
this.nodeConfig = nodeConfig
this.redis = redis
this.serviceRegistry = serviceRegistry

this.queue = new Bull('sync-immediate-processing-queue', {
redis: {
host: this.nodeConfig.get('redisHost'),
port: this.nodeConfig.get('redisPort')
},
defaultJobOptions: {
removeOnComplete: SYNC_QUEUE_HISTORY,
removeOnFail: SYNC_QUEUE_HISTORY
},
settings: {
lockDuration: LOCK_DURATION
}
})

const jobProcessorConcurrency = this.nodeConfig.get(
'syncQueueMaxConcurrency'
)
this.queue.process(jobProcessorConcurrency, async (job) => {
const { walletPublicKeys, creatorNodeEndpoint, forceResync } = job.data

try {
await secondarySyncFromPrimary(
this.serviceRegistry,
walletPublicKeys,
creatorNodeEndpoint,
null, // blockNumber
forceResync
)
} catch (e) {
logger.error(
`secondarySyncFromPrimary failure for wallets ${walletPublicKeys} against ${creatorNodeEndpoint}`,
e.message
)
}
})
}

async processImmediateSync({
walletPublicKeys,
creatorNodeEndpoint,
forceResync
}) {
const jobProps = { walletPublicKeys, creatorNodeEndpoint, forceResync }
const job = await this.queue.add(jobProps)
const result = await job.finished()
return result
}
}

module.exports = SyncImmediateQueue
4 changes: 4 additions & 0 deletions creator-node/src/services/sync/syncQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const { logger } = require('../../logging')
const secondarySyncFromPrimary = require('./secondarySyncFromPrimary')

const SYNC_QUEUE_HISTORY = 500
const LOCK_DURATION = 1000 * 60 * 30 // 30 minutes

/**
* SyncQueue - handles enqueuing and processing of Sync jobs on secondary
Expand All @@ -28,6 +29,9 @@ class SyncQueue {
defaultJobOptions: {
removeOnComplete: SYNC_QUEUE_HISTORY,
removeOnFail: SYNC_QUEUE_HISTORY
},
settings: {
lockDuration: LOCK_DURATION
}
})

Expand Down

0 comments on commit 9b5f4c9

Please sign in to comment.