Skip to content

Commit

Permalink
Remove code duplication and add unit tests #22
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Druez <tdruez@nexb.com>
  • Loading branch information
tdruez committed Sep 24, 2020
1 parent e758d84 commit 22a88fa
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 34 deletions.
5 changes: 4 additions & 1 deletion scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,13 @@ def run_pipeline_task_async(self):
tasks.run_pipeline_task.apply_async(args=[self.pk], queue="default")

def resume_pipeline_task_async(self):
tasks.resume_pipeline_task.apply_async(args=[self.pk], queue="default")
tasks.run_pipeline_task.apply_async(args=[self.pk, True], queue="default")

@property
def task_succeeded(self):
"""
Return True if the pipeline task was successfully executed.
"""
return self.task_exitcode == 0

def get_run_id(self):
Expand Down
36 changes: 10 additions & 26 deletions scanpipe/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,46 +54,30 @@ def start_next_run_task(run):


@shared_task(bind=True)
def run_pipeline_task(self, run_pk):
def run_pipeline_task(self, run_pk, resume=False):
task_id = self.request.id
info(f"Enter `{self.name}` Task.id={task_id}", run_pk)

run = get_run_instance(run_pk)
run.set_task_started(task_id)

info(f'Run pipeline: "{run.pipeline}" on project: "{run.project.name}"', run_pk)
cmd = f"{python} {run.pipeline} run --project {run.project.name}"
exitcode, output = subprocess.getstatusoutput(cmd)

info("Update Run instance with exitcode, output, and end_date", run_pk)
run.set_task_ended(exitcode, output, refresh_first=True)

if run.task_succeeded:
# We keep the temporary files available for resume in case of error
run.project.clear_tmp_directory()
start_next_run_task(run)


@shared_task(bind=True)
def resume_pipeline_task(self, run_pk):
task_id = self.request.id
info(f"Enter `{self.name}` Task.id={task_id}", run_pk)
project = run.project

run = get_run_instance(run_pk)
# Capture the run_id before resetting the task
run_id = run.get_run_id()
if resume:
run_id = run.get_run_id()
cmd_options = f"resume --origin-run-id {run_id}"
else:
cmd_options = f"run --project {project.name}"

run.reset_task_values()
run.set_task_started(task_id)

info(f'Resume pipeline: "{run.pipeline}" on project: "{run.project.name}"', run_pk)
cmd = f"{python} {run.pipeline} resume --origin-run-id {run_id}"
info(f'Run pipeline: "{run.pipeline}" on project: "{project.name}"', run_pk)
cmd = f"{python} {run.pipeline} {cmd_options}"
exitcode, output = subprocess.getstatusoutput(cmd)

info("Update Run instance with exitcode, output, and end_date", run_pk)
run.set_task_ended(exitcode, output, refresh_first=True)

if run.task_succeeded:
# We keep the temporary files available for resume in case of error
run.project.clear_tmp_directory()
project.clear_tmp_directory()
start_next_run_task(run)
34 changes: 27 additions & 7 deletions scanpipe/tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ def test_scanpipe_management_command_create_project_pipelines(self):

options = [
"--pipeline",
"scanpipe/pipelines/docker.py",
self.pipeline_location,
"--pipeline",
"scanpipe/pipelines/root_filesystems.py",
]
call_command("create-project", "my_project", *options, stdout=out)
self.assertIn("Project my_project created", out.getvalue())
project = Project.objects.get(name="my_project")
expected = [
"scanpipe/pipelines/docker.py",
self.pipeline_location,
"scanpipe/pipelines/root_filesystems.py",
]
self.assertEqual(expected, [run.pipeline for run in project.runs.all()])
Expand Down Expand Up @@ -142,7 +142,7 @@ def test_scanpipe_management_command_add_pipeline(self):
project = Project.objects.create(name="my_project")

pipelines = [
"scanpipe/pipelines/docker.py",
self.pipeline_location,
"scanpipe/pipelines/root_filesystems.py",
]

Expand All @@ -163,7 +163,7 @@ def test_scanpipe_management_command_add_pipeline(self):

def test_scanpipe_management_command_show_pipeline(self):
pipelines = [
"scanpipe/pipelines/docker.py",
self.pipeline_location,
"scanpipe/pipelines/root_filesystems.py",
]

Expand Down Expand Up @@ -200,8 +200,7 @@ def test_scanpipe_management_command_run(self):
with self.assertRaisesMessage(CommandError, expected):
call_command("run", *options, stdout=out)

pipeline = "scanpipe/pipelines/docker.py"
project.add_pipeline(pipeline)
project.add_pipeline(self.pipeline_location)

def task_success(run):
run.task_exitcode = 0
Expand All @@ -221,10 +220,31 @@ def task_failure(run):
run.save()

err = StringIO()
project.add_pipeline(pipeline)
project.add_pipeline(self.pipeline_location)
with mock.patch("scanpipe.models.Run.run_pipeline_task_async", task_failure):
with self.assertRaisesMessage(SystemExit, "1"):
call_command("run", *options, stdout=out, stderr=err)
expected = "Error during scanpipe/pipelines/docker.py execution:"
self.assertIn(expected, err.getvalue())
self.assertIn("Error log", err.getvalue())

@mock.patch("scanpipe.models.Run.resume_pipeline_task_async")
def test_scanpipe_management_command_run_resume(self, mock_resume_pipeline_task):
project = Project.objects.create(name="my_project")
options = ["--project", project.name, "--resume"]

out = StringIO()
expected = "No pipelines to resume on project my_project"
with self.assertRaisesMessage(CommandError, expected):
call_command("run", *options, stdout=out)

run = project.add_pipeline(self.pipeline_location)
run.task_exitcode = 1
run.save()

err = StringIO()
with self.assertRaisesMessage(SystemExit, "1"):
call_command("run", *options, stdout=out, stderr=err)
mock_resume_pipeline_task.assert_called_once()
expected = "Error during scanpipe/pipelines/docker.py execution:"
self.assertIn(expected, err.getvalue())

0 comments on commit 22a88fa

Please sign in to comment.