Skip to content

Commit

Permalink
Refactor OVAL-related code, fix failing tests #1079
Browse files Browse the repository at this point in the history
Reference: #1079

Signed-off-by: John M. Horan <johnmhoran@gmail.com>
  • Loading branch information
johnmhoran committed Feb 8, 2023
1 parent 3786195 commit 10bd0bd
Show file tree
Hide file tree
Showing 12 changed files with 1,701,298 additions and 24,019 deletions.
94 changes: 2 additions & 92 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,83 +421,6 @@ def advisory_data(self) -> List[AdvisoryData]:
)
continue

# def get_data_from_xml_doc(
# self, xml_doc: ET.ElementTree, pkg_metadata={}
# ) -> Iterable[AdvisoryData]:
# """
# The orchestration method of the OvalDataSource. This method breaks an
# OVAL xml ElementTree into a list of `Advisory`.

# Note: pkg_metadata is a mapping of Package URL data that MUST INCLUDE
# "type" key.

# Example value of pkg_metadata:
# {"type":"deb","qualifiers":{"distro":"buster"} }
# """
# oval_parsed_data = OvalParser(self.translations, xml_doc)
# raw_data = oval_parsed_data.get_data()
# oval_doc = oval_parsed_data.oval_document
# timestamp = oval_doc.getGenerator().getTimestamp()

# print("\noval_parsed_data = {}\n".format(oval_parsed_data))
# print("\nraw_data = {}\n".format(raw_data))

# # convert definition_data to Advisory objects
# for definition_data in raw_data:
# print("\ndefinition_data = {}\n".format(definition_data))
# # These fields are definition level, i.e common for all elements
# # connected/linked to an OvalDefinition

# # TODO: 2023-01-24 Tuesday 22:34:20. Is this where we'd loop through the list of CVEs/aliases?

# vuln_id = definition_data["vuln_id"]
# description = definition_data["description"]

# severities = []
# severity = definition_data.get("severity")
# if severity:
# severities.append(
# VulnerabilitySeverity(system=severity_systems.GENERIC, value=severity)
# )
# references = [
# Reference(url=url, severities=severities)
# for url in definition_data["reference_urls"]
# ]
# affected_packages = []
# print('\ndefinition_data["test_data"] = {}\n'.format(definition_data["test_data"]))
# for test_data in definition_data["test_data"]:
# print("\ntest_data['package_list'] = {}\n".format(test_data["package_list"]))
# for package_name in test_data["package_list"]:
# affected_version_range = test_data["version_ranges"]
# vrc = RANGE_CLASS_BY_SCHEMES[pkg_metadata["type"]]
# if affected_version_range:
# try:
# affected_version_range = vrc.from_native(affected_version_range)
# except Exception as e:
# logger.error(
# f"Failed to parse version range {affected_version_range!r} "
# f"for package {package_name!r}:\n{e}"
# )
# continue
# if package_name:
# affected_packages.append(
# AffectedPackage(
# package=self.create_purl(package_name, pkg_metadata),
# affected_version_range=affected_version_range,
# )
# )
# print("affected_packages = {}".format(affected_packages))
# date_published = dateparser.parse(timestamp)
# if not date_published.tzinfo:
# date_published = date_published.replace(tzinfo=pytz.UTC)
# yield AdvisoryData(
# aliases=[vuln_id],
# summary=description,
# affected_packages=sorted(affected_packages),
# references=sorted(references),
# date_published=date_published,
# )

def get_data_from_xml_doc(
self, xml_doc: ET.ElementTree, pkg_metadata={}
) -> Iterable[AdvisoryData]:
Expand All @@ -516,23 +439,15 @@ def get_data_from_xml_doc(
oval_doc = oval_parsed_data.oval_document
timestamp = oval_doc.getGenerator().getTimestamp()

print("\n== Run OvalImporter() get_data_from_xml_doc() ==\n")

# print("\noval_parsed_data = {}\n".format(oval_parsed_data))
print("\n==> raw_data = {}\n".format(raw_data))

# convert definition_data to Advisory objects
for definition_data in raw_data:
print("\n==> definition_data = {}\n".format(definition_data))
# These fields are definition level, i.e common for all elements
# connected/linked to an OvalDefinition

# NOTE: This is where we loop through the list of CVEs/aliases.

vuln_id_list = definition_data["vuln_id"]

for vuln_id_item in vuln_id_list:
# vuln_id = definition_data["vuln_id"]
vuln_id = vuln_id_item
description = definition_data["description"]

Expand All @@ -547,13 +462,8 @@ def get_data_from_xml_doc(
for url in definition_data["reference_urls"]
]
affected_packages = []
print(
'\n==> definition_data["test_data"] = {}\n'.format(definition_data["test_data"])
)

for test_data in definition_data["test_data"]:
print(
"\n==> test_data['package_list'] = {}\n".format(test_data["package_list"])
)
for package_name in test_data["package_list"]:
affected_version_range = test_data["version_ranges"]
vrc = RANGE_CLASS_BY_SCHEMES[pkg_metadata["type"]]
Expand All @@ -573,7 +483,7 @@ def get_data_from_xml_doc(
affected_version_range=affected_version_range,
)
)
print("==> affected_packages = {}\n".format(affected_packages))

date_published = dateparser.parse(timestamp)
if not date_published.tzinfo:
date_published = date_published.replace(tzinfo=pytz.UTC)
Expand Down
54 changes: 14 additions & 40 deletions vulnerabilities/oval_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,46 +36,29 @@ def get_data(self) -> List[Dict]:
Return a list of OvalDefinition mappings.
"""
oval_data = []
print("\nlen(self.all_definitions) = {}\n".format(len(self.all_definitions)))
for definition in self.all_definitions:
# print(definition)
# print(list(definition))

matching_tests = self.get_tests_of_definition(definition)
if not matching_tests:
continue
definition_data = {"test_data": []}
# TODO:this could use some data cleaning
definition_data["description"] = definition.getMetadata().getDescription() or ""

definition_data["vuln_id"] = self.get_vuln_id_from_definition(definition)
definition_data["reference_urls"] = self.get_urls_from_definition(definition)

definition_data["severity"] = self.get_severity_from_definition(definition)
print("\nlen(matching_tests) = {}\n".format(len(matching_tests)))
# print("\nmatching_tests = {}\n".format(matching_tests))

for test in matching_tests:
# print("\ntest = {}\n".format(test))
# print("\ntest.element = {}\n".format(test.element))
test_obj, test_state = self.get_object_state_of_test(test)
if not test_obj or not test_state:
continue
test_data = {"package_list": []}
# print("\ntest_obj = {}\n".format(test_obj))
test_data["package_list"].extend(self.get_pkgs_from_obj(test_obj))
print(
"\nself.get_pkgs_from_obj(test_obj) = {}\n".format(
self.get_pkgs_from_obj(test_obj)
)
)
version_ranges = self.get_version_range_from_state(test_state)
test_data["version_ranges"] = version_ranges
definition_data["test_data"].append(test_data)

oval_data.append(definition_data)

# print('\ntest_data["package_list"] = {}\n'.format(test_data["package_list"]))

return oval_data

def get_tests_of_definition(self, definition: OvalDefinition) -> List[OvalTest]:
Expand All @@ -86,25 +69,28 @@ def get_tests_of_definition(self, definition: OvalDefinition) -> List[OvalTest]:
criteria_refs = []

for child in definition.element.iter():

if "test_ref" in child.attrib:
criteria_refs.append(child.get("test_ref"))

matching_tests = []
for ref in criteria_refs:
oval_test = self.oval_document.getElementByID(ref)
# All matches will be `rpminfo_test` elements inside the `tests` element.
# Test for len == 2 because this IDs a pair of nested `object` and `state` elements.
if len(oval_test.element) == 2:
_, state = self.get_object_state_of_test(oval_test)
valid_test = True
for child in state.element:
if child.get("operation") not in self.translations:
valid_test = False
break
if valid_test:
matching_tests.append(self.oval_document.getElementByID(ref))
print("\nThese are matching_tests: {}".format(matching_tests))
for mt in matching_tests:
print("mt = {}".format(mt.element))
continue
elif (
child.get("operation") in self.translations
# "debian_evr_string" is used in both Debian and Ubuntu test XML files; SUSE OVAL uses "evr_string".
# See also https://github.com/OVALProject/Language/blob/master/docs/oval-common-schema.md
and child.get("datatype") in ["evr_string", "debian_evr_string"]
):
matching_tests.append(self.oval_document.getElementByID(ref))

return list(set(matching_tests))

Expand All @@ -126,6 +112,7 @@ def get_pkgs_from_obj(self, obj: OvalObject) -> List[str]:
pkg_list = []

for var in obj.element:
# It appears that `var_ref` is used in Ubuntu OVAL but not Debian or SUSE.
if var.get("var_ref"):
var_elem = self.oval_document.getElementByID(var.get("var_ref"))
comment = var_elem.element.get("comment")
Expand Down Expand Up @@ -195,26 +182,13 @@ def get_severity_from_definition(definition: OvalDefinition) -> Set[str]:

@staticmethod
def get_vuln_id_from_definition(definition):
# # SUSE and Ubuntu OVAL files will get cves via this loop
# for child in definition.element.iter():
# # if child.get("ref_id"):
# # return child.get("ref_id")
# # Must also check whether 'source' field exists and value is 'CVE'
# # TODO: what if there are multiple elements that satisfy the condition?
# # Add to list and report as separate AdvisoryData() objects?
# if child.get("ref_id") and child.get("source"):
# if child.get("source") == "CVE":
# return child.get("ref_id")
# # Debian OVAL files will get cves via this
# return definition.getMetadata().getTitle()
# ========================================================
# SUSE and Ubuntu OVAL files will get CVEs via this loop.
cve_list = []
for child in definition.element.iter():
if child.get("ref_id") and child.get("source"):
if child.get("source") == "CVE":
cve_list.append(child.get("ref_id"))

# Debian OVAL files will get cves via this
# Debian OVAL files (no "ref_id") will get CVEs via this.
if len(cve_list) == 0:
cve_list.append(definition.getMetadata().getTitle())

Expand Down
Loading

0 comments on commit 10bd0bd

Please sign in to comment.