diff --git a/vulnerabilities/tests/example_importer_improver.py b/vulnerabilities/tests/example_importer_improver.py new file mode 100644 index 000000000..bdecf3204 --- /dev/null +++ b/vulnerabilities/tests/example_importer_improver.py @@ -0,0 +1,97 @@ +from datetime import datetime +from datetime import timezone +from typing import Iterable + +import requests +from django.db.models.query import QuerySet +from packageurl import PackageURL +from univers.version_range import NginxVersionRange +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importer import Importer +from vulnerabilities.importer import Reference +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.improver import MAX_CONFIDENCE +from vulnerabilities.improver import Improver +from vulnerabilities.improver import Inference +from vulnerabilities.models import Advisory +from vulnerabilities.severity_systems import SCORING_SYSTEMS + + +class ExampleImporter(Importer): + + spdx_license_expression = "BSD-2-Clause" + + def advisory_data(self) -> Iterable[AdvisoryData]: + raw_data = fetch_advisory_data() + for data in raw_data: + yield parse_advisory_data(data) + + +def fetch_advisory_data(): + return [ + { + "id": "CVE-2021-230171337", + "summary": "1-byte memory overwrite in resolver", + "advisory_severity": "medium", + "vulnerable": "0.6.18-1.20.0", + "fixed": "1.20.1", + "reference": "http://mailman.nginx.org/pipermail/nginx-announce/2021/000300.html", + "published_on": "14-02-2021 UTC", + }, + { + "id": "CVE-2021-12341337", + "summary": "Dummy advisory", + "advisory_severity": "high", + "vulnerable": "0.6.18-1.20.0", + "fixed": "1.20.1", + "reference": "http://example.com/cve-2021-1234", + "published_on": "06-10-2021 UTC", + }, + ] + + +def parse_advisory_data(raw_data) -> AdvisoryData: + purl = PackageURL(type="example", name="dummy_package") + affected_version_range = NginxVersionRange.from_native(raw_data["vulnerable"]) + fixed_version = SemverVersion(raw_data["fixed"]) + affected_package = AffectedPackage( + package=purl, affected_version_range=affected_version_range, fixed_version=fixed_version + ) + severity = VulnerabilitySeverity( + system=SCORING_SYSTEMS["generic_textual"], value=raw_data["advisory_severity"] + ) + references = [Reference(url=raw_data["reference"], severities=[severity])] + date_published = datetime.strptime(raw_data["published_on"], "%d-%m-%Y %Z").replace( + tzinfo=timezone.utc + ) + + return AdvisoryData( + aliases=[raw_data["id"]], + summary=raw_data["summary"], + affected_packages=[affected_package], + references=references, + date_published=date_published, + ) + + +class ExampleAliasImprover(Improver): + @property + def interesting_advisories(self) -> QuerySet: + return Advisory.objects.filter(created_by=ExampleImporter.qualified_name) + + def get_inferences(self, advisory_data) -> Iterable[Inference]: + for alias in advisory_data.aliases: + new_aliases = fetch_additional_aliases(alias) + aliases = new_aliases + [alias] + yield Inference(aliases=aliases, confidence=MAX_CONFIDENCE) + + +def fetch_additional_aliases(alias): + alias_map = { + "CVE-2021-230171337": ["PYSEC-1337", "CERTIN-1337"], + "CVE-2021-12341337": ["ANONSEC-1337", "CERTDES-1337"], + } + return alias_map.get(alias) diff --git a/vulnerabilities/tests/test_example.py b/vulnerabilities/tests/test_example.py new file mode 100644 index 000000000..77e55506f --- /dev/null +++ b/vulnerabilities/tests/test_example.py @@ -0,0 +1,123 @@ +import datetime +import os +from unittest.mock import patch + +from django.test import TestCase +from packageurl import PackageURL +from univers.version_constraint import VersionConstraint +from univers.version_range import NginxVersionRange +from univers.versions import SemverVersion + +from vulnerabilities import models +from vulnerabilities.import_runner import ImportRunner +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importer import Reference +from vulnerabilities.importer import ScoringSystem +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.improve_runner import ImproveRunner +from vulnerabilities.improvers.default import DefaultImprover +from vulnerabilities.tests.example_importer_improver import ExampleAliasImprover +from vulnerabilities.tests.example_importer_improver import ExampleImporter +from vulnerabilities.tests.example_importer_improver import parse_advisory_data + + +def mock_fetch_advisory_data(): + return [ + { + "id": "CVE-2021-12341337", + "summary": "Dummy advisory", + "advisory_severity": "high", + "vulnerable": "0.6.18-1.20.0", + "fixed": "1.20.1", + "reference": "http://example.com/cve-2021-1234", + "published_on": "06-10-2021 UTC", + } + ] + + +def mock_fetch_additional_aliases(alias): + alias_map = { + "CVE-2021-12341337": ["ANONSEC-1337", "CERTDES-1337"], + } + return alias_map.get(alias) + + +@patch( + "vulnerabilities.tests.example_importer_improver.fetch_advisory_data", mock_fetch_advisory_data +) +@patch( + "vulnerabilities.tests.example_importer_improver.fetch_additional_aliases", + mock_fetch_additional_aliases, +) +class TestExampleImporter(TestCase): + def test_parse_advisory_data(self): + raw_data = mock_fetch_advisory_data()[0] + expected = AdvisoryData( + aliases=["CVE-2021-12341337"], + summary="Dummy advisory", + affected_packages=[ + AffectedPackage( + package=PackageURL( + type="example", + namespace=None, + name="dummy_package", + version=None, + qualifiers={}, + subpath=None, + ), + affected_version_range=NginxVersionRange( + constraints=( + VersionConstraint( + comparator=">=", version=SemverVersion(string="0.6.18") + ), + VersionConstraint( + comparator="<=", version=SemverVersion(string="1.20.0") + ), + ) + ), + fixed_version=SemverVersion(string="1.20.1"), + ) + ], + references=[ + Reference( + reference_id="", + url="http://example.com/cve-2021-1234", + severities=[ + VulnerabilitySeverity( + system=ScoringSystem( + identifier="generic_textual", + name="Generic textual severity rating", + url="", + notes="Severity for unknown scoring systems. Contains generic textual values like High, Low etc", + ), + value="high", + ) + ], + ) + ], + date_published=datetime.datetime(2021, 10, 6, 0, 0, tzinfo=datetime.timezone.utc), + ) + actual = parse_advisory_data(raw_data) + assert actual == expected + + def test_import_framework_using_example_importer(self): + raw_datas = mock_fetch_advisory_data() + ImportRunner(ExampleImporter).run() + + for raw_data in raw_datas: + assert models.Advisory.objects.get(aliases__contains=raw_data["id"]) + + def test_improve_framework_using_example_improver(self): + ImportRunner(ExampleImporter).run() + ImproveRunner(DefaultImprover).run() + ImproveRunner(ExampleAliasImprover).run() + raw_datas = mock_fetch_advisory_data() + + assert models.Package.objects.count() == 3 + assert models.PackageRelatedVulnerability.objects.filter(fix=True).count() == 1 + assert models.PackageRelatedVulnerability.objects.filter(fix=False).count() == 2 + assert models.VulnerabilitySeverity.objects.count() == 1 + assert models.VulnerabilityReference.objects.count() == 1 + for raw_data in raw_datas: + assert models.Vulnerability.objects.get(summary=raw_data["summary"])