activeQueues option not working as intended with BullMQ job queue strategy #2419

Open
asonnleitner opened this issue Sep 29, 2023 · 3 comments
Labels
type: bug 🐛 Something isn't working


@asonnleitner
Contributor

Description
When the activeQueues option in JobQueueOptions is used with the BullMQ job queue strategy, the observed behavior does not match expectations. All worker processes pull jobs from a single shared queue, even when each one is configured with different activeQueues. As a result, whenever a worker picks up a job whose queue is not listed in its own activeQueues, it marks that job as failed.

`JobQueueOptions`

```typescript
export interface JobQueueOptions {
    /**
     * @description
     * Defines how the jobs in the queue are persisted and accessed.
     *
     * @default InMemoryJobQueueStrategy
     */
    jobQueueStrategy?: JobQueueStrategy;
    jobBufferStorageStrategy?: JobBufferStorageStrategy;
    /**
     * @description
     * Defines the queues that will run in this process.
     * This can be used to configure only certain queues to run in this process.
     * If it's empty all queues will be run. Note: this option is primarily intended
     * to apply to the Worker process. Jobs will _always_ get published to the queue
     * regardless of this setting, but this setting determines whether they get
     * _processed_ or not.
     */
    activeQueues?: string[];
    /**
     * @description
     * Prefixes all job queue names with the passed string. This is useful with multiple deployments
     * in cloud environments using services such as Amazon SQS or Google Cloud Tasks.
     *
     * For example, we might have a staging and a production deployment in the same account/project and
     * each one will need its own task queue. We can achieve this with a prefix.
     *
     * @since 1.5.0
     */
    prefix?: string;
}
```
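The documented activeQueues semantics can be expressed as a small predicate. This is a minimal sketch, not Vendure's own code; the helper name isQueueActive and the queue names are hypothetical.

```typescript
// Hypothetical helper mirroring the documented activeQueues semantics:
// an empty (or omitted) list means every queue runs in this process,
// otherwise only the listed queues are processed.
function isQueueActive(queueName: string, activeQueues: string[] = []): boolean {
    return activeQueues.length === 0 || activeQueues.includes(queueName);
}

// Usage with hypothetical queue names:
const workerQueues = ['queue-a', 'queue-b'];
console.log(isQueueActive('queue-a', workerQueues)); // true
console.log(isQueueActive('queue-c', workerQueues)); // false
console.log(isQueueActive('queue-c')); // true: no list means all queues run
```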

In the BullMQ strategy, only one BullMQ queue is created, and it is shared by all Vendure queues:

```typescript
this.queue = new Queue(QUEUE_NAME, {
    ...options.queueOptions,
    connection: this.redisConnection,
})
    .on('error', (e: any) =>
        Logger.error(`BullMQ Queue error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
    )
    .on('resumed', () => Logger.verbose('BullMQ Queue resumed', loggerCtx))
    .on('paused', () => Logger.verbose('BullMQ Queue paused', loggerCtx));
```
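To make the consequence concrete, here is a deliberately simplified in-memory model of one shared queue serving workers with different activeQueues. It is not BullMQ or Vendure code: it assumes workers take turns pulling the next job, and that a worker not configured for a job's queue fails it, as observed in this report. All names (runSharedQueue, WorkerProcess, queue-a, worker-a, and so on) are hypothetical.

```typescript
// Simplified in-memory model of the current behaviour (NOT BullMQ/Vendure code):
// one shared queue holds jobs for every Vendure queue.
interface Job { id: number; queueName: string; }
interface WorkerProcess { name: string; activeQueues: string[]; }
interface Outcome { jobId: number; worker: string; status: 'completed' | 'failed'; }

function runSharedQueue(jobs: Job[], workers: WorkerProcess[]): Outcome[] {
    const sharedQueue = [...jobs];
    const outcomes: Outcome[] = [];
    let turn = 0;
    while (sharedQueue.length > 0) {
        // Assumption: workers take turns, and each takes whatever job is next,
        // regardless of which Vendure queue it belongs to.
        const worker = workers[turn % workers.length];
        const job = sharedQueue.shift()!;
        const allowed =
            worker.activeQueues.length === 0 || worker.activeQueues.includes(job.queueName);
        // Per this report, a worker not configured for the job's queue fails it.
        outcomes.push({ jobId: job.id, worker: worker.name, status: allowed ? 'completed' : 'failed' });
        turn++;
    }
    return outcomes;
}

const outcomes = runSharedQueue(
    [
        { id: 1, queueName: 'queue-a' },
        { id: 2, queueName: 'queue-a' },
        { id: 3, queueName: 'queue-b' },
        { id: 4, queueName: 'queue-b' },
    ],
    [
        { name: 'worker-a', activeQueues: ['queue-a'] },
        { name: 'worker-b', activeQueues: ['queue-b'] },
    ],
);
console.log(outcomes.filter(o => o.status === 'failed').map(o => o.jobId)); // [ 2, 3 ]
```

In this model, two of the four jobs fail purely because of which worker happened to pick them up, even though a suitably configured worker exists for every queue.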

Proposed Behavior
I believe a more robust solution would be to create a separate queue for each entry in the activeQueues option, so that each worker process pulls jobs only from its corresponding queues. It might also be possible to keep a single queue and have each worker check activeQueues before processing a job, but this approach could delay job processing, since a job may be picked up by workers that are not allowed to handle it before it reaches one that is.
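A sketch of the proposed routing, using the same kind of simplified in-memory model. Each Map entry stands in for a separate BullMQ queue; in a real strategy, a worker would only be created for the queues in its activeQueues. The functions publish and drainForWorker are hypothetical names, not part of Vendure or BullMQ.

```typescript
// Hypothetical in-memory model of the proposed design (NOT BullMQ/Vendure code):
// one queue per Vendure queue name; workers only pull from their active queues.
interface Job { id: number; queueName: string; }
interface WorkerProcess { name: string; activeQueues: string[]; }

// Publishing always routes the job to its own queue, whether or not any
// worker currently consumes that queue.
function publish(queues: Map<string, Job[]>, job: Job): void {
    const queue = queues.get(job.queueName) ?? [];
    queue.push(job);
    queues.set(job.queueName, queue);
}

// A worker only drains the queues it is subscribed to, so it never sees
// (and never fails) jobs belonging to other queues.
function drainForWorker(queues: Map<string, Job[]>, worker: WorkerProcess): Job[] {
    const processed: Job[] = [];
    for (const [queueName, queue] of queues) {
        const subscribed =
            worker.activeQueues.length === 0 || worker.activeQueues.includes(queueName);
        if (subscribed) {
            processed.push(...queue.splice(0)); // remove and take every waiting job
        }
    }
    return processed;
}

const queues = new Map<string, Job[]>();
for (const job of [
    { id: 1, queueName: 'queue-a' },
    { id: 2, queueName: 'queue-a' },
    { id: 3, queueName: 'queue-b' },
    { id: 4, queueName: 'queue-b' },
]) {
    publish(queues, job);
}
const doneByA = drainForWorker(queues, { name: 'worker-a', activeQueues: ['queue-a'] });
const doneByB = drainForWorker(queues, { name: 'worker-b', activeQueues: ['queue-b'] });
console.log(doneByA.map(j => j.id), doneByB.map(j => j.id)); // [ 1, 2 ] [ 3, 4 ]
```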

When multiple worker processes run with different activeQueues, they all pull jobs from the same single queue. A job therefore fails whenever it is picked up by a worker whose activeQueues does not include that job's queue.

To Reproduce

  1. Set up Vendure with BullMQ and Redis.
  2. Run multiple worker processes with different activeQueues.
  3. Observe that all workers pull jobs from the same queue, so jobs whose queue is not in a worker's activeQueues fail instead of being skipped.

Expected behavior
Each worker process should only process jobs from the queues listed in its activeQueues option. Ideally, a separate queue should be created for each activeQueues entry, so that workers never pick up jobs they are not configured to handle and job processing is not needlessly delayed.

@asonnleitner asonnleitner added the type: bug 🐛 Something isn't working label Sep 29, 2023
@asonnleitner asonnleitner changed the title activeQueues option not working as intended with BullMQ strategy activeQueues option not working as intended with BullMQ job queue strategy Sep 29, 2023
@michaelbromley
Member

Duplicate of

I did spend some time looking into this, and specifically started working on a proof-of-concept based on a separate BullMQ queue for each Vendure queue. That design would indeed make the activeQueues functionality easy to implement.

However, it also comes with some challenges that I have not yet been able to resolve:

  • How do we then implement the job list with pagination? Let's say we want to return "10 most recent jobs" - we then need to fetch from each individual queue and combine the results. For the 1st page this is ok. For subsequent pages it starts to get pretty complex.
  • When fetching an individual job, we would then need to know not only the job ID (as is required by the current API), but also we would need to know what queue, so we know which BullMQ queue to fetch it from.
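The pagination problem in the first bullet can be made concrete. Assuming each per-queue list can be fetched already sorted newest first, a page defined by skip and take requires fetching the first skip + take jobs from every queue and merging them, because any single queue might hold all of them. The function and type names below are hypothetical; this is a sketch of the merge, not of any BullMQ call.

```typescript
// Hypothetical sketch: "newest first" pagination over several per-queue job lists.
interface JobSummary { id: string; queueName: string; createdAt: number; }

function paginateAcrossQueues(
    perQueue: JobSummary[][], // assumption: each list is already sorted newest first
    skip: number,
    take: number,
): JobSummary[] {
    // Any single queue could hold all of the first (skip + take) jobs overall,
    // so that many must be fetched from EVERY queue before merging.
    const windowSize = skip + take;
    const candidates = perQueue.flatMap(list => list.slice(0, windowSize));
    candidates.sort((a, b) => b.createdAt - a.createdAt);
    return candidates.slice(skip, skip + take);
}

const queueA: JobSummary[] = [
    { id: 'a3', queueName: 'queue-a', createdAt: 9 },
    { id: 'a2', queueName: 'queue-a', createdAt: 5 },
    { id: 'a1', queueName: 'queue-a', createdAt: 1 },
];
const queueB: JobSummary[] = [
    { id: 'b3', queueName: 'queue-b', createdAt: 8 },
    { id: 'b2', queueName: 'queue-b', createdAt: 7 },
    { id: 'b1', queueName: 'queue-b', createdAt: 2 },
];
console.log(paginateAcrossQueues([queueA, queueB], 2, 2).map(j => j.id)); // [ 'b2', 'a2' ]
```

The amount fetched grows with both page depth and queue count: with, say, 20 queues and 10 jobs per page, page 1 fetches 20 × 10 = 200 entries, while page 50 (skip = 490) fetches 20 × 500 = 10,000.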

These issues may also be solvable but they require more time. If you are interested in trying to solve, I would very much welcome assistance.

@asonnleitner
Contributor Author

asonnleitner commented Sep 29, 2023

Thanks for the quick response and the detailed explanation of the challenges involved.

Regarding the pagination issue, one approach could be to maintain a centralized list of jobs that is updated whenever a job is added to or removed from a queue, and to use that list to serve the job list. I'm unsure about the performance implications, but it could simplify the pagination logic.
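A rough sketch of such a centralized list, assuming it behaves like a Redis sorted set keyed by creation time (ZADD, ZREM and ZREVRANGE are real Redis commands; the CentralJobIndex class itself is hypothetical and keeps everything in memory):

```typescript
// Hypothetical central job index: an in-memory stand-in for something like a
// Redis sorted set (ZADD when a job is added, ZREM when it is removed,
// ZREVRANGE to read a page newest first).
class CentralJobIndex {
    // job reference (e.g. "queueName:jobId") -> creation timestamp
    private readonly entries = new Map<string, number>();

    add(jobRef: string, createdAt: number): void {
        this.entries.set(jobRef, createdAt);
    }

    remove(jobRef: string): void {
        this.entries.delete(jobRef);
    }

    // Because every job lives in one ordered index, skip/take is a single slice.
    // (This sketch sorts on each call; a sorted set keeps the order incrementally.)
    page(skip: number, take: number): string[] {
        return [...this.entries]
            .sort((a, b) => b[1] - a[1])
            .slice(skip, skip + take)
            .map(([jobRef]) => jobRef);
    }
}

const index = new CentralJobIndex();
index.add('queue-a:1', 100);
index.add('queue-b:7', 300);
index.add('queue-a:2', 200);
index.remove('queue-a:1');
console.log(index.page(0, 10)); // [ 'queue-b:7', 'queue-a:2' ]
```

The catch is that this index has to be kept consistent with every job state change that BullMQ makes internally, not just with add and remove.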

As for fetching individual jobs, we could combine the job ID and queue name into a single identifier. For example, the job ID could be formatted as queueName:jobId, allowing us to fetch the job from the correct queue.
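A minimal sketch of the composite identifier. The helper names encodeJobId and decodeJobId are hypothetical, and the sketch assumes queue names never contain a colon, so decoding splits on the first one and a job ID may still contain colons.

```typescript
// Hypothetical helpers for the "queueName:jobId" identifier format.
// Assumption: queue names never contain ":" (job IDs may), so decoding
// splits on the FIRST colon.
function encodeJobId(queueName: string, jobId: string): string {
    if (queueName.length === 0 || queueName.includes(':')) {
        throw new Error(`Invalid queue name for composite ID: "${queueName}"`);
    }
    return `${queueName}:${jobId}`;
}

function decodeJobId(compositeId: string): { queueName: string; jobId: string } {
    const sep = compositeId.indexOf(':');
    // Reject IDs with no colon, an empty queue name, or an empty job ID.
    if (sep <= 0 || sep === compositeId.length - 1) {
        throw new Error(`Invalid composite job ID: "${compositeId}"`);
    }
    return { queueName: compositeId.slice(0, sep), jobId: compositeId.slice(sep + 1) };
}

const exampleId = encodeJobId('queue-a', '42');
console.log(exampleId); // queue-a:42
console.log(decodeJobId(exampleId)); // { queueName: 'queue-a', jobId: '42' }
```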

I'm not certain if these approaches would work in practice, but I'm interested in hearing your thoughts.

@asonnleitner asonnleitner closed this as not planned Won't fix, can't repro, duplicate, stale Sep 29, 2023
@michaelbromley
Member

> As for fetching individual jobs, we could combine the job ID and queue name into a single identifier. For example, the job ID could be formatted as queueName:jobId, allowing us to fetch the job from the correct queue.

Yes, this would probably work.

> Regarding the pagination issue, one approach could be to maintain a centralized list of jobs that is updated whenever a job is added or removed from a queue.

The main issue with this is not the perf (Redis is just mega fast); it's the fact that we would be re-implementing parts of BullMQ. Bull has some pretty sophisticated logic around how it manages its queues (as represented by the hundreds and hundreds of lines of custom Lua scripts that power it), which is why I haven't opted for this route so far.

So, both of these ideas are worth further exploration.
