@@ -115,17 +127,6 @@
Pipelines:
{% block scripts %}
{% include "scanpipe/includes/modal.js.html" %}
diff --git a/scanpipe/templates/scanpipe/resource_detail.html b/scanpipe/templates/scanpipe/resource_detail.html
new file mode 100644
index 000000000..ddf25a9ea
--- /dev/null
+++ b/scanpipe/templates/scanpipe/resource_detail.html
@@ -0,0 +1,130 @@
+{% extends "scanpipe/base.html" %}
+{% load static humanize %}
+
+{% block title %}ScanCode.io: {{ project.name }} - {{ object.filename }}{% endblock %}
+
+{% block content %}
+
+ {% include 'scanpipe/includes/navbar_header.html' %}
+
{% include 'scanpipe/includes/messages.html' %}
+
+
+
+
+
+
+
+{% endblock %}
+
+{% block scripts %}
+
+ {{ detected_values|json_script:"detected_values" }}
+
+{% endblock %}
\ No newline at end of file
diff --git a/scanpipe/templates/scanpipe/resource_list.html b/scanpipe/templates/scanpipe/resource_list.html
index 0d8433087..6483e1ad1 100644
--- a/scanpipe/templates/scanpipe/resource_list.html
+++ b/scanpipe/templates/scanpipe/resource_list.html
@@ -28,7 +28,7 @@
{% for resource in object_list %}
- {{ resource.path }} |
+ {{ resource.path }} |
{{ resource.status }} |
{{ resource.type }} |
{{ resource.size|default_if_none:"" }} |
diff --git a/scanpipe/tests/test_api.py b/scanpipe/tests/test_api.py
index ab19e16c8..50d924feb 100644
--- a/scanpipe/tests/test_api.py
+++ b/scanpipe/tests/test_api.py
@@ -51,12 +51,10 @@
class ScanPipeAPITest(TransactionTestCase):
def setUp(self):
self.project1 = Project.objects.create(name="Analysis")
- self.codebase_resource1 = CodebaseResource.objects.create(
+ self.resource1 = CodebaseResource.objects.create(
project=self.project1, path="filename.ext"
)
- self.discovered_package1 = DiscoveredPackage.create_for_resource(
- package_data1, self.codebase_resource1
- )
+ self.discovered_package1 = self.resource1.create_and_add_package(package_data1)
self.project_list_url = reverse("project-list")
self.project1_detail_url = reverse("project-detail", args=[self.project1.uuid])
@@ -76,12 +74,14 @@ def test_scanpipe_api_project_list(self):
self.assertEqual(1, response.data["count"])
self.assertNotContains(response, "input_root")
self.assertNotContains(response, "extra_data")
+ self.assertNotContains(response, "input_sources")
def test_scanpipe_api_project_detail(self):
response = self.csrf_client.get(self.project1_detail_url)
self.assertIn(self.project1_detail_url, response.data["url"])
self.assertEqual(str(self.project1.uuid), response.data["uuid"])
self.assertEqual(self.project1.name, response.data["name"])
+ self.assertEqual([], response.data["input_sources"])
self.assertIn("input_root", response.data)
self.assertIn("extra_data", response.data)
@@ -94,8 +94,14 @@ def test_scanpipe_api_project_detail(self):
}
self.assertEqual(expected, response.data["discovered_package_summary"])
+ self.project1.add_input_source(filename="file", source="uploaded", save=True)
+ response = self.csrf_client.get(self.project1_detail_url)
+ expected = [{"filename": "file", "source": "uploaded"}]
+ self.assertEqual(expected, response.data["input_sources"])
+
+ @mock.patch("requests.get")
@mock.patch("scanpipe.models.Run.execute_task_async")
- def test_scanpipe_api_project_create(self, mock_execute_pipeline_task):
+ def test_scanpipe_api_project_create(self, mock_execute_pipeline_task, mock_get):
data = {}
response = self.csrf_client.post(self.project_list_url, data)
self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)
@@ -167,6 +173,24 @@ def test_scanpipe_api_project_create(self, mock_execute_pipeline_task):
response = self.csrf_client.get(created_project_detail_url)
self.assertEqual(["upload_file"], response.data["input_root"])
+ mock_get.return_value = mock.Mock(
+ content=b"\x00", headers={}, status_code=200, url="archive.zip"
+ )
+ data = {
+ "name": "Upload",
+ "input_urls": ["https://example.com/archive.zip"],
+ }
+ response = self.csrf_client.post(self.project_list_url, data)
+ self.assertEqual(status.HTTP_201_CREATED, response.status_code)
+ created_project_detail_url = response.data["url"]
+ response = self.csrf_client.get(created_project_detail_url)
+ expected = {
+ "filename": "archive.zip",
+ "source": "https://example.com/archive.zip",
+ }
+ self.assertEqual([expected], response.data["input_sources"])
+ self.assertEqual(["archive.zip"], response.data["input_root"])
+
def test_scanpipe_api_project_results_generator(self):
results_generator = JSONResultsGenerator(self.project1)
results = json.loads("".join(results_generator))
@@ -250,7 +274,7 @@ def test_scanpipe_api_project_action_file_content(self):
expected = {"status": "Resource not found. Use ?path="}
self.assertEqual(expected, response.data)
- response = self.csrf_client.get(url + f"?path={self.codebase_resource1.path}")
+ response = self.csrf_client.get(url + f"?path={self.resource1.path}")
self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)
expected = {"status": "File not available"}
self.assertEqual(expected, response.data)
diff --git a/scanpipe/tests/test_commands.py b/scanpipe/tests/test_commands.py
index 0480e86ed..01a51b2ee 100644
--- a/scanpipe/tests/test_commands.py
+++ b/scanpipe/tests/test_commands.py
@@ -20,6 +20,7 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.
+import datetime
import tempfile
from io import StringIO
from pathlib import Path
@@ -119,16 +120,16 @@ def test_scanpipe_management_command_create_project_pipelines(self):
def test_scanpipe_management_command_create_project_inputs(self):
out = StringIO()
- options = ["--input", "non-existing"]
+ options = ["--input-file", "non-existing"]
expected = "non-existing not found or not a file"
with self.assertRaisesMessage(CommandError, expected):
call_command("create-project", "my_project", *options)
parent_path = Path(__file__).parent
options = [
- "--input",
+ "--input-file",
str(parent_path / "test_commands.py"),
- "--input",
+ "--input-file",
str(parent_path / "test_models.py"),
]
call_command("create-project", "my_project", *options, stdout=out)
@@ -158,13 +159,15 @@ def test_scanpipe_management_command_create_project_execute(self):
self.assertIn(f"Pipeline {pipeline} run in progress...", out.getvalue())
self.assertIn("successfully executed on project my_project", out.getvalue())
- def test_scanpipe_management_command_add_input(self):
+ def test_scanpipe_management_command_add_input_file(self):
out = StringIO()
project = Project.objects.create(name="my_project")
parent_path = Path(__file__).parent
options = [
+ "--input-file",
str(parent_path / "test_commands.py"),
+ "--input-file",
str(parent_path / "test_models.py"),
]
@@ -178,11 +181,23 @@ def test_scanpipe_management_command_add_input(self):
expected = sorted(["test_commands.py", "test_models.py"])
self.assertEqual(expected, sorted(project.input_files))
- options = ["--project", project.name, "non-existing.py"]
+ options = ["--project", project.name, "--input-file", "non-existing.py"]
expected = "non-existing.py not found or not a file"
with self.assertRaisesMessage(CommandError, expected):
call_command("add-input", *options, stdout=out)
+ def test_scanpipe_management_command_add_input_url(self):
+ out = StringIO()
+
+ project = Project.objects.create(name="my_project")
+ parent_path = Path(__file__).parent
+ options = [
+ "--input-file",
+ str(parent_path / "test_commands.py"),
+ "--input-file",
+ str(parent_path / "test_models.py"),
+ ]
+
def test_scanpipe_management_command_add_pipeline(self):
out = StringIO()
@@ -221,7 +236,7 @@ def test_scanpipe_management_command_show_pipeline(self):
options = ["--project", project.name, "--no-color"]
out = StringIO()
call_command("show-pipeline", *options, stdout=out)
- expected = " [ ] docker\n" " [ ] root_filesystems\n"
+ expected = " [NOT_STARTED] docker\n" " [NOT_STARTED] root_filesystems\n"
self.assertEqual(expected, out.getvalue())
project.runs.filter(pipeline_name=pipeline_names[0]).update(task_exitcode=0)
@@ -273,7 +288,7 @@ def test_scanpipe_management_command_status(self):
self.assertIn("- CodebaseResource: 0", output)
self.assertIn("- DiscoveredPackage: 0", output)
self.assertIn("- ProjectError: 0", output)
- self.assertIn("[ ] docker", output)
+ self.assertIn("[NOT_STARTED] docker", output)
run.task_start_date = timezone.now()
run.log = (
@@ -289,6 +304,15 @@ def test_scanpipe_management_command_status(self):
for line in run.log.split("\n"):
self.assertIn(line, output)
+ run.task_end_date = run.task_start_date + datetime.timedelta(0, 42)
+ run.task_exitcode = 0
+ run.save()
+ out = StringIO()
+ call_command("status", *options, stdout=out)
+ output = out.getvalue()
+ expected = f"[SUCCESS] docker (executed in {run.execution_time} seconds)"
+ self.assertIn(expected, output)
+
def test_scanpipe_management_command_output(self):
project = Project.objects.create(name="my_project")
diff --git a/scanpipe/tests/test_forms.py b/scanpipe/tests/test_forms.py
new file mode 100644
index 000000000..28b4d3699
--- /dev/null
+++ b/scanpipe/tests/test_forms.py
@@ -0,0 +1,59 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# http://nexb.com and https://github.com/nexB/scancode.io
+# The ScanCode.io software is licensed under the Apache License version 2.0.
+# Data generated with ScanCode.io is provided as-is without warranties.
+# ScanCode is a trademark of nexB Inc.
+#
+# You may not use this software except in compliance with the License.
+# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software distributed
+# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
+# ScanCode.io should be considered or used as legal advice. Consult an Attorney
+# for any legal advice.
+#
+# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
+# Visit https://github.com/nexB/scancode.io for support and download.
+
+from unittest import mock
+
+from django.test import TestCase
+
+from scanpipe.forms import InputsBaseForm
+from scanpipe.models import Project
+
+
+class ScanPipeFormsTest(TestCase):
+ def setUp(self):
+ self.project1 = Project.objects.create(name="Analysis")
+
+ @mock.patch("requests.get")
+ def test_scanpipe_forms_inputs_base_form_input_urls(self, mock_get):
+ data = {
+ "input_urls": "https://example.com/archive.zip",
+ }
+
+ mock_get.side_effect = Exception
+ form = InputsBaseForm(data=data)
+ self.assertFalse(form.is_valid())
+ expected = {"input_urls": ["Could not fetch: https://example.com/archive.zip"]}
+ self.assertEqual(expected, form.errors)
+
+ mock_get.side_effect = None
+ mock_get.return_value = mock.Mock(
+ content=b"\x00",
+ headers={},
+ status_code=200,
+ url="url/archive.zip",
+ )
+ form = InputsBaseForm(data=data)
+ self.assertTrue(form.is_valid())
+ form.handle_inputs(project=self.project1)
+ self.assertEqual(["archive.zip"], self.project1.input_files)
+ expected = {"archive.zip": "https://example.com/archive.zip"}
+ self.assertEqual(expected, self.project1.input_sources)
diff --git a/scanpipe/tests/test_models.py b/scanpipe/tests/test_models.py
index 968db539d..64590414b 100644
--- a/scanpipe/tests/test_models.py
+++ b/scanpipe/tests/test_models.py
@@ -39,6 +39,9 @@
from scanpipe.models import Project
from scanpipe.models import ProjectError
from scanpipe.models import Run
+from scanpipe.models import get_project_work_directory
+from scanpipe.pipes.fetch import Download
+from scanpipe.pipes.input import copy_inputs
from scanpipe.tests import mocked_now
from scanpipe.tests import package_data1
from scanpipe.tests.pipelines.do_nothing import DoNothing
@@ -67,9 +70,7 @@ def test_scanpipe_project_model_extra_data(self):
self.assertEqual({}, project1_from_db.extra_data)
def test_scanpipe_project_model_work_directories(self):
- expected_work_directory = (
- f"projects/{self.project1.name}-{self.project1.short_uuid}"
- )
+ expected_work_directory = f"projects/analysis-{self.project1.short_uuid}"
self.assertTrue(self.project1.work_directory.endswith(expected_work_directory))
self.assertTrue(self.project1.work_path.exists())
self.assertTrue(self.project1.input_path.exists())
@@ -77,6 +78,12 @@ def test_scanpipe_project_model_work_directories(self):
self.assertTrue(self.project1.codebase_path.exists())
self.assertTrue(self.project1.tmp_path.exists())
+ def test_scanpipe_get_project_work_directory(self):
+ project = Project.objects.create(name="Name with spaces and @£$éæ")
+ expected = f"/projects/name-with-spaces-and-e-{project.short_uuid}"
+ self.assertTrue(get_project_work_directory(project).endswith(expected))
+ self.assertTrue(project.work_directory.endswith(expected))
+
def test_scanpipe_project_model_clear_tmp_directory(self):
new_file_path = self.project1.tmp_path / "file.ext"
new_file_path.touch()
@@ -90,7 +97,8 @@ def test_scanpipe_project_model_delete(self):
work_path = self.project1.work_path
self.assertTrue(work_path.exists())
- self.project1.add_input_file(SimpleUploadedFile("file.ext", content=b"content"))
+ uploaded_file = SimpleUploadedFile("file.ext", content=b"content")
+ self.project1.write_input_file(uploaded_file)
self.project1.add_pipeline("docker")
resource = CodebaseResource.objects.create(project=self.project1, path="path")
package = DiscoveredPackage.objects.create(project=self.project1)
@@ -137,11 +145,11 @@ def test_scanpipe_project_model_get_output_file_path(self):
filename = self.project1.get_output_file_path("file", "ext")
self.assertTrue(str(filename).endswith("/output/file-2010-10-10-10-10-10.ext"))
- def test_scanpipe_project_model_add_input_file(self):
+ def test_scanpipe_project_model_write_input_file(self):
self.assertEqual([], self.project1.input_files)
uploaded_file = SimpleUploadedFile("file.ext", content=b"content")
- self.project1.add_input_file(uploaded_file)
+ self.project1.write_input_file(uploaded_file)
self.assertEqual(["file.ext"], self.project1.input_files)
@@ -165,6 +173,98 @@ def test_scanpipe_project_model_move_input_from(self):
self.assertEqual([input_filename], self.project1.input_files)
self.assertFalse(Path(input_location).exists())
+ def test_scanpipe_project_model_inputs_with_source(self):
+ inputs, missing_inputs = self.project1.inputs_with_source
+ self.assertEqual([], inputs)
+ self.assertEqual({}, missing_inputs)
+
+ uploaded_file = SimpleUploadedFile("file.ext", content=b"content")
+ self.project1.add_uploads([uploaded_file])
+ self.project1.copy_input_from(self.data_location / "notice.NOTICE")
+ self.project1.add_input_source(filename="missing.zip", source="uploaded")
+
+ inputs, missing_inputs = self.project1.inputs_with_source
+ expected = [
+ {
+ "is_file": True,
+ "name": "file.ext",
+ "size": 7,
+ "source": "uploaded",
+ },
+ {
+ "is_file": True,
+ "name": "notice.NOTICE",
+ "size": 1178,
+ "source": "not_found",
+ },
+ ]
+
+ def sort_by_name(x):
+ return x.get("name")
+
+ self.assertEqual(
+ sorted(expected, key=sort_by_name), sorted(inputs, key=sort_by_name)
+ )
+ self.assertEqual({"missing.zip": "uploaded"}, missing_inputs)
+
+ def test_scanpipe_project_model_can_add_input(self):
+ self.assertTrue(self.project1.can_add_input)
+
+ run = self.project1.add_pipeline("docker")
+ self.project1 = Project.objects.get(uuid=self.project1.uuid)
+ self.assertTrue(self.project1.can_add_input)
+
+ run.task_start_date = timezone.now()
+ run.save()
+ self.project1 = Project.objects.get(uuid=self.project1.uuid)
+ self.assertFalse(self.project1.can_add_input)
+
+ def test_scanpipe_project_model_add_input_source(self):
+ self.assertEqual({}, self.project1.input_sources)
+
+ self.project1.add_input_source("filename", "source", save=True)
+ self.project1.refresh_from_db()
+ self.assertEqual({"filename": "source"}, self.project1.input_sources)
+
+ def test_scanpipe_project_model_add_downloads(self):
+ file_location = self.data_location / "notice.NOTICE"
+ copy_inputs([file_location], self.project1.tmp_path)
+
+ download = Download(
+ uri="https://example.com/filename.zip",
+ directory="",
+ filename="notice.NOTICE",
+ path=self.project1.tmp_path / "notice.NOTICE",
+ size="",
+ sha1="",
+ md5="",
+ )
+
+ self.project1.add_downloads([download])
+
+ inputs, missing_inputs = self.project1.inputs_with_source
+ expected = [
+ {
+ "is_file": True,
+ "name": "notice.NOTICE",
+ "size": 1178,
+ "source": "https://example.com/filename.zip",
+ }
+ ]
+ self.assertEqual(expected, inputs)
+ self.assertEqual({}, missing_inputs)
+
+ def test_scanpipe_project_model_add_uploads(self):
+ uploaded_file = SimpleUploadedFile("file.ext", content=b"content")
+ self.project1.add_uploads([uploaded_file])
+
+ inputs, missing_inputs = self.project1.inputs_with_source
+ expected = [
+ {"name": "file.ext", "is_file": True, "size": 7, "source": "uploaded"}
+ ]
+ self.assertEqual(expected, inputs)
+ self.assertEqual({}, missing_inputs)
+
def test_scanpipe_project_model_get_next_run(self):
self.assertEqual(None, self.project1.get_next_run())
@@ -368,7 +468,7 @@ def test_scanpipe_codebase_resource_model_methods(self):
with open(resource.location, "w") as f:
f.write("content")
- self.assertEqual("content", resource.file_content)
+ self.assertEqual("content\n", resource.file_content)
package = DiscoveredPackage.objects.create(project=self.project1)
resource.discovered_packages.add(package)
@@ -436,6 +536,18 @@ def test_scanpipe_codebase_resource_type_methods(self):
self.assertEqual(1, len(qs))
self.assertIn(file, qs)
+ qs = CodebaseResource.objects.empty()
+ self.assertEqual(3, len(qs))
+ file.size = 1
+ file.save()
+ qs = CodebaseResource.objects.empty()
+ self.assertEqual(2, len(qs))
+ self.assertNotIn(file, qs)
+ file.size = 0
+ file.save()
+ qs = CodebaseResource.objects.empty()
+ self.assertEqual(3, len(qs))
+
qs = CodebaseResource.objects.directories()
self.assertEqual(1, len(qs))
self.assertIn(directory, qs)
@@ -467,7 +579,7 @@ def test_scanpipe_codebase_resource_type_methods(self):
self.assertEqual(0, CodebaseResource.objects.in_package().count())
self.assertEqual(3, CodebaseResource.objects.not_in_package().count())
- DiscoveredPackage.create_for_resource(package_data1, file)
+ file.create_and_add_package(package_data1)
self.assertEqual(1, CodebaseResource.objects.in_package().count())
self.assertEqual(2, CodebaseResource.objects.not_in_package().count())
@@ -513,6 +625,16 @@ def test_scanpipe_codebase_resource_children(self):
]
self.assertEqual(expected, [resource.path for resource in children])
+ def test_scanpipe_codebase_resource_create_and_add_package(self):
+ codebase_resource = CodebaseResource.objects.create(
+ project=self.project1, path="filename.ext"
+ )
+ package = codebase_resource.create_and_add_package(package_data1)
+ self.assertEqual(self.project1, package.project)
+ self.assertEqual("pkg:deb/debian/adduser@3.118?arch=all", str(package))
+ self.assertEqual(1, codebase_resource.discovered_packages.count())
+ self.assertEqual(package, codebase_resource.discovered_packages.get())
+
def test_scanpipe_discovered_package_model_create_from_data(self):
package = DiscoveredPackage.create_from_data(self.project1, package_data1)
self.assertEqual(self.project1, package.project)
@@ -528,17 +650,13 @@ def test_scanpipe_discovered_package_model_create_from_data(self):
"gpl-2.0 AND gpl-2.0-plus AND unknown", package.license_expression
)
- def test_scanpipe_discovered_package_model_create_for_resource(self):
- codebase_resource = CodebaseResource.objects.create(
- project=self.project1, path="filename.ext"
+ package_count = DiscoveredPackage.objects.count()
+ missing_required_field = dict(package_data1)
+ missing_required_field["name"] = ""
+ self.assertIsNone(
+ DiscoveredPackage.create_from_data(self.project1, missing_required_field)
)
- package = DiscoveredPackage.create_for_resource(
- package_data1, codebase_resource
- )
- self.assertEqual(self.project1, package.project)
- self.assertEqual("pkg:deb/debian/adduser@3.118?arch=all", str(package))
- self.assertEqual(1, codebase_resource.discovered_packages.count())
- self.assertEqual(package, codebase_resource.discovered_packages.get())
+ self.assertEqual(package_count, DiscoveredPackage.objects.count())
class ScanPipeModelsTransactionTest(TransactionTestCase):
diff --git a/scanpipe/tests/test_pipes.py b/scanpipe/tests/test_pipes.py
index b0392a9b0..2509dba65 100644
--- a/scanpipe/tests/test_pipes.py
+++ b/scanpipe/tests/test_pipes.py
@@ -27,15 +27,21 @@
from django.core.management import call_command
from django.test import TestCase
+from django.test import TransactionTestCase
from scanpipe.models import CodebaseResource
from scanpipe.models import DiscoveredPackage
from scanpipe.models import Project
from scanpipe.pipes import codebase
+from scanpipe.pipes import docker
+from scanpipe.pipes import fetch
from scanpipe.pipes import filename_now
+from scanpipe.pipes import make_codebase_resource
from scanpipe.pipes import output
+from scanpipe.pipes import rootfs
from scanpipe.pipes import scancode
from scanpipe.pipes import strip_root
+from scanpipe.pipes import tag_not_analyzed_codebase_resources
from scanpipe.pipes.input import copy_inputs
from scanpipe.tests import mocked_now
from scanpipe.tests import package_data1
@@ -59,16 +65,32 @@ def test_scanpipe_pipes_strip_root(self):
self.assertEqual(expected, strip_root(path))
self.assertEqual(expected, strip_root(Path(path)))
+ def test_scanpipe_pipes_tag_not_analyzed_codebase_resources(self):
+ p1 = Project.objects.create(name="Analysis")
+ resource1 = CodebaseResource.objects.create(project=p1, path="filename.ext")
+ resource2 = CodebaseResource.objects.create(
+ project=p1,
+ path="filename1.ext",
+ status="scanned",
+ )
+
+ tag_not_analyzed_codebase_resources(p1)
+ resource1.refresh_from_db()
+ resource2.refresh_from_db()
+ self.assertEqual("not-analyzed", resource1.status)
+ self.assertEqual("scanned", resource2.status)
+
+ @mock.patch("scanpipe.pipes.datetime", mocked_now)
+ def test_scanpipe_pipes_filename_now(self):
+ self.assertEqual("2010-10-10-10-10-10", filename_now())
+
def test_scanpipe_pipes_outputs_queryset_to_csv_file(self):
project1 = Project.objects.create(name="Analysis")
codebase_resource = CodebaseResource.objects.create(
project=project1,
path="filename.ext",
)
- DiscoveredPackage.create_for_resource(
- package_data1,
- codebase_resource,
- )
+ codebase_resource.create_and_add_package(package_data1)
queryset = project1.discoveredpackages.all()
fieldnames = ["purl", "name", "version"]
@@ -103,10 +125,7 @@ def test_scanpipe_pipes_outputs_queryset_to_csv_stream(self):
project=project1,
path="filename.ext",
)
- DiscoveredPackage.create_for_resource(
- package_data1,
- codebase_resource,
- )
+ codebase_resource.create_and_add_package(package_data1)
queryset = project1.discoveredpackages.all()
fieldnames = ["purl", "name", "version"]
@@ -160,7 +179,7 @@ def test_scanpipe_pipes_outputs_to_json(self):
project=project1,
path="filename.ext",
)
- DiscoveredPackage.create_for_resource(package_data1, codebase_resource)
+ codebase_resource.create_and_add_package(package_data1)
output_file = output.to_json(project=project1)
self.assertEqual([output_file.name], project1.output_root)
@@ -181,15 +200,11 @@ def test_scanpipe_pipes_outputs_to_xlsx(self):
project=project1,
path="filename.ext",
)
- DiscoveredPackage.create_for_resource(package_data1, codebase_resource)
+ codebase_resource.create_and_add_package(package_data1)
output_file = output.to_xlsx(project=project1)
self.assertEqual([output_file.name], project1.output_root)
- @mock.patch("scanpipe.pipes.datetime", mocked_now)
- def test_scanpipe_pipes_filename_now(self):
- self.assertEqual("2010-10-10-10-10-10", filename_now())
-
def test_scanpipe_pipes_scancode_get_resource_info(self):
input_location = str(self.data_location / "notice.NOTICE")
sha256 = "b323607418a36b5bd700fcf52ae9ca49f82ec6359bc4b89b1b2d73cf75321757"
@@ -223,13 +238,13 @@ def test_scanpipe_pipes_scancode_scan_file(self):
self.assertEqual(expected, list(scan_results.keys()))
self.assertEqual([], scan_errors)
- def test_scanpipe_pipes_scancode_scan_and_save_results(self):
+ def test_scanpipe_pipes_scancode_scan_file_and_save_results(self):
project1 = Project.objects.create(name="Analysis")
codebase_resource1 = CodebaseResource.objects.create(
project=project1, path="not available"
)
- scancode.scan_and_save_results(codebase_resource1)
+ scancode.scan_file_and_save_results(codebase_resource1)
codebase_resource1.refresh_from_db()
self.assertEqual("scanned-with-error", codebase_resource1.status)
@@ -237,7 +252,7 @@ def test_scanpipe_pipes_scancode_scan_and_save_results(self):
codebase_resource2 = CodebaseResource.objects.create(
project=project1, path="notice.NOTICE"
)
- scancode.scan_and_save_results(codebase_resource2)
+ scancode.scan_file_and_save_results(codebase_resource2)
codebase_resource2.refresh_from_db()
self.assertEqual("scanned", codebase_resource2.status)
expected = [
@@ -270,7 +285,8 @@ def test_scanpipe_pipes_scancode_scan_for_files(self, mock_scan_file):
scancode.scan_for_files(project1)
# The scan_file is only called once as the cache is used for the second
# duplicated resource.
- mock_scan_file.assert_called_once()
+ # WARNING: The cache is turned off for now in favor of multiprocessing
+ # mock_scan_file.assert_called_once()
for resource in [resource1, resource2]:
resource.refresh_from_db()
@@ -368,3 +384,123 @@ def test_scanpipe_pipes_codebase_project_codebase_class_with_resources(self):
expected = json.loads(f.read())
self.assertEqual(expected, tree)
+
+ @mock.patch("requests.get")
+ def test_scanpipe_pipes_fetch_download(self, mock_get):
+ url = "https://example.com/filename.zip"
+
+ mock_get.return_value = mock.Mock(
+ content=b"\x00", headers={}, status_code=200, url=url
+ )
+ downloaded_file = fetch.download(url)
+ self.assertTrue(Path(downloaded_file.directory, "filename.zip").exists())
+
+ redirect_url = "https://example.com/redirect.zip"
+ mock_get.return_value = mock.Mock(
+ content=b"\x00", headers={}, status_code=200, url=redirect_url
+ )
+ downloaded_file = fetch.download(url)
+ self.assertTrue(Path(downloaded_file.directory, "redirect.zip").exists())
+
+ headers = {
+ "content-disposition": 'attachment; filename="another_name.zip"',
+ }
+ mock_get.return_value = mock.Mock(
+ content=b"\x00", headers=headers, status_code=200, url=url
+ )
+ downloaded_file = fetch.download(url)
+ self.assertTrue(Path(downloaded_file.directory, "another_name.zip").exists())
+
+ @mock.patch("requests.get")
+ def test_scanpipe_pipes_fetch_fetch_urls(self, mock_get):
+ urls = [
+ "https://example.com/filename.zip",
+ "https://example.com/archive.tar.gz",
+ ]
+
+ mock_get.return_value = mock.Mock(
+ content=b"\x00", headers={}, status_code=200, url="mocked_url"
+ )
+ downloads, errors = fetch.fetch_urls(urls)
+ self.assertEqual(2, len(downloads))
+ self.assertEqual(urls[0], downloads[0].uri)
+ self.assertEqual(urls[1], downloads[1].uri)
+ self.assertEqual(0, len(errors))
+
+ mock_get.side_effect = Exception
+ downloads, errors = fetch.fetch_urls(urls)
+ self.assertEqual(0, len(downloads))
+ self.assertEqual(2, len(errors))
+ self.assertEqual(urls, errors)
+
+ def test_scanpipe_pipes_docker_tag_whiteout_codebase_resources(self):
+ p1 = Project.objects.create(name="Analysis")
+ resource1 = CodebaseResource.objects.create(project=p1, path="filename.ext")
+ resource2 = CodebaseResource.objects.create(project=p1, name=".wh.filename2")
+
+ docker.tag_whiteout_codebase_resources(p1)
+ resource1.refresh_from_db()
+ resource2.refresh_from_db()
+ self.assertEqual("", resource1.status)
+ self.assertEqual("ignored-whiteout", resource2.status)
+
+ def test_scanpipe_pipes_rootfs_tag_empty_codebase_resources(self):
+ p1 = Project.objects.create(name="Analysis")
+ resource1 = CodebaseResource.objects.create(project=p1, path="dir/")
+ resource2 = CodebaseResource.objects.create(
+ project=p1, path="filename.ext", type=CodebaseResource.Type.FILE
+ )
+
+ rootfs.tag_empty_codebase_resources(p1)
+ resource1.refresh_from_db()
+ resource2.refresh_from_db()
+ self.assertEqual("", resource1.status)
+ self.assertEqual("ignored-empty-file", resource2.status)
+
+ def test_scanpipe_pipes_rootfs_tag_uninteresting_codebase_resources(self):
+ p1 = Project.objects.create(name="Analysis")
+ resource1 = CodebaseResource.objects.create(project=p1, path="filename.ext")
+ resource2 = CodebaseResource.objects.create(project=p1, rootfs_path="/tmp/file")
+
+ rootfs.tag_uninteresting_codebase_resources(p1)
+ resource1.refresh_from_db()
+ resource2.refresh_from_db()
+ self.assertEqual("", resource1.status)
+ self.assertEqual("ignored-not-interesting", resource2.status)
+
+
+class ScanPipePipesTransactionTest(TransactionTestCase):
+ """
+ Since we are testing some Database errors, we need to use a
+ TransactionTestCase to avoid any TransactionManagementError while running
+ the tests.
+ """
+
+ data_location = Path(__file__).parent / "data"
+
+ def test_scanpipe_pipes_make_codebase_resource(self):
+ p1 = Project.objects.create(name="Analysis")
+ resource_location = str(self.data_location / "notice.NOTICE")
+
+ with self.assertRaises(AssertionError) as cm:
+ make_codebase_resource(p1, resource_location)
+
+ self.assertIn("is not under project/codebase/", str(cm.exception))
+
+ copy_inputs([resource_location], p1.codebase_path)
+ resource_location = str(p1.codebase_path / "notice.NOTICE")
+ make_codebase_resource(p1, resource_location)
+
+ resource = p1.codebaseresources.get()
+ self.assertEqual(1178, resource.size)
+ self.assertEqual("4bd631df28995c332bf69d9d4f0f74d7ee089598", resource.sha1)
+ self.assertEqual("90cd416fd24df31f608249b77bae80f1", resource.md5)
+ self.assertEqual("text/plain", resource.mime_type)
+ self.assertEqual("ASCII text", resource.file_type)
+ self.assertEqual("", resource.status)
+ self.assertEqual(CodebaseResource.Type.FILE, resource.type)
+
+ # Duplicated path: skip the creation and no project error added
+ make_codebase_resource(p1, resource_location)
+ self.assertEqual(1, p1.codebaseresources.count())
+ self.assertEqual(0, p1.projecterrors.count())
diff --git a/scanpipe/tests/test_tasks.py b/scanpipe/tests/test_tasks.py
index 95f20b2c2..862197bd8 100644
--- a/scanpipe/tests/test_tasks.py
+++ b/scanpipe/tests/test_tasks.py
@@ -24,6 +24,8 @@
from django.test import TestCase
+from celery.exceptions import SoftTimeLimitExceeded
+
from scanpipe import tasks
from scanpipe.models import Project
@@ -43,3 +45,17 @@ def test_scanpipe_tasks_execute_pipeline_task(self, mock_execute):
self.assertEqual("", run.task_output)
self.assertIsNotNone(run.task_start_date)
self.assertIsNotNone(run.task_end_date)
+
+ @mock.patch("scanpipe.pipelines.Pipeline.execute")
+ def test_scanpipe_tasks_timeout_soft_time_limit_exceeded(self, mock_execute):
+ project = Project.objects.create(name="my_project")
+ run = project.add_pipeline("do_nothing")
+
+ mock_execute.side_effect = SoftTimeLimitExceeded()
+ tasks.execute_pipeline_task(run.pk)
+
+ run.refresh_from_db()
+ self.assertTrue(run.task_start_date)
+ self.assertTrue(run.task_end_date)
+ self.assertEqual(1, run.task_exitcode)
+ self.assertEqual("SoftTimeLimitExceeded", run.task_output)
diff --git a/scanpipe/tests/test_views.py b/scanpipe/tests/test_views.py
new file mode 100644
index 000000000..040252be9
--- /dev/null
+++ b/scanpipe/tests/test_views.py
@@ -0,0 +1,88 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# http://nexb.com and https://github.com/nexB/scancode.io
+# The ScanCode.io software is licensed under the Apache License version 2.0.
+# Data generated with ScanCode.io is provided as-is without warranties.
+# ScanCode is a trademark of nexB Inc.
+#
+# You may not use this software except in compliance with the License.
+# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software distributed
+# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
+# ScanCode.io should be considered or used as legal advice. Consult an Attorney
+# for any legal advice.
+#
+# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
+# Visit https://github.com/nexB/scancode.io for support and download.
+
+from pathlib import Path
+from unittest import mock
+
+from django.test import TestCase
+
+from scanpipe.models import Project
+
+
+class ScanPipeViewsTest(TestCase):
+ data_location = Path(__file__).parent / "data"
+
+ def setUp(self):
+ self.project1 = Project.objects.create(name="Analysis")
+
+ @mock.patch("requests.get")
+ def test_scanpipe_views_project_details_add_inputs(self, mock_get):
+ url = self.project1.get_absolute_url()
+
+ data = {
+ "input_urls": "https://example.com/archive.zip",
+ "add-inputs-submit": "",
+ }
+
+ mock_get.side_effect = Exception
+ response = self.client.post(url, data, follow=True)
+ self.assertContains(response, "Input file addition error.")
+
+ mock_get.side_effect = None
+ mock_get.return_value = mock.Mock(
+ content=b"\x00",
+ headers={},
+ status_code=200,
+ url="url/archive.zip",
+ )
+ response = self.client.post(url, data, follow=True)
+ self.assertContains(response, "Input file(s) added.")
+
+ self.assertEqual(["archive.zip"], self.project1.input_files)
+ expected = {"archive.zip": "https://example.com/archive.zip"}
+ self.project1.refresh_from_db()
+ self.assertEqual(expected, self.project1.input_sources)
+
+ def test_scanpipe_views_project_details_missing_inputs(self):
+ self.project1.add_input_source(
+ filename="missing.zip", source="uploaded", save=True
+ )
+ url = self.project1.get_absolute_url()
+ response = self.client.get(url)
+ expected = (
+ ''
+ " The following input files are not available on disk anymore:
"
+ " - missing.zip"
+ "
"
+ )
+ self.assertContains(response, expected, html=True)
+
+ def test_scanpipe_views_project_details_add_pipelines(self):
+ url = self.project1.get_absolute_url()
+ data = {
+ "pipeline": "docker",
+ }
+ response = self.client.post(url, data, follow=True)
+ self.assertContains(response, "Pipeline added.")
+ run = self.project1.runs.get()
+ self.assertEqual("docker", run.pipeline_name)
+ self.assertIsNone(run.task_start_date)
diff --git a/scanpipe/urls.py b/scanpipe/urls.py
index 79b10963d..a4eb56f60 100644
--- a/scanpipe/urls.py
+++ b/scanpipe/urls.py
@@ -25,6 +25,11 @@
from scanpipe import views
urlpatterns = [
+ path(
+ "project//resources/",
+ views.CodebaseResourceDetailsView.as_view(),
+ name="resource_detail",
+ ),
path(
"project//resources/",
views.CodebaseResourceListView.as_view(),
@@ -56,7 +61,7 @@
name="project_results",
),
path(
- "project//run_pipeline//",
+ "project//execute_pipeline//",
views.execute_pipeline_view,
name="project_execute_pipeline",
),
diff --git a/scanpipe/views.py b/scanpipe/views.py
index 155bd9070..7a38cde29 100644
--- a/scanpipe/views.py
+++ b/scanpipe/views.py
@@ -34,6 +34,7 @@
from django_filters.views import FilterView
from scanpipe.api.serializers import scanpipe_app_config
+from scanpipe.forms import AddInputsForm
from scanpipe.forms import AddPipelineForm
from scanpipe.forms import PackageFilterSet
from scanpipe.forms import ProjectFilterSet
@@ -66,7 +67,7 @@ class ProjectListView(PrefetchRelatedViewMixin, FilterView):
filterset_class = ProjectFilterSet
template_name = "scanpipe/project_list.html"
prefetch_related = ["runs"]
- paginate_by = 15
+ paginate_by = 10
class ProjectCreateView(generic.CreateView):
@@ -117,12 +118,6 @@ def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
project = self.object
- input_path = project.input_path
- context["inputs"] = [
- (path.relative_to(input_path), path.is_file())
- for path in input_path.glob("*")
- ]
-
files_qs = project.codebaseresources.files()
file_filter = self.request.GET.get("file-filter", "all")
@@ -155,8 +150,17 @@ def get_context_data(self, **kwargs):
package_licenses = packages.values_list("license_expression", flat=True)
package_types = packages.values_list("type", flat=True)
+ inputs, missing_inputs = project.inputs_with_source
+ if missing_inputs:
+ message = (
+ "The following input files are not available on disk anymore:\n- "
+ + "\n- ".join(missing_inputs.keys())
+ )
+ messages.error(self.request, message)
+
context.update(
{
+ "inputs_with_source": inputs,
"programming_languages": self.get_summary(file_languages),
"mime_types": self.get_summary(file_mime_types),
"holders": self.get_summary(file_holders),
@@ -167,6 +171,7 @@ def get_context_data(self, **kwargs):
"package_types": self.get_summary(package_types),
"file_filter": file_filter,
"add_pipeline_form": AddPipelineForm(),
+ "add_inputs_form": AddInputsForm(),
}
)
@@ -177,14 +182,23 @@ def get_context_data(self, **kwargs):
def post(self, request, *args, **kwargs):
project = self.get_object()
- form = AddPipelineForm(request.POST)
+
+ if "add-inputs-submit" in request.POST:
+ form_class = AddInputsForm
+ success_message = "Input file(s) added."
+ error_message = "Input file addition error."
+ else:
+ form_class = AddPipelineForm
+ success_message = "Pipeline added."
+ error_message = "Pipeline addition error."
+
+ form_kwargs = {"data": request.POST, "files": request.FILES}
+ form = form_class(**form_kwargs)
if form.is_valid():
- pipeline = form.data["pipeline"]
- execute_now = form.data.get("execute_now", False)
- project.add_pipeline(pipeline, execute_now)
- messages.success(request, f"Pipeline {pipeline} added.")
+ form.save(project)
+ messages.success(request, success_message)
else:
- messages.error(request, "Pipeline addition error.")
+ messages.error(request, error_message)
return redirect(project)
@@ -300,3 +314,50 @@ class ProjectErrorListView(ProjectRelatedViewMixin, generic.ListView):
model = ProjectError
template_name = "scanpipe/error_list.html"
paginate_by = 50
+
+
+class CodebaseResourceDetailsView(ProjectRelatedViewMixin, generic.DetailView):
+ model = CodebaseResource
+ template_name = "scanpipe/resource_detail.html"
+
+ @staticmethod
+ def get_annotation_text(entry, field_name, value_key):
+ """
+ Workaround to get the license_expression until the data structure is updated
+ on the ScanCode-toolkit side.
+ https://github.com/nexB/scancode-results-analyzer/blob/6c132bc20153d5c96929c
+ f378bd0f06d83db9005/src/results_analyze/analyzer_plugin.py#L131-L198
+ """
+ if field_name == "licenses":
+ return entry["matched_rule"]["license_expression"]
+ return entry[value_key]
+
+ def get_annotations(self, field_name, value_key="value"):
+ return [
+ {
+ "start_line": entry["start_line"],
+ "end_line": entry["end_line"],
+ "text": self.get_annotation_text(entry, field_name, value_key),
+ "type": "info",
+ }
+ for entry in getattr(self.object, field_name)
+ ]
+
+ def get_context_data(self, **kwargs):
+ context = super().get_context_data(**kwargs)
+
+ try:
+ context["file_content"] = self.object.file_content
+ except OSError:
+ raise Http404("File not found.")
+
+ context["detected_values"] = {
+ "licenses": self.get_annotations("licenses"),
+ "copyrights": self.get_annotations("copyrights"),
+ "holders": self.get_annotations("holders"),
+ "authors": self.get_annotations("authors"),
+ "emails": self.get_annotations("emails", value_key="email"),
+ "urls": self.get_annotations("urls", value_key="url"),
+ }
+
+ return context