Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example importer and improver #672

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions vulnerabilities/tests/example_importer_improver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from datetime import datetime
from datetime import timezone
from typing import Iterable

import requests
from django.db.models.query import QuerySet
from packageurl import PackageURL
from univers.version_range import NginxVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.improver import MAX_CONFIDENCE
from vulnerabilities.improver import Improver
from vulnerabilities.improver import Inference
from vulnerabilities.models import Advisory
from vulnerabilities.severity_systems import SCORING_SYSTEMS


class ExampleImporter(Importer):

spdx_license_expression = "BSD-2-Clause"

def advisory_data(self) -> Iterable[AdvisoryData]:
raw_data = fetch_advisory_data()
for data in raw_data:
yield parse_advisory_data(data)


def fetch_advisory_data():
return [
{
"id": "CVE-2021-230171337",
"summary": "1-byte memory overwrite in resolver",
"advisory_severity": "medium",
"vulnerable": "0.6.18-1.20.0",
"fixed": "1.20.1",
"reference": "http://mailman.nginx.org/pipermail/nginx-announce/2021/000300.html",
"published_on": "14-02-2021 UTC",
},
{
"id": "CVE-2021-12341337",
"summary": "Dummy advisory",
"advisory_severity": "high",
"vulnerable": "0.6.18-1.20.0",
"fixed": "1.20.1",
"reference": "http://example.com/cve-2021-1234",
"published_on": "06-10-2021 UTC",
},
]


def parse_advisory_data(raw_data) -> AdvisoryData:
purl = PackageURL(type="example", name="dummy_package")
affected_version_range = NginxVersionRange.from_native(raw_data["vulnerable"])
fixed_version = SemverVersion(raw_data["fixed"])
affected_package = AffectedPackage(
package=purl, affected_version_range=affected_version_range, fixed_version=fixed_version
)
severity = VulnerabilitySeverity(
system=SCORING_SYSTEMS["generic_textual"], value=raw_data["advisory_severity"]
)
references = [Reference(url=raw_data["reference"], severities=[severity])]
date_published = datetime.strptime(raw_data["published_on"], "%d-%m-%Y %Z").replace(
tzinfo=timezone.utc
)

return AdvisoryData(
aliases=[raw_data["id"]],
summary=raw_data["summary"],
affected_packages=[affected_package],
references=references,
date_published=date_published,
)


class ExampleAliasImprover(Improver):
@property
def interesting_advisories(self) -> QuerySet:
return Advisory.objects.filter(created_by=ExampleImporter.qualified_name)

def get_inferences(self, advisory_data) -> Iterable[Inference]:
for alias in advisory_data.aliases:
new_aliases = fetch_additional_aliases(alias)
aliases = new_aliases + [alias]
yield Inference(aliases=aliases, confidence=MAX_CONFIDENCE)


def fetch_additional_aliases(alias):
alias_map = {
"CVE-2021-230171337": ["PYSEC-1337", "CERTIN-1337"],
"CVE-2021-12341337": ["ANONSEC-1337", "CERTDES-1337"],
}
return alias_map.get(alias)
123 changes: 123 additions & 0 deletions vulnerabilities/tests/test_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import datetime
import os
from unittest.mock import patch

from django.test import TestCase
from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import NginxVersionRange
from univers.versions import SemverVersion

from vulnerabilities import models
from vulnerabilities.import_runner import ImportRunner
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import ScoringSystem
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.improve_runner import ImproveRunner
from vulnerabilities.improvers.default import DefaultImprover
from vulnerabilities.tests.example_importer_improver import ExampleAliasImprover
from vulnerabilities.tests.example_importer_improver import ExampleImporter
from vulnerabilities.tests.example_importer_improver import parse_advisory_data


def mock_fetch_advisory_data():
return [
{
"id": "CVE-2021-12341337",
"summary": "Dummy advisory",
"advisory_severity": "high",
"vulnerable": "0.6.18-1.20.0",
"fixed": "1.20.1",
"reference": "http://example.com/cve-2021-1234",
"published_on": "06-10-2021 UTC",
}
]


def mock_fetch_additional_aliases(alias):
alias_map = {
"CVE-2021-12341337": ["ANONSEC-1337", "CERTDES-1337"],
}
return alias_map.get(alias)


@patch(
"vulnerabilities.tests.example_importer_improver.fetch_advisory_data", mock_fetch_advisory_data
)
@patch(
"vulnerabilities.tests.example_importer_improver.fetch_additional_aliases",
mock_fetch_additional_aliases,
)
class TestExampleImporter(TestCase):
def test_parse_advisory_data(self):
raw_data = mock_fetch_advisory_data()[0]
expected = AdvisoryData(
aliases=["CVE-2021-12341337"],
summary="Dummy advisory",
affected_packages=[
AffectedPackage(
package=PackageURL(
type="example",
namespace=None,
name="dummy_package",
version=None,
qualifiers={},
subpath=None,
),
affected_version_range=NginxVersionRange(
constraints=(
VersionConstraint(
comparator=">=", version=SemverVersion(string="0.6.18")
),
VersionConstraint(
comparator="<=", version=SemverVersion(string="1.20.0")
),
)
),
fixed_version=SemverVersion(string="1.20.1"),
)
],
references=[
Reference(
reference_id="",
url="http://example.com/cve-2021-1234",
severities=[
VulnerabilitySeverity(
system=ScoringSystem(
identifier="generic_textual",
name="Generic textual severity rating",
url="",
notes="Severity for unknown scoring systems. Contains generic textual values like High, Low etc",
),
value="high",
)
],
)
],
date_published=datetime.datetime(2021, 10, 6, 0, 0, tzinfo=datetime.timezone.utc),
)
actual = parse_advisory_data(raw_data)
assert actual == expected

def test_import_framework_using_example_importer(self):
raw_datas = mock_fetch_advisory_data()
ImportRunner(ExampleImporter).run()

for raw_data in raw_datas:
assert models.Advisory.objects.get(aliases__contains=raw_data["id"])

def test_improve_framework_using_example_improver(self):
ImportRunner(ExampleImporter).run()
ImproveRunner(DefaultImprover).run()
ImproveRunner(ExampleAliasImprover).run()
raw_datas = mock_fetch_advisory_data()

assert models.Package.objects.count() == 3
assert models.PackageRelatedVulnerability.objects.filter(fix=True).count() == 1
assert models.PackageRelatedVulnerability.objects.filter(fix=False).count() == 2
assert models.VulnerabilitySeverity.objects.count() == 1
assert models.VulnerabilityReference.objects.count() == 1
for raw_data in raw_datas:
assert models.Vulnerability.objects.get(summary=raw_data["summary"])