Refactor the set_task_queued for both ASYNC and SYNC modes #176
Signed-off-by: Thomas Druez <tdruez@nexb.com>
tdruez committed Sep 27, 2021
1 parent 9ebe60d commit d0f1a7d
Showing 1 changed file with 22 additions and 3 deletions.
25 changes: 22 additions & 3 deletions scanpipe/models.py
@@ -236,7 +236,7 @@ def set_task_ended(self, exitcode, output="", refresh_first=True):
"""
Sets the task-related fields after the task execution.
An optional `refresh_first`—enabled by default—forces refreshing
An optional `refresh_first` —enabled by default— forces refreshing
the instance with the latest data from the database before saving.
This prevents losing values saved on the instance during the task
execution.
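
A minimal sketch of what the `refresh_first` behavior described above amounts to, assuming a `Run`-like Django model; the `task_exitcode` and `task_output` field names are assumptions for illustration, not taken from this diff:

from django.utils import timezone

def set_task_ended(run, exitcode, output="", refresh_first=True):
    # Illustrative only, not the scanpipe implementation.
    if refresh_first:
        # Reload the row first so columns written in the database during the
        # task execution are not overwritten by the save() below.
        run.refresh_from_db()
    run.task_exitcode = exitcode
    run.task_output = output
    run.task_end_date = timezone.now()
    run.save()
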
@@ -249,6 +249,16 @@ def set_task_ended(self, exitcode, output="", refresh_first=True):
self.task_end_date = timezone.now()
self.save()

def set_task_queued(self):
"""
Sets the task as "queued" by updating the `task_id` from None to this
instance's `pk`.
Uses the QuerySet `update` method instead of `save` to prevent overriding
any fields that were set but not saved yet in the DB.
"""
manager = self.__class__.objects
return manager.filter(pk=self.pk, task_id__isnull=True).update(task_id=self.pk)

def set_task_staled(self):
"""
Sets the task as "stale" using a special "99" exitcode value.
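
The docstring of the new `set_task_queued()` above explains the choice of the QuerySet `update` over `save`. A short sketch of the difference, assuming only a Django model with a nullable `task_id` field (not the scanpipe code):

def queue_with_save(run):
    # save() writes every field of the in-memory instance back to the database,
    # so any column modified elsewhere since the instance was loaded (for
    # example by a worker) would be silently overwritten.
    run.task_id = run.pk
    run.save()

def queue_with_update(run):
    # A single "UPDATE ... SET task_id = %s WHERE id = %s AND task_id IS NULL":
    # only task_id is written, and the isnull filter makes the transition happen
    # at most once. Returns the number of rows updated (0 or 1).
    return type(run).objects.filter(pk=run.pk, task_id__isnull=True).update(task_id=run.pk)

The 0-or-1 return value also gives callers a cheap way to detect that the run had already been queued.
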
@@ -946,10 +956,8 @@ def __str__(self):
def execute_task_async(self):
"""
Enqueues the pipeline execution task for an asynchronous execution.
Stores the `task_id` of the current Run instance for a future use.
"""
run_pk = str(self.pk)
self.init_task_id(run_pk)

job = django_rq.enqueue(
tasks.execute_pipeline_task,
@@ -958,6 +966,17 @@
on_failure=tasks.report_failure,
job_timeout=settings.SCANCODEIO_TASK_TIMEOUT,
)

# In async mode, we want to set the status as "queued" **after** the job was
# properly "enqueued".
# In case `django_rq.enqueue()` raises an exception (e.g. a Redis server error),
# we want to keep the Run status as "not started" rather than "queued".
# Note that the Run is also set as "queued" at the start of
# `execute_pipeline_task()` by calling the `set_task_started()`.
# There's no need to call the following in synchronous single thread mode.
if settings.SCANCODEIO_ASYNC:
self.set_task_queued()

return job

def init_task_id(self, task_id):

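The comments added in `execute_task_async()` describe an ordering guarantee: if `django_rq.enqueue()` raises, `set_task_queued()` is never reached and the run stays "not started". A possible regression test for that behavior, sketched under assumptions (Django `TestCase`, an existing `self.run` instance, `SCANCODEIO_ASYNC=True`, and `django_rq` patched where `scanpipe/models.py` imports it):

from unittest import mock

def test_run_stays_not_started_when_enqueue_fails(self):
    # Simulate an unreachable Redis server: enqueue() raises before
    # set_task_queued() is called, so task_id must remain None.
    with mock.patch("scanpipe.models.django_rq.enqueue", side_effect=ConnectionError):
        with self.assertRaises(ConnectionError):
            self.run.execute_task_async()
    self.run.refresh_from_db()
    self.assertIsNone(self.run.task_id)
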