Some additional utils #667

Open
anthonygoslar opened this issue Aug 28, 2024 · 0 comments

I've run into cases where the same task ends up in the queue multiple times. I wrote a couple of helpers that check for duplicates before enqueueing, and thought I'd share them in case they're worth adding to django_rq:

# path/to/utils.py

import django_rq


def func_in_task_queue(queue_name: str, job_func: str, *args, **kwargs) -> bool:
    """Return True if a queued job matches the function name and arguments."""
    queue = django_rq.get_queue(queue_name)
    # job.func_name is a string; for a plain function it is the dotted path
    return any(
        job.func_name == job_func and tuple(job.args) == args and job.kwargs == kwargs
        for job in queue.jobs
    )


def job_in_task_queue(queue_name: str, job_id: str) -> bool:
    """Return True if a job with this ID is waiting in the queue."""
    queue = django_rq.get_queue(queue_name)
    # check for the job ID in the queue
    return any(job.id == job_id for job in queue.jobs)

Usage then looks something like this:

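# path/to/utils.py

def update_record(instance):
    job_id = f"record_{instance.id}"

    if not job_in_task_queue(queue_name="default", job_id=job_id):
        queue = django_rq.get_queue("default")
        # `client` is the application's own API client, defined elsewhere
        queue.enqueue(
            client.make_record_api_call,
            instance,
            job_id=job_id,
        )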
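
The check and the enqueue could also be wrapped into a single helper. Below is a minimal sketch of that idea, not a proposed final API: `enqueue_unique` and `FakeQueue` are hypothetical names that exist in neither django_rq nor RQ. `FakeQueue` is an in-memory stand-in so the snippet runs without Redis; it only mimics the `job_ids` attribute and the `job_id` keyword of `enqueue`.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class FakeQueue:
    """Hypothetical in-memory stand-in for a queue; not part of RQ."""

    job_ids: List[str] = field(default_factory=list)

    def enqueue(self, func: Callable[..., Any], *args: Any, job_id: str, **kwargs: Any) -> str:
        # Record only the ID; a real queue would also serialize func and its arguments.
        self.job_ids.append(job_id)
        return job_id


def enqueue_unique(queue: Any, func: Callable[..., Any], *args: Any, job_id: str, **kwargs: Any) -> bool:
    """Enqueue func unless job_id is already waiting. Return True if a job was enqueued."""
    if job_id in queue.job_ids:
        return False
    queue.enqueue(func, *args, job_id=job_id, **kwargs)
    return True


queue = FakeQueue()
first = enqueue_unique(queue, print, "record 1", job_id="record_1")
second = enqueue_unique(queue, print, "record 1", job_id="record_1")
```

Here `first` is `True` and `second` is `False`, because the second call finds `record_1` already waiting. With a real queue, the object returned by `django_rq.get_queue("default")` would be passed instead of `FakeQueue()`. The check and the enqueue are two separate steps, so two processes enqueueing at the same moment could still both pass the check.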

Thanks for the work on the package.
