Replace Celery by RQ #176
Signed-off-by: Thomas Druez <tdruez@nexb.com>
tdruez committed Sep 15, 2021
1 parent fa0a054 commit 7371695
Showing 14 changed files with 93 additions and 151 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
@@ -57,5 +57,3 @@ jobs:
           SCANCODEIO_DB_NAME: ${{ env.DB_NAME }}
           SCANCODEIO_DB_USER: ${{ env.DB_USER }}
           SCANCODEIO_DB_PASSWORD: ${{ env.DB_PASSWORD }}
-          # Run tasks in the current thread, no need for workers in CI mode
-          CELERY_TASK_ALWAYS_EAGER: True
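
Note: the CELERY_TASK_ALWAYS_EAGER toggle is not replaced by a new environment variable; with RQ, synchronous execution for CI and tests is forced directly in scancodeio/settings.py through the queues' ASYNC flag (see the settings.py hunk below).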
6 changes: 1 addition & 5 deletions Makefile
@@ -115,11 +115,7 @@ test:
 	${MANAGE} test --noinput
 
 worker:
-	bin/celery --app scancodeio worker \
-	    --loglevel=INFO \
-	    --concurrency 1 --pool threads \
-	    --events -Ofair --prefetch-multiplier=1 \
-	    --soft-time-limit=21600 --time-limit=22000
+	${MANAGE} rqworker
 
 bump:
 	@echo "-> Bump the version"
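
For context, `manage.py rqworker` is the management command provided by django-rq to run a worker in the foreground. A minimal sketch of what the new `worker` target boils down to, assuming Django settings are already configured (an approximation, not the actual django-rq implementation):

# Sketch: roughly what `manage.py rqworker` does once Django is set up.
# Requires a reachable Redis instance per the RQ_QUEUES setting.
import django_rq

worker = django_rq.get_worker("default")  # worker bound to the "default" queue
worker.work()  # block and process jobs as they arrive, one at a time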
27 changes: 12 additions & 15 deletions docker-compose.yml
@@ -11,40 +11,37 @@ services:
   redis:
     image: redis
 
-  celery:
+  web:
     build: .
-    command: celery --app scancodeio worker
-      --loglevel=INFO
-      --concurrency 1 --pool threads
-      --events -Ofair --prefetch-multiplier=1
-      --soft-time-limit=21600 --time-limit=22000
+    command: sh -c "
+      ./manage.py migrate &&
+      ./manage.py collectstatic --no-input --clear &&
+      gunicorn scancodeio.wsgi:application --bind :8000 --timeout 600 --workers 8"
     env_file:
       - docker.env
+    expose:
+      - 8000
     volumes:
       - .:/opt/scancodeio/
       - /etc/scancodeio/:/etc/scancodeio/
       - workspace:/var/scancodeio/workspace/
+      - static:/var/scancodeio/static/
     depends_on:
       - redis
       - db
 
-  web:
+  worker:
     build: .
-    command: sh -c "
-      ./manage.py migrate &&
-      ./manage.py collectstatic --no-input --clear &&
-      gunicorn scancodeio.wsgi:application --bind :8000 --timeout 600 --workers 8"
+    command: ./manage.py rqworker
     env_file:
       - docker.env
-    expose:
-      - 8000
     volumes:
       - .:/opt/scancodeio/
       - /etc/scancodeio/:/etc/scancodeio/
       - workspace:/var/scancodeio/workspace/
-      - static:/var/scancodeio/static/
     depends_on:
       - redis
       - db
+      - web # ensure that potential db migrations run first
 
   nginx:
     image: nginx
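
Since an RQ worker processes one job at a time (forking a work horse per job), throughput in this layout would presumably be scaled by running more worker containers, e.g. `docker-compose up --scale worker=2`, rather than with per-process concurrency flags as in the removed Celery command.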
4 changes: 0 additions & 4 deletions docker.env
@@ -4,7 +4,3 @@ POSTGRES_PASSWORD=scancodeio
 
 SCANCODEIO_DB_HOST=db
 SCANCODEIO_WORKSPACE_LOCATION=/var/scancodeio/workspace/
-
-CELERY_TASK_ALWAYS_EAGER=False
-CELERY_BROKER_URL=redis://redis:6379/0
-CELERY_RESULT_BACKEND=redis://redis:6379/0
4 changes: 0 additions & 4 deletions docs/conf.py
@@ -48,10 +48,6 @@
 
 # -- Autodoc -----------------------------------------------------------------
 
-# Mock Django related modules to avoid complex configuration in the context of
-# autodoc.
-autodoc_mock_imports = ["celery"]
-
 # The default options for autodoc directives.
 # They are applied to all autodoc directives automatically.
 # It must be a dictionary which maps option names to the values.
10 changes: 0 additions & 10 deletions scancodeio/__init__.py
@@ -24,21 +24,11 @@
 import sys
 from pathlib import Path
 
-from django.conf import settings
-
-# This will make sure the app is always imported when
-# Django starts so that shared_task will use this app.
-from scancodeio.celery import app as celery_app
-
 __version__ = "21.9.6"
 
 SCAN_NOTICE = Path(__file__).resolve().parent.joinpath("scan.NOTICE").read_text()
 
-
-# Resolve and set the workspace location from the settings.
-WORKSPACE_LOCATION = str(Path(settings.SCANCODEIO_WORKSPACE_LOCATION).resolve())
-
 
 def command_line():
     """
     Command line entry point.
37 changes: 0 additions & 37 deletions scancodeio/celery.py

This file was deleted.

23 changes: 14 additions & 9 deletions scancodeio/settings.py
@@ -82,6 +82,7 @@
     "django_filters",
     "rest_framework",
     "rest_framework.authtoken",
+    "django_rq",
 )
 
 MIDDLEWARE = (

@@ -233,16 +234,20 @@
 
 CRISPY_TEMPLATE_PACK = "bootstrap3"
 
-# Celery
+# Job Queue
 
-CELERY_BROKER_URL = env.str("CELERY_BROKER_URL", default="redis://")
-CELERY_RESULT_BACKEND = env.str("CELERY_RESULT_BACKEND", default="redis://")
-CELERY_TRACK_STARTED = env.bool("CELERY_TRACK_STARTED", default=True)
-CELERY_IGNORE_RESULT = env.bool("CELERY_IGNORE_RESULT", default=False)
-CELERY_TASK_DEFAULT_QUEUE = env.str("CELERY_RESULT_BACKEND", default="default")
-# When True, tasks will be executed immediately in the local thread instead of being
-# sent to the queue for execution by a worker.
-CELERY_TASK_ALWAYS_EAGER = env.bool("CELERY_TASK_ALWAYS_EAGER", default=True)
+RQ_QUEUES = {
+    "default": {
+        "HOST": "localhost",
+        "PORT": 6379,
+        "DB": 0,
+        "DEFAULT_TIMEOUT": 360,
+    },
+}
+
+if DEBUG or IS_TESTS:
+    for queue_config in RQ_QUEUES.values():
+        queue_config["ASYNC"] = False
 
 # Django restframework
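
To illustrate the ASYNC flag: when a queue is configured with ASYNC set to False, django_rq executes enqueued callables immediately in the calling process, which is what replaces CELERY_TASK_ALWAYS_EAGER here. A minimal sketch, assuming Django is configured with the settings above (`add` is a made-up example function):

# Eager-mode sketch: with ASYNC=False the job runs inline, no worker needed.
import django_rq

def add(x, y):
    return x + y

job = django_rq.enqueue(add, 1, 2)  # executed synchronously in this process
assert job.result == 3  # the return value is available immediately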
1 change: 1 addition & 0 deletions scanpipe/apps.py
@@ -50,6 +50,7 @@ def __init__(self, app_name, app_module):
         # Mapping of registered pipeline names to pipeline classes.
         self._pipelines = {}
         self.license_policies_index = {}
+        self.workspace = str(Path(settings.SCANCODEIO_WORKSPACE_LOCATION).resolve())
 
     def ready(self):
         self.load_pipelines()
72 changes: 39 additions & 33 deletions scanpipe/models.py
@@ -45,11 +45,10 @@
 from django.utils.text import slugify
 from django.utils.translation import gettext_lazy as _
 
-from celery.result import AsyncResult
+import django_rq
 from packageurl import normalize_qualifiers
 from packageurl.contrib.django.models import PackageURLQuerySetMixin
 
-from scancodeio import WORKSPACE_LOCATION
 from scancodeio import __version__ as scancodeio_version
 from scanpipe import tasks
 from scanpipe.packagedb_models import AbstractPackage

@@ -111,32 +110,32 @@ class AbstractTaskFieldsModel(models.Model):
     class Meta:
         abstract = True
 
-    @property
-    def task_result(self):
-        return AsyncResult(str(self.task_id))
-
-    def task_state(self):
-        """
-        Possible values include:
-        - UNKNOWN (PENDING)
-          No history about the task is available.
-        - STARTED
-          The task has been started.
-        - RETRY
-          The task is to be re-executed; possibly due to a failure.
-        - FAILURE
-          The task raised an exception or has exceeded the retry limit.
-          The result attribute would contain the exception raised by the task.
-        - SUCCESS
-          The task executed successfully. The result attribute would contain
-          the task's return value.
-        Notes: All tasks are PENDING by default in Celery, so it would make more
-        sense if the state was named "unknown". Celery doesn't update the state
-        when a task is sent, and any task with no history is assumed to be pending.
-        """
-        state = self.task_result.state
-        return "UNKNOWN" if state == "PENDING" else state
+    # @property
+    # def task_result(self):
+    #     return AsyncResult(str(self.task_id))
+    #
+    # def task_state(self):
+    #     """
+    #     Possible values include:
+    #     - UNKNOWN (PENDING)
+    #       No history about the task is available.
+    #     - STARTED
+    #       The task has been started.
+    #     - RETRY
+    #       The task is to be re-executed; possibly due to a failure.
+    #     - FAILURE
+    #       The task raised an exception or has exceeded the retry limit.
+    #       The result attribute would contain the exception raised by the task.
+    #     - SUCCESS
+    #       The task executed successfully. The result attribute would contain
+    #       the task's return value.
+    #
+    #     Notes: All tasks are PENDING by default in Celery, so it would make more
+    #     sense if the state was named "unknown". Celery doesn't update the state
+    #     when a task is sent, and any task with no history is assumed to be pending.
+    #     """
+    #     state = self.task_result.state
+    #     return "UNKNOWN" if state == "PENDING" else state
 
     @property
     def execution_time(self):

@@ -224,7 +223,8 @@ def get_project_work_directory(project):
     A short version of the `project` uuid is added as a suffix to ensure
     uniqueness of the work directory location.
     """
-    return f"{WORKSPACE_LOCATION}/projects/{slugify(project.name)}-{project.short_uuid}"
+    project_workspace_id = f"{slugify(project.name)}-{project.short_uuid}"
+    return f"{scanpipe_app.workspace}/projects/{project_workspace_id}"
 
 
 class Project(UUIDPKModel, ExtraDataFieldMixin, models.Model):

@@ -843,12 +843,18 @@ def __str__(self):
 
     def execute_task_async(self):
         """
-        Sends a message to the task manager to create an asynchronous pipeline
-        execution task.
+        Enqueues the pipeline execution task for an asynchronous execution.
         Stores the `task_id` of the current Run instance for a future use.
         """
-        future = tasks.execute_pipeline_task.apply_async(args=[self.pk])
-        self.init_task_id(future.task_id)
+        run_pk = str(self.pk)
+        self.init_task_id(run_pk)
+
+        job = django_rq.enqueue(
+            tasks.execute_pipeline_task,
+            job_id=run_pk,
+            run_pk=run_pk,
+        )
+        return job
 
     def init_task_id(self, task_id):
         """
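
Since the RQ job_id is set to the Run primary key, the enqueued job can later be looked up from the stored task_id. A sketch of such a lookup, not part of this commit (`run` stands for any Run instance):

# Sketch: fetch the RQ job for a Run, relying on job_id == str(run.pk).
import django_rq

queue = django_rq.get_queue("default")
job = queue.fetch_job(str(run.task_id))
if job is not None:  # the job may have expired from Redis or never been queued
    print(job.get_status())  # e.g. "queued", "started", "finished", "failed"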
29 changes: 13 additions & 16 deletions scanpipe/tasks.py
@@ -20,17 +20,15 @@
 # ScanCode.io is a free software code scanning tool from nexB Inc. and others.
 # Visit https://github.com/nexB/scancode.io for support and download.
 
-from django.apps import apps
+import logging
 
-from celery import shared_task
-from celery.exceptions import SoftTimeLimitExceeded
-from celery.utils.log import get_task_logger
+from django.apps import apps
 
-tasks_logger = get_task_logger(__name__)
+logger = logging.getLogger(__name__)
 
 
 def info(message, pk):
-    tasks_logger.info(f"Run[{pk}] {message}")
+    logger.info(f"Run[{pk}] {message}")

@@ -41,27 +39,26 @@ def get_run_instance(run_pk):
     return run_model.objects.get(pk=run_pk)
 
 
-@shared_task(bind=True)
-def execute_pipeline_task(self, run_pk):
-    task_id = self.request.id
-    info(f"Enter `{self.name}` Task.id={task_id}", run_pk)
+def execute_pipeline_task(run_pk):
+    info(f"Enter `execute_pipeline_task` Run.pk/Task.id={run_pk}", run_pk)
 
     run = get_run_instance(run_pk)
     project = run.project
 
     run.reset_task_values()
     run.set_scancodeio_version()
-    run.set_task_started(task_id)
+    run.set_task_started(run_pk)
 
     info(f'Run pipeline: "{run.pipeline_name}" on project: "{project.name}"', run_pk)
 
     pipeline = run.make_pipeline_instance()
+    exitcode, output = pipeline.execute()
 
-    try:
-        exitcode, output = pipeline.execute()
-    except SoftTimeLimitExceeded:
-        info("SoftTimeLimitExceeded", run_pk)
-        exitcode, output = 1, "SoftTimeLimitExceeded"
+    # try:
+    #     exitcode, output = pipeline.execute()
+    # except SoftTimeLimitExceeded:
+    #     info("SoftTimeLimitExceeded", run_pk)
+    #     exitcode, output = 1, "SoftTimeLimitExceeded"
 
     info("Update Run instance with exitcode, output, and end_date", run_pk)
     run.set_task_ended(exitcode, output, refresh_first=True)
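
With the @shared_task decorator gone, execute_pipeline_task is a plain function: RQ workers invoke it by name, and it can just as well be called directly, which is exactly what the updated test below does:

# Direct synchronous invocation, as used in the tests:
from scanpipe import tasks

tasks.execute_pipeline_task(run.pk)  # `run` is an existing Run instance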
4 changes: 1 addition & 3 deletions scanpipe/tests/test_tasks.py
@@ -24,8 +24,6 @@
 
 from django.test import TestCase
 
-from celery.exceptions import SoftTimeLimitExceeded
-
 from scanpipe import tasks
 from scanpipe.models import Project

@@ -51,7 +49,7 @@ def test_scanpipe_tasks_timeout_soft_time_limit_exceeded(self, mock_execute):
         project = Project.objects.create(name="my_project")
         run = project.add_pipeline("do_nothing")
 
-        mock_execute.side_effect = SoftTimeLimitExceeded()
+        # mock_execute.side_effect = SoftTimeLimitExceeded()
         tasks.execute_pipeline_task(run.pk)
 
         run.refresh_from_db()
(Diffs for the remaining 2 changed files are not shown.)
