Error when running queues in separate workers - Bullmq strategy #1726

Open
ttournie opened this issue Aug 19, 2022 · 2 comments

@ttournie
Contributor

Describe the bug
When running 2 workers with separate queues, I get errors from the server, and the worker that is not supposed to process a given queue still tries to do so.

To Reproduce
Steps to reproduce the behavior:

  1. Create 2 workers, one dedicated to indexing, the other to apply-collection-filter
  2. Run the project
  3. Add/modify a product to trigger some jobs
  4. See error in console

Expected behavior
No error should be displayed, and jobs that do not concern a worker should be ignored by that worker.

Environment (please complete the following information):

  • @vendure/core version: 1.6.4
  • Nodejs version: 16.16
  • Database (mysql/postgres etc): postgres
ttournie added the "type: bug 🐛" (Something isn't working) label Aug 19, 2022
@michaelbromley
Member

michaelbromley commented Aug 19, 2022

Note, error seems to come from this part:

    if (processFn) {
        const job = await this.createVendureJob(bullJob);
        try {
            job.on('progress', _job => bullJob.updateProgress(_job.progress));
            const result = await processFn(job);
            await bullJob.updateProgress(100);
            return result;
        } catch (e) {
            throw e;
        }
    }
    throw new InternalServerError(`No processor defined for the queue "${queueName}"`);

In the case of worker instances configured to handle only a subset of jobs, having no defined processFn is actually expected and is not an exceptional situation.

Problem - if the worker that should not be processing the job consumes it and then ignores it, that job will never get processed. So we need a way to either ignore certain jobs (not pull them from the queue) or return them to the queue.
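
To make the failure mode concrete, here is a minimal in-memory sketch. It is not BullMQ or Vendure code: `SharedList`, `runWorkerOnce` and `simulate` are hypothetical names, and the single array only stands in for the shared list of waiting jobs. It compares silently ignoring an unhandled job with handing it back to the list.

```typescript
// Simplified in-memory model of one shared job list; NOT BullMQ or Vendure code.
type SimJob = { id: number; queueName: string };

class SharedList {
  private waiting: SimJob[] = [];
  add(job: SimJob): void {
    this.waiting.push(job);
  }
  // Removes and returns the oldest waiting job, like a worker pulling from the list.
  take(): SimJob | undefined {
    return this.waiting.shift();
  }
  size(): number {
    return this.waiting.length;
  }
}

// One turn of a worker that only handles the queue names in `handles`.
function runWorkerOnce(
  list: SharedList,
  handles: string[],
  requeueUnhandled: boolean,
  processed: SimJob[],
): void {
  const job = list.take();
  if (!job) {
    return;
  }
  if (handles.includes(job.queueName)) {
    processed.push(job);
  } else if (requeueUnhandled) {
    list.add(job); // hand the job back so another worker can take it
  }
  // Otherwise the job has been consumed and dropped: nobody will process it.
}

function simulate(requeueUnhandled: boolean): { processed: number; lost: number } {
  const list = new SharedList();
  const jobs: SimJob[] = [
    { id: 1, queueName: 'apply-collection-filter' },
    { id: 2, queueName: 'apply-collection-filter' },
    { id: 3, queueName: 'indexing' },
  ];
  jobs.forEach(job => list.add(job));

  const workers = [['indexing'], ['apply-collection-filter']];
  const processed: SimJob[] = [];
  // Workers alternate turns; the cap guards against endless re-queue cycles.
  for (let turn = 0; turn < 20 && list.size() > 0; turn++) {
    runWorkerOnce(list, workers[turn % 2], requeueUnhandled, processed);
  }
  return {
    processed: processed.length,
    lost: jobs.length - processed.length - list.size(),
  };
}

const ignoring = simulate(false); // 2 processed, 1 lost
const requeueing = simulate(true); // 3 processed, 0 lost
console.log({ ignoring, requeueing });
```

The turn cap is there because naive re-queueing in this model can cycle indefinitely when each worker keeps pulling the other's jobs, so returning jobs to the queue is not free either.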

@michaelbromley
Member

Just spent a bit more time looking into this. Fundamentally the issue is that all Vendure job queues are stored in a single BullMQ Queue instance, which means there is one Redis list of job IDs from which all workers take jobs.

BullMQ does not have any kind of "filtering" feature which allows a job to be conditionally picked up by a worker based on some function.

The closest thing I can find is that in the workerOptions there is a stalledInterval setting which controls how often checks are made for stalled jobs. When a job is stalled, it will be returned to the "waiting" queue and can then be picked up by another worker.

Using this "stalled" mechanism however - if it could be made to work - would be a hack so it's not something I want to do in the plugin itself.

Perhaps we can create a subclass of the Worker class which overrides the getNextJob method and implements a filter for the job queue name.
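
A rough sketch of the shape this could take, under heavy assumptions: `BaseWorker` below is a hypothetical in-memory stand-in, not the BullMQ `Worker` class, and its `getNextJob` does not match the real method's signature or its Redis-backed behaviour. `FilteringWorker`, `activeQueues` and the job shape are likewise made up for illustration. The sketch shows only the filtering idea, not how to do it atomically against Redis.

```typescript
type QueuedJob = { id: number; name: string };

// Hypothetical stand-in for a worker base class; NOT the BullMQ Worker.
class BaseWorker {
  protected readonly waiting: QueuedJob[];

  constructor(waiting: QueuedJob[]) {
    this.waiting = waiting;
  }

  // Default behaviour: take whatever job is at the head of the shared list.
  getNextJob(): QueuedJob | undefined {
    return this.waiting.shift();
  }
}

// Overrides getNextJob so that only jobs for this worker's queues are taken.
class FilteringWorker extends BaseWorker {
  private readonly activeQueues: ReadonlySet<string>;

  constructor(waiting: QueuedJob[], activeQueues: ReadonlySet<string>) {
    super(waiting);
    this.activeQueues = activeQueues;
  }

  getNextJob(): QueuedJob | undefined {
    // Find the oldest job whose queue name this worker handles.
    const index = this.waiting.findIndex(job => this.activeQueues.has(job.name));
    if (index === -1) {
      return undefined; // nothing for this worker; other jobs stay untouched
    }
    return this.waiting.splice(index, 1)[0];
  }
}

const shared: QueuedJob[] = [
  { id: 1, name: 'apply-collection-filter' },
  { id: 2, name: 'indexing' },
  { id: 3, name: 'apply-collection-filter' },
];
const indexingWorker = new FilteringWorker(shared, new Set(['indexing']));

const first = indexingWorker.getNextJob(); // the 'indexing' job (id 2)
const second = indexingWorker.getNextJob(); // undefined: no more matching jobs
console.log(first, second, shared.map(job => job.id));
```

In this model the jobs the worker does not handle never leave the list, so there is nothing to return to the queue afterwards. Whether the real `getNextJob` can be overridden this way without breaking BullMQ's internals is an open question.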

Status: ♻️ In progress