Skip to content

Commit

Permalink
Upgrade the worker related dependencies to latest versions #726 (#727)
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Druez <tdruez@nexb.com>
  • Loading branch information
tdruez authored May 12, 2023
1 parent c43d676 commit df3b3ec
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 28 deletions.
2 changes: 1 addition & 1 deletion scancodeio/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def run_maintenance_tasks(self):
super().run_maintenance_tasks()

# The Runs and Jobs synchronization needs to be executed after the
# `self.clean_registries()` that takes place in the in the parent
# `self.clean_registries()` that takes place in the parent
# `super().run_maintenance_tasks()`.
scanpipe_app.sync_runs_and_jobs()

Expand Down
4 changes: 2 additions & 2 deletions scanpipe/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def ready(self):

# In SYNC mode, the Run instances cleanup is triggered on app.ready()
# only when the app is started through "runserver".
# This cleanup is required if the a running pipeline process gets killed and
# This cleanup is required if a running pipeline process gets killed and
# since KeyboardInterrupt cannot be captured to properly update the Run instance
# before its running process death.
# In ASYNC mode, the cleanup is handled by the "ScanCodeIOWorker" worker.
Expand Down Expand Up @@ -192,7 +192,7 @@ def policies_enabled(self):
return bool(self.license_policies_index)

def sync_runs_and_jobs(self):
"""Synchronize QUEUED and RUNNING Run with their related Jobs."""
"""Synchronize ``QUEUED`` and ``RUNNING`` Run with their related Jobs."""
logger.info("Synchronizing QUEUED and RUNNING Run with their related Jobs...")

run_model = self.get_model("Run")
Expand Down
25 changes: 19 additions & 6 deletions scanpipe/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,26 @@
scanpipe_app = apps.get_app_config("scanpipe")


class MultipleFileInput(forms.ClearableFileInput):
allow_multiple_selected = True


class MultipleFileField(forms.FileField):
def __init__(self, *args, **kwargs):
kwargs.setdefault("widget", MultipleFileInput(attrs={"class": "file-input"}))
super().__init__(*args, **kwargs)

def clean(self, data, initial=None):
single_file_clean = super().clean
if isinstance(data, (list, tuple)):
result = [single_file_clean(entry, initial) for entry in data]
else:
result = single_file_clean(data, initial)
return result


class InputsBaseForm(forms.Form):
input_files = forms.FileField(
required=False,
widget=forms.ClearableFileInput(
attrs={"class": "file-input", "multiple": True},
),
)
input_files = MultipleFileField()
input_urls = forms.CharField(
label="Download URLs",
required=False,
Expand Down
29 changes: 15 additions & 14 deletions scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class AbstractTaskFieldsModel(models.Model):
blank=True,
editable=False,
)
log = models.TextField(blank=True, editable=False)

class Meta:
abstract = True
Expand Down Expand Up @@ -345,6 +346,8 @@ def set_task_stopped(self):

def stop_task(self):
"""Stop a "running" task."""
self.append_to_log("Stop task requested", save=True)

if not settings.SCANCODEIO_ASYNC:
self.set_task_stopped()
return
Expand All @@ -357,7 +360,8 @@ def stop_task(self):

if self.job_status == JobStatus.FAILED:
self.set_task_ended(
exitcode=1, output=f"Killed from outside, exc_info={self.job.exc_info}"
exitcode=1,
output=f"Killed from outside, latest_result={self.job.latest_result()}",
)
return

Expand All @@ -376,6 +380,16 @@ def delete_task(self, delete_self=True):
if delete_self:
self.delete()

def append_to_log(self, message, save=False):
"""Append the ``message`` string to the ``log`` field of this instance."""
message = message.strip()
if any(lf in message for lf in ("\n", "\r")):
raise ValueError("message cannot contain line returns (either CR or LF).")

self.log = self.log + message + "\n"
if save:
self.save()


class ExtraDataFieldMixin(models.Model):
"""Add the `extra_data` field and helper methods."""
Expand Down Expand Up @@ -1181,7 +1195,6 @@ class Run(UUIDPKModel, ProjectRelatedModel, AbstractTaskFieldsModel):
scancodeio_version = models.CharField(max_length=30, blank=True)
description = models.TextField(blank=True)
current_step = models.CharField(max_length=256, blank=True)
log = models.TextField(blank=True, editable=False)

objects = RunQuerySet.as_manager()

Expand Down Expand Up @@ -1223,11 +1236,9 @@ def execute_task_async(self):
def sync_with_job(self):
"""
Synchronise this Run instance with its related RQ Job.
This is required when a Run gets out of sync with its Job, this can happen
when the worker or one of its processes is killed, the Run status is not
properly updated and may stay in a Queued or Running state forever.
In case the Run is out of sync of its related Job, the Run status will be
updated accordingly. When the run was in the queue, it will be enqueued again.
"""
Expand Down Expand Up @@ -1303,16 +1314,6 @@ def make_pipeline_instance(self):
"""Return a pipelines instance using this Run pipeline_class."""
return self.pipeline_class(self)

def append_to_log(self, message, save=False):
"""Append the `message` string to the `log` field of this Run instance."""
message = message.strip()
if any(lf in message for lf in ("\n", "\r")):
raise ValueError("message cannot contain line returns (either CR or LF).")

self.log = self.log + message + "\n"
if save:
self.save()

def deliver_project_subscriptions(self):
"""Triggers related project webhook subscriptions."""
for subscription in self.project.webhooksubscriptions.all():
Expand Down
1 change: 1 addition & 0 deletions scanpipe/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def test_scanpipe_run_model_stop_task_method(self):
run1.stop_task()
self.assertEqual(Run.Status.STOPPED, run1.status)
self.assertTrue(run1.task_stopped)
self.assertIn("Stop task requested", run1.log)

@override_settings(SCANCODEIO_ASYNC=False)
def test_scanpipe_run_model_delete_task_method(self):
Expand Down
10 changes: 5 additions & 5 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ zip_safe = false
install_requires =
importlib-metadata==6.1.0
# Django related
Django==4.2.0
Django==4.2.1
django-environ==0.10.0
django-crispy-forms==2.0
crispy-bootstrap3==2022.1
django-filter==23.1
djangorestframework==3.14.0
# Database
psycopg==3.1.8
psycopg==3.1.9
# wait_for_database Django management command
django-probes==1.7.0
# Task queue
rq==1.13.0
django-rq==2.7.0
redis==4.5.4
rq==1.14.1
django-rq==2.8.0
redis==4.5.5
# WSGI server
gunicorn==20.1.0
# Docker
Expand Down

0 comments on commit df3b3ec

Please sign in to comment.