Skip to content

Commit

Permalink
Merge pull request #1481 from ziadhany/epss
Browse files Browse the repository at this point in the history
Add Support to EPSS
  • Loading branch information
ziadhany authored Aug 6, 2024
2 parents 84a35db + 0fe73ef commit 1561efe
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 62 deletions.
9 changes: 8 additions & 1 deletion vulnerabilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@
class VulnerabilitySeveritySerializer(serializers.ModelSerializer):
class Meta:
model = VulnerabilitySeverity
fields = ["value", "scoring_system", "scoring_elements"]
fields = ["value", "scoring_system", "scoring_elements", "published_at"]

def to_representation(self, instance):
data = super().to_representation(instance)
published_at = data.get("published_at", None)
if not published_at:
data.pop("published_at")
return data


class VulnerabilityReferenceSerializer(serializers.ModelSerializer):
Expand Down
1 change: 1 addition & 0 deletions vulnerabilities/import_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def process_inferences(inferences: List[Inference], advisory: Advisory, improver
defaults={
"value": str(severity.value),
"scoring_elements": str(severity.scoring_elements),
"published_at": str(severity.published_at),
},
)
if updated:
Expand Down
6 changes: 6 additions & 0 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ class VulnerabilitySeverity:
system: ScoringSystem
value: str
scoring_elements: str = ""
published_at: Optional[datetime.datetime] = None

def to_dict(self):
published_at_dict = (
{"published_at": self.published_at.isoformat()} if self.published_at else {}
)
return {
"system": self.system.identifier,
"value": self.value,
"scoring_elements": self.scoring_elements,
**published_at_dict,
}

@classmethod
Expand All @@ -70,6 +75,7 @@ def from_dict(cls, severity: dict):
system=SCORING_SYSTEMS[severity["system"]],
value=severity["value"],
scoring_elements=severity.get("scoring_elements", ""),
published_at=severity.get("published_at"),
)


Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from vulnerabilities.importers import debian
from vulnerabilities.importers import debian_oval
from vulnerabilities.importers import elixir_security
from vulnerabilities.importers import epss
from vulnerabilities.importers import fireeye
from vulnerabilities.importers import gentoo
from vulnerabilities.importers import github
Expand Down Expand Up @@ -71,6 +72,7 @@
oss_fuzz.OSSFuzzImporter,
ruby.RubyImporter,
github_osv.GithubOSVImporter,
epss.EPSSImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
67 changes: 67 additions & 0 deletions vulnerabilities/importers/epss.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import csv
import gzip
import logging
import urllib.request
from datetime import datetime
from typing import Iterable

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity

logger = logging.getLogger(__name__)


class EPSSImporter(Importer):
"""Exploit Prediction Scoring System (EPSS) Importer"""

advisory_url = "https://epss.cyentia.com/epss_scores-current.csv.gz"
spdx_license_expression = "unknown"
importer_name = "EPSS Importer"

def advisory_data(self) -> Iterable[AdvisoryData]:
response = urllib.request.urlopen(self.advisory_url)
with gzip.open(response, "rb") as f:
lines = [l.decode("utf-8") for l in f.readlines()]

epss_reader = csv.reader(lines)
model_version, score_date = next(
epss_reader
) # score_date='score_date:2024-05-19T00:00:00+0000'
published_at = datetime.strptime(score_date[11::], "%Y-%m-%dT%H:%M:%S%z")

next(epss_reader) # skip the header row
for epss_row in epss_reader:
cve, score, percentile = epss_row

if not cve or not score or not percentile:
logger.error(f"Invalid epss row: {epss_row}")
continue

severity = VulnerabilitySeverity(
system=severity_systems.EPSS,
value=score,
scoring_elements=percentile,
published_at=published_at,
)

references = Reference(
url=f"https://api.first.org/data/v1/epss?cve={cve}",
severities=[severity],
)

yield AdvisoryData(
aliases=[cve],
references=[references],
url=self.advisory_url,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Generated by Django 4.1.13 on 2024-08-06 09:38

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0058_alter_vulnerabilityreference_options_and_more"),
]

operations = [
migrations.AddField(
model_name="vulnerabilityseverity",
name="published_at",
field=models.DateTimeField(
blank=True,
help_text="UTC Date of publication of the vulnerability severity",
null=True,
),
),
migrations.AlterField(
model_name="vulnerabilityseverity",
name="scoring_system",
field=models.CharField(
choices=[
("cvssv2", "CVSSv2 Base Score"),
("cvssv3", "CVSSv3 Base Score"),
("cvssv3.1", "CVSSv3.1 Base Score"),
("rhbs", "RedHat Bugzilla severity"),
("rhas", "RedHat Aggregate severity"),
("archlinux", "Archlinux Vulnerability Group Severity"),
("cvssv3.1_qr", "CVSSv3.1 Qualitative Severity Rating"),
("generic_textual", "Generic textual severity rating"),
("apache_httpd", "Apache Httpd Severity"),
("apache_tomcat", "Apache Tomcat Severity"),
("epss", "Exploit Prediction Scoring System"),
],
help_text="Identifier for the scoring system used. Available choices are: cvssv2: CVSSv2 Base Score,\ncvssv3: CVSSv3 Base Score,\ncvssv3.1: CVSSv3.1 Base Score,\nrhbs: RedHat Bugzilla severity,\nrhas: RedHat Aggregate severity,\narchlinux: Archlinux Vulnerability Group Severity,\ncvssv3.1_qr: CVSSv3.1 Qualitative Severity Rating,\ngeneric_textual: Generic textual severity rating,\napache_httpd: Apache Httpd Severity,\napache_tomcat: Apache Tomcat Severity,\nepss: Exploit Prediction Scoring System ",
max_length=50,
),
),
]
4 changes: 4 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,10 @@ class VulnerabilitySeverity(models.Model):
"For example a CVSS vector string as used to compute a CVSS score.",
)

published_at = models.DateTimeField(
blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity"
)

class Meta:
unique_together = ["reference", "scoring_system", "value"]
ordering = ["reference", "scoring_system", "value"]
Expand Down
14 changes: 14 additions & 0 deletions vulnerabilities/severity_systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ def get(self, scoring_elements: str) -> dict:
"Low",
]


@dataclasses.dataclass(order=True)
class EPSSScoringSystem(ScoringSystem):
def compute(self, scoring_elements: str):
return NotImplementedError


EPSS = EPSSScoringSystem(
identifier="epss",
name="Exploit Prediction Scoring System",
url="https://www.first.org/epss/",
)

SCORING_SYSTEMS = {
system.identifier: system
for system in (
Expand All @@ -170,5 +183,6 @@ def get(self, scoring_elements: str) -> dict:
GENERIC,
APACHE_HTTPD,
APACHE_TOMCAT,
EPSS,
)
}
Loading

0 comments on commit 1561efe

Please sign in to comment.