Skip to content

Commit

Permalink
Refactor GitlabDataSource to work with browser extension (#1524)
Browse files Browse the repository at this point in the history
* -Refactor GitlabDataSource to improve code readability and performance

-Update Gitlab Datasource tests

-Reorder test_parse_interesting_advisories test files

-Format changed files with black

-Merge fetch yml logic and update method doc

Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>

* Add another check to avoid iterating on None

Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>

* Restore test file and update gitlab tests

Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>

---------

Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
  • Loading branch information
michaelehab authored Aug 19, 2024
1 parent b7a7237 commit d62f377
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 66 deletions.
87 changes: 29 additions & 58 deletions vulntotal/datasources/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
VendorData instance containing the advisory information for the package.
"""
package_slug = get_package_slug(purl)
location = download_subtree(package_slug, speculative_execution=True)
if not location:
clear_download(location)
directory_files = fetch_directory_contents(package_slug)
if not directory_files:
path = self.supported_ecosystem()[purl.type]
casesensitive_package_slug = get_casesensitive_slug(path, package_slug)
location = download_subtree(casesensitive_package_slug)
if location:
interesting_advisories = parse_interesting_advisories(
location, purl, delete_download=True
)
directory_files = fetch_directory_contents(casesensitive_package_slug)

if directory_files:
yml_files = [file for file in directory_files if file["name"].endswith(".yml")]

interesting_advisories = parse_interesting_advisories(yml_files, purl)
return interesting_advisories
clear_download(location)

@classmethod
def supported_ecosystem(cls):
Expand All @@ -68,6 +67,21 @@ def supported_ecosystem(cls):
}


def fetch_directory_contents(package_slug):
    """
    List the repository tree entries found under ``package_slug``.

    Queries the repository tree API of GitLab project 12006272 (presumably
    the gemnasium-db repository -- confirm) and follows the ``Link: rel="next"``
    response header so that directories holding more entries than a single
    page are returned in full.

    Parameters:
        package_slug: A string representing the directory path to list.
    Returns:
        A list with the decoded JSON entries of every page, or None if any
        request does not answer with HTTP 200.
    Raises:
        requests.exceptions.RequestException: on connection errors or when a
        request exceeds the timeout.
    """
    url = "https://gitlab.com/api/v4/projects/12006272/repository/tree"
    # Let requests build the query string so that special characters in the
    # slug are percent-encoded instead of being pasted into the URL as-is.
    params = {"path": package_slug, "pagination": "keyset", "per_page": 100}
    entries = []
    while url:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(url, params=params, timeout=30)
        if response.status_code != 200:
            return None
        entries.extend(response.json())
        # The "next" link already carries the complete query string.
        url = response.links.get("next", {}).get("url")
        params = None
    return entries


def fetch_yaml(file_path):
    """
    Download the raw text of one file from the gemnasium-db master branch.

    Parameters:
        file_path: A string with the repository-relative path of the file.
    Returns:
        The response body as text, or None if the server does not answer
        with HTTP 200.
    Raises:
        requests.exceptions.RequestException: on connection errors or when the
        request exceeds the timeout.
    """
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(
        f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}",
        timeout=30,
    )
    if response.status_code == 200:
        return response.text
    return None


def get_package_slug(purl):
"""
Constructs a package slug from a given purl.
Expand All @@ -92,43 +106,6 @@ def get_package_slug(purl):
return f"{ecosystem}/{package_name}"


def download_subtree(package_slug: str, speculative_execution=False):
    """
    Downloads and extracts a tar file from a given package slug.

    The downloaded archive is always deleted before returning, including
    when extraction fails with an exception.

    Parameters:
        package_slug: A string representing the package slug to query.
        speculative_execution: A boolean; when true, an empty download is not
            logged as an error.
    Returns:
        A Path object representing the extracted location, or None if the
        downloaded archive is empty.
    """
    url = f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/archive/master/gemnasium-db-master.tar.gz?path={package_slug}"
    response = fetch(url)
    archive_location = response.location
    try:
        if os.path.getsize(archive_location) > 0:
            extracted_location = Path(archive_location).parent.joinpath(
                "temp_vulntotal_gitlab_datasource"
            )
            # NOTE(review): members are extracted without an extraction filter;
            # consider tarfile's ``filter="data"`` where the supported Python
            # versions provide it.
            with tarfile.open(archive_location, "r") as file_obj:
                file_obj.extractall(extracted_location)
            return extracted_location
        # An empty download is reported as a missing package.
        if not speculative_execution:
            logger.error(f"{package_slug} doesn't exist")
        return None
    finally:
        # Single cleanup point: the archive must not be left behind, even if
        # opening or extracting it raised.
        os.remove(archive_location)


def clear_download(location):
    """
    Remove a downloaded directory tree, if one was given.

    Parameters:
        location: A Path object (or path string) of the directory to delete.
            Falsy values such as None are ignored.
    """
    if not location:
        # Nothing was downloaded, so there is nothing to clean up.
        return
    shutil.rmtree(location)


def get_casesensitive_slug(path, package_slug):
payload = [
{
Expand Down Expand Up @@ -186,26 +163,22 @@ def get_casesensitive_slug(path, package_slug):
has_next = paginated_tree["pageInfo"]["hasNextPage"]


def parse_interesting_advisories(location, purl, delete_download=False) -> Iterable[VendorData]:
def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]:
"""
Parses advisories from YAML files in a given location that match a given version.
Parameters:
location: A Path object representing the location of the YAML files.
yml_files: An array having the paths of yml files to parse.
purl: PURL for the advisory.
version: A string representing the version to check against the affected range.
delete_download: A boolean indicating whether to delete the downloaded files after parsing.
Yields:
VendorData instance containing the advisory information for the package.
"""
version = purl.version
path = Path(location)
pattern = "**/*.yml"
files = [p for p in path.glob(pattern) if p.is_file()]
for file in sorted(files):
with open(file) as f:
gitlab_advisory = saneyaml.load(f)

for file in yml_files:
yml_data = fetch_yaml(file["path"])
gitlab_advisory = saneyaml.load(yml_data)
affected_range = gitlab_advisory["affected_range"]
if gitlab_constraints_satisfied(affected_range, version):
yield VendorData(
Expand All @@ -214,5 +187,3 @@ def parse_interesting_advisories(location, purl, delete_download=False) -> Itera
affected_versions=[affected_range],
fixed_versions=gitlab_advisory["fixed_versions"],
)
if delete_download:
clear_download(location)
35 changes: 27 additions & 8 deletions vulntotal/tests/test_gitlab.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
Expand All @@ -8,6 +7,7 @@
#

from pathlib import Path
from unittest import mock

from commoncode import testcase
from packageurl import PackageURL
Expand All @@ -32,17 +32,36 @@ def test_generate_package_advisory_url(self):
expected_file = self.get_test_loc("package_advisory_url-expected.json", must_exist=False)
util_tests.check_results_against_json(results, expected_file)

def test_parse_html_advisory(self):
@mock.patch("vulntotal.datasources.gitlab.fetch_yaml")
def test_parse_interesting_advisories(self, mock_fetch_yaml):
# Mock the yaml file responses
advisory_folder = (
Path(__file__)
.resolve()
.parent.joinpath("test_data/gitlab/temp_vulntotal_gitlab_datasource")
)
results = [
adv.to_dict()
for adv in gitlab.parse_interesting_advisories(
advisory_folder, PackageURL("generic", "namespace", "test", "0.1.1"), False
.parent.joinpath(
"test_data/gitlab/temp_vulntotal_gitlab_datasource/gemnasium-db-master-pypi-Jinja2/pypi/Jinja2"
)
)
yaml_files = []
sorted_files = sorted(advisory_folder.iterdir(), key=lambda x: x.name)
for file in sorted_files:
if file.suffix == ".yml":
with open(file, "r") as f:
yaml_files.append(f.read())

mock_fetch_yaml.side_effect = yaml_files

purl = PackageURL("generic", "namespace", "test", "0.1.1")

yml_files = [
{"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"},
{"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"},
{"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"},
{"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"},
{"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"},
]

results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(yml_files, purl)]

expected_file = self.get_test_loc("parsed_advisory-expected.json", must_exist=False)
util_tests.check_results_against_json(results, expected_file)

0 comments on commit d62f377

Please sign in to comment.