Skip to content

Commit

Permalink
Refactor the "execute" management command and add support for --async…
Browse files Browse the repository at this point in the history
… mode #176

Signed-off-by: Thomas Druez <tdruez@nexb.com>
  • Loading branch information
tdruez committed Sep 27, 2021
1 parent 12670ef commit a887195
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
- Detect and flag "stale" pipeline status.
https://github.com/nexB/scancode.io/issues/130

- Refactor the "execute" management command and add support for --async mode.
https://github.com/nexB/scancode.io/issues/130

- Add a SCANCODEIO_REST_API_PAGE_SIZE setting to control the number of objects
returned per page in the REST API.
https://github.com/nexB/scancode.io/issues/328
Expand Down
30 changes: 23 additions & 7 deletions scanpipe/management/commands/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@

import sys

from django.conf import settings
from django.core.management import CommandError

import redis

from scanpipe import tasks
from scanpipe.management.commands import ProjectCommand


class Command(ProjectCommand):
help = "Run pipelines on a project."

def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument(
"--async",
action="store_true",
dest="async",
help="Add the pipeline run to the tasks queue for execution by a worker "
"instead of running in the current thread.",
)

def handle(self, *args, **options):
super().handle(*args, **options)

Expand All @@ -40,18 +50,24 @@ def handle(self, *args, **options):
if not run:
raise CommandError(f"No pipelines to run on project {self.project}")

if options["async"]:
if not settings.SCANCODEIO_ASYNC:
msg = "SCANCODEIO_ASYNC=False is not compatible with --async option."
raise CommandError(msg)

run.execute_task_async()
msg = f"{run.pipeline_name} added to the tasks queue for execution."
self.stdout.write(self.style.SUCCESS(msg))
sys.exit(0)

self.stdout.write(f"Start the {run.pipeline_name} pipeline execution...")

try:
run.execute_task_async()
tasks.execute_pipeline_task(run.pk)
except KeyboardInterrupt:
run.set_task_ended(exitcode=88)
self.stderr.write(self.style.ERROR("Pipeline execution stopped."))
sys.exit(1)
except redis.exceptions.RedisError as e:
msg = f"Error raised by the Redis client:\n{e}"
self.stderr.write(self.style.ERROR(msg))
sys.exit(1)
except Exception as e:
self.stderr.write(self.style.ERROR(e))
sys.exit(1)
Expand Down
11 changes: 0 additions & 11 deletions scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,17 +979,6 @@ def execute_task_async(self):

return job

def init_task_id(self, task_id):
"""
Sets the provided `task_id` on the Run instance if not already stored in the
database.
Uses the QuerySet `update` method instead of `save` to prevent overriding
any fields that were set but not saved yet in the DB, which may occur when
SCANCODEIO_ASYNC is True.
"""
manager = self.__class__.objects
return manager.filter(pk=self.pk, task_id__isnull=True).update(task_id=task_id)

def set_scancodeio_version(self):
"""
Sets the current ScanCode.io version on the `Run.scancodeio_version` field.
Expand Down

0 comments on commit a887195

Please sign in to comment.