Skip to content

Commit

Permalink
Make queues limit how many jobs they add to each other (#3807)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Sep 1, 2022
1 parent aabd49a commit a475c31
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 11 deletions.
37 changes: 29 additions & 8 deletions creator-node/src/services/stateMachineManager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,36 @@ class StateMachineManager {

// Upon completion, make queue jobs record metrics and enqueue other jobs as necessary
const queueNameToQueueMap = {
[QUEUE_NAMES.MONITOR_STATE]: monitorStateQueue,
[QUEUE_NAMES.FIND_SYNC_REQUESTS]: findSyncRequestsQueue,
[QUEUE_NAMES.FIND_REPLICA_SET_UPDATES]: findReplicaSetUpdatesQueue,
[QUEUE_NAMES.MANUAL_SYNC]: manualSyncQueue,
[QUEUE_NAMES.RECURRING_SYNC]: recurringSyncQueue,
[QUEUE_NAMES.UPDATE_REPLICA_SET]: updateReplicaSetQueue,
[QUEUE_NAMES.RECOVER_ORPHANED_DATA]: recoverOrphanedDataQueue
[QUEUE_NAMES.MONITOR_STATE]: {
queue: monitorStateQueue,
maxWaitingJobs: 10
},
[QUEUE_NAMES.FIND_SYNC_REQUESTS]: {
queue: findSyncRequestsQueue,
maxWaitingJobs: 10
},
[QUEUE_NAMES.FIND_REPLICA_SET_UPDATES]: {
queue: findReplicaSetUpdatesQueue,
maxWaitingJobs: 10
},
[QUEUE_NAMES.MANUAL_SYNC]: {
queue: manualSyncQueue,
maxWaitingJobs: 1000
},
[QUEUE_NAMES.RECURRING_SYNC]: {
queue: recurringSyncQueue,
maxWaitingJobs: 1000
},
[QUEUE_NAMES.UPDATE_REPLICA_SET]: {
queue: updateReplicaSetQueue,
maxWaitingJobs: 1000
},
[QUEUE_NAMES.RECOVER_ORPHANED_DATA]: {
queue: recoverOrphanedDataQueue,
maxWaitingJobs: 10
}
}
for (const [queueName, queue] of Object.entries(queueNameToQueueMap)) {
for (const [queueName, { queue }] of Object.entries(queueNameToQueueMap)) {
queue.on(
'global:completed',
makeOnCompleteCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const {
* See usage in index.js (in same directory) for example of how it's bound to StateMachineManager.
*
* @param {string} nameOfQueueWithCompletedJob the name of the queue that this onComplete callback is for
* @param {Object} queueNameToQueueMap mapping of queue name (string) to queue object (BullQueue)
* @param {Object} queueNameToQueueMap mapping of queue name (string) to queue object (BullQueue) and max jobs that are allowed to be waiting in the queue
* @param {Object} prometheusRegistry the registry of prometheus metrics
* @returns a function that:
* - takes a jobId (string) and result (string) of a job that successfully completed
Expand Down Expand Up @@ -88,7 +88,7 @@ module.exports = function (
jobsToEnqueue || {}
) as [TQUEUE_NAMES, ParamsForJobsToEnqueue[]][]) {
// Make sure we're working with a valid queue
const queue: Queue = queueNameToQueueMap[queueName]
const { queue, maxWaitingJobs } = queueNameToQueueMap[queueName]
if (!queue) {
logger.error(
`Job returned data trying to enqueue jobs to a queue whose name isn't recognized: ${queueName}`
Expand Down Expand Up @@ -117,6 +117,7 @@ module.exports = function (
queue,
queueName,
nameOfQueueWithCompletedJob,
maxWaitingJobs,
jobId,
logger
)
Expand All @@ -131,13 +132,23 @@ const enqueueJobs = async (
queueToAddTo: Queue,
queueNameToAddTo: TQUEUE_NAMES,
triggeredByQueueName: TQUEUE_NAMES,
maxWaitingJobs: number,
triggeredByJobId: string,
logger: Logger
) => {
logger.info(
`Attempting to add ${jobs?.length} jobs in bulk to queue ${queueNameToAddTo}`
)

// Don't add to the queue if the queue is already backed up (i.e., it has too many waiting jobs)
const numWaitingJobs = await queueToAddTo.getWaitingCount()
if (numWaitingJobs > maxWaitingJobs) {
logger.warn(
`Queue ${queueNameToAddTo} already has ${numWaitingJobs} waiting jobs. Not adding any more jobs until ${maxWaitingJobs} or fewer jobs are waiting in this queue`
)
return
}

// Add 'enqueuedBy' field for tracking
try {
const bulkAddResult = await queueToAddTo.addBulk(
Expand Down
8 changes: 7 additions & 1 deletion creator-node/src/services/stateMachineManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import type { TQUEUE_NAMES } from './stateMachineConstants'

import { Queue } from 'bull'

export type QueueNameToQueueMap = Record<TQUEUE_NAMES, Queue>
export type QueueNameToQueueMap = Record<
TQUEUE_NAMES,
{
queue: Queue
maxWaitingJobs: number
}
>

export type WalletsToSecondariesMapping = {
[wallet: string]: string[]
Expand Down

0 comments on commit a475c31

Please sign in to comment.