Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues related to rq job dependencies #7139

Merged
merged 13 commits into from
Dec 1, 2023
2 changes: 1 addition & 1 deletion cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ def export(db_instance, request, queue_name):
args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl),
job_id=rq_id,
meta=get_rq_job_meta(request=request, db_obj=db_instance),
depends_on=define_dependent_job(queue, user_id),
depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
result_ttl=ttl,
failure_ttl=ttl,
)
Expand Down
28 changes: 24 additions & 4 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ def process_failed_job(rq_job: Job):
return msg


def define_dependent_job(queue: DjangoRQ, user_id: int, should_be_dependent: bool = settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) -> Optional[Dependency]:
def define_dependent_job(
queue: DjangoRQ,
user_id: int,
should_be_dependent: bool = settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER,
rq_id: Optional[str] = None,
SpecLad marked this conversation as resolved.
Show resolved Hide resolved
) -> Optional[Dependency]:
if not should_be_dependent:
return None

Expand All @@ -171,9 +176,24 @@ def define_dependent_job(queue: DjangoRQ, user_id: int, should_be_dependent: boo
for job in queue.job_class.fetch_many(
queue.deferred_job_registry.get_job_ids(), queue.connection
)
if job and job.meta.get("user", {}).get("id") == user_id
if job and job.meta.get("user", {}).get("id") == user_id and job.is_deferred
SpecLad marked this conversation as resolved.
Show resolved Hide resolved
]
user_jobs = list(filter(lambda job: not job.meta.get(KEY_TO_EXCLUDE_FROM_DEPENDENCY), started_user_jobs + deferred_user_jobs))
all_user_jobs = started_user_jobs + deferred_user_jobs

all_job_dependency_ids = []

# prevent possible cyclic dependencies
if rq_id:
for job in all_user_jobs:
if job.dependency_ids:
all_job_dependency_ids.extend([i.decode() for i in job.dependency_ids])
SpecLad marked this conversation as resolved.
Show resolved Hide resolved

user_jobs = list(
filter(
lambda job: not job.meta.get(KEY_TO_EXCLUDE_FROM_DEPENDENCY),
all_user_jobs
)
) if not rq_id or rq_id and Job.redis_job_namespace_prefix + rq_id not in set(all_job_dependency_ids) else []
SpecLad marked this conversation as resolved.
Show resolved Hide resolved

return Dependency(jobs=[sorted(user_jobs, key=lambda job: job.created_at)[-1]], allow_failure=True) if user_jobs else None

Expand Down Expand Up @@ -218,7 +238,7 @@ def configure_dependent_job_to_download_from_cs(
},
result_ttl=result_ttl,
failure_ttl=failure_ttl,
depends_on=define_dependent_job(queue, user_id, should_be_dependent)
depends_on=define_dependent_job(queue, user_id, should_be_dependent, rq_job_id_download_file)
)
return rq_job_download_file

Expand Down
6 changes: 3 additions & 3 deletions cvat/apps/engine/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2959,7 +2959,7 @@ def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name,
func=import_resource_with_clean_up_after,
args=(rq_func, filename, db_obj.pk, format_name, conv_mask_to_poly),
job_id=rq_id,
depends_on=dependent_job or define_dependent_job(queue, user_id),
depends_on=dependent_job or define_dependent_job(queue, user_id, rq_id=rq_id),
meta={
'tmp_file': filename,
**get_rq_job_meta(request=request, db_obj=db_obj),
Expand Down Expand Up @@ -3091,7 +3091,7 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba
args=(db_instance.id, format_name, server_address),
job_id=rq_id,
meta=get_rq_job_meta(request=request, db_obj=db_instance),
depends_on=define_dependent_job(queue, user_id),
depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
result_ttl=ttl,
failure_ttl=ttl,
)
Expand Down Expand Up @@ -3173,7 +3173,7 @@ def _import_project_dataset(request, rq_id_template, rq_func, db_obj, format_nam
'tmp_file': filename,
**get_rq_job_meta(request=request, db_obj=db_obj),
},
depends_on=dependent_job or define_dependent_job(queue, user_id),
depends_on=dependent_job or define_dependent_job(queue, user_id, rq_id=rq_id),
result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(),
failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds()
)
Expand Down
71 changes: 71 additions & 0 deletions cvat/rq_patching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import traceback
from datetime import datetime
from typing import Optional

import rq.registry
from rq.exceptions import AbandonedJobError, NoSuchJobError
from rq.job import JobStatus
from rq.utils import current_timestamp


# NOTE: we should patch implementation of original method because
# there is no enqueuing dependent jobs in original function
# https://github.com/rq/rq/issues/2006
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you already have a fix, maybe you could submit it upstream? That way we could potentially remove this hack later (or maybe not apply it in the first place, depending on how quickly the maintainers respond).

def custom_started_job_registry_cleanup(self, timestamp: Optional[float] = None):
"""Remove abandoned jobs from registry and add them to FailedJobRegistry.

Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified. Removed jobs are added to the global failed job queue.

Args:
timestamp (datetime): The datetime to use as the limit.
"""

score = timestamp if timestamp is not None else current_timestamp()
job_ids = self.get_expired_job_ids(score)

if job_ids:
failed_job_registry = rq.registry.FailedJobRegistry(self.name, self.connection, serializer=self.serializer)
queue = self.get_queue()

with self.connection.pipeline() as pipeline:
for job_id in job_ids:
try:
job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError:
continue

job.execute_failure_callback(
self.death_penalty_class, AbandonedJobError, AbandonedJobError(), traceback.extract_stack()
)

retry = job.retries_left and job.retries_left > 0

if retry:
job.retry(queue, pipeline)

else:
exc_string = f"due to {AbandonedJobError.__name__}"
rq.registry.logger.warning(
f'{self.__class__.__name__} cleanup: Moving job to {rq.registry.FailedJobRegistry.__name__} '
f'({exc_string})'
)
job.set_status(JobStatus.FAILED)
job._exc_info = f"Moved to {rq.registry.FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}"
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline)
failed_job_registry.add(job, job.failure_ttl)
queue.enqueue_dependents(job)

pipeline.zremrangebyscore(self.key, 0, score)
pipeline.execute()

return job_ids

def update_started_job_registry_cleanup() -> None:
rq.registry.StartedJobRegistry.cleanup = custom_started_job_registry_cleanup
SpecLad marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,6 @@ class CVAT_QUEUES(Enum):

# How many chunks can be prepared simultaneously during task creation in case the cache is not used
CVAT_CONCURRENT_CHUNK_PROCESSING = int(os.getenv('CVAT_CONCURRENT_CHUNK_PROCESSING', 1))

from cvat.rq_patching import update_started_job_registry_cleanup
update_started_job_registry_cleanup()
Loading