diff --git a/vulntotal/datasources/gitlab.py b/vulntotal/datasources/gitlab.py index 27b0e7517..dbf84dce7 100644 --- a/vulntotal/datasources/gitlab.py +++ b/vulntotal/datasources/gitlab.py @@ -41,18 +41,17 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]: VendorData instance containing the advisory information for the package. """ package_slug = get_package_slug(purl) - location = download_subtree(package_slug, speculative_execution=True) - if not location: - clear_download(location) + directory_files = fetch_directory_contents(package_slug) + if not directory_files: path = self.supported_ecosystem()[purl.type] casesensitive_package_slug = get_casesensitive_slug(path, package_slug) - location = download_subtree(casesensitive_package_slug) - if location: - interesting_advisories = parse_interesting_advisories( - location, purl, delete_download=True - ) + directory_files = fetch_directory_contents(casesensitive_package_slug) + + if directory_files: + yml_files = [file for file in directory_files if file["name"].endswith(".yml")] + + interesting_advisories = parse_interesting_advisories(yml_files, purl) return interesting_advisories - clear_download(location) @classmethod def supported_ecosystem(cls): @@ -68,6 +67,21 @@ def supported_ecosystem(cls): } +def fetch_directory_contents(package_slug): + url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}" + response = requests.get(url) + if response.status_code == 200: + return response.json() + + +def fetch_yaml(file_path): + response = requests.get( + f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}" + ) + if response.status_code == 200: + return response.text + + def get_package_slug(purl): """ Constructs a package slug from a given purl. @@ -92,43 +106,6 @@ def get_package_slug(purl): return f"{ecosystem}/{package_name}" -def download_subtree(package_slug: str, speculative_execution=False): - """ - Downloads and extracts a tar file from a given package slug. - - Parameters: - package_slug: A string representing the package slug to query. - speculative_execution: A boolean indicating whether to log errors or not. - - Returns: - A Path object representing the extracted location, or None if an error occurs. - """ - url = f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/archive/master/gemnasium-db-master.tar.gz?path={package_slug}" - response = fetch(url) - if os.path.getsize(response.location) > 0: - extracted_location = Path(response.location).parent.joinpath( - "temp_vulntotal_gitlab_datasource" - ) - with tarfile.open(response.location, "r") as file_obj: - file_obj.extractall(extracted_location) - os.remove(response.location) - return extracted_location - if not speculative_execution: - logger.error(f"{package_slug} doesn't exist") - os.remove(response.location) - - -def clear_download(location): - """ - Deletes a directory and its contents. - - Parameters: - location: A Path object representing the directory to delete. - """ - if location: - shutil.rmtree(location) - - def get_casesensitive_slug(path, package_slug): payload = [ { @@ -186,26 +163,22 @@ def get_casesensitive_slug(path, package_slug): has_next = paginated_tree["pageInfo"]["hasNextPage"] -def parse_interesting_advisories(location, purl, delete_download=False) -> Iterable[VendorData]: +def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]: """ Parses advisories from YAML files in a given location that match a given version. Parameters: - location: A Path object representing the location of the YAML files. + yml_files: An array having the paths of yml files to parse. purl: PURL for the advisory. - version: A string representing the version to check against the affected range. - delete_download: A boolean indicating whether to delete the downloaded files after parsing. Yields: VendorData instance containing the advisory information for the package. """ version = purl.version - path = Path(location) - pattern = "**/*.yml" - files = [p for p in path.glob(pattern) if p.is_file()] - for file in sorted(files): - with open(file) as f: - gitlab_advisory = saneyaml.load(f) + + for file in yml_files: + yml_data = fetch_yaml(file["path"]) + gitlab_advisory = saneyaml.load(yml_data) affected_range = gitlab_advisory["affected_range"] if gitlab_constraints_satisfied(affected_range, version): yield VendorData( @@ -214,5 +187,3 @@ def parse_interesting_advisories(location, purl, delete_download=False) -> Itera affected_versions=[affected_range], fixed_versions=gitlab_advisory["fixed_versions"], ) - if delete_download: - clear_download(location) diff --git a/vulntotal/tests/test_gitlab.py b/vulntotal/tests/test_gitlab.py index e7f38009d..2870e81dc 100644 --- a/vulntotal/tests/test_gitlab.py +++ b/vulntotal/tests/test_gitlab.py @@ -1,4 +1,3 @@ -# # Copyright (c) nexB Inc. and others. All rights reserved. # VulnerableCode is a trademark of nexB Inc. # SPDX-License-Identifier: Apache-2.0 @@ -8,6 +7,7 @@ # from pathlib import Path +from unittest import mock from commoncode import testcase from packageurl import PackageURL @@ -32,17 +32,36 @@ def test_generate_package_advisory_url(self): expected_file = self.get_test_loc("package_advisory_url-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file) - def test_parse_html_advisory(self): + @mock.patch("vulntotal.datasources.gitlab.fetch_yaml") + def test_parse_interesting_advisories(self, mock_fetch_yaml): + # Mock the yaml file responses advisory_folder = ( Path(__file__) .resolve() - .parent.joinpath("test_data/gitlab/temp_vulntotal_gitlab_datasource") - ) - results = [ - adv.to_dict() - for adv in gitlab.parse_interesting_advisories( - advisory_folder, PackageURL("generic", "namespace", "test", "0.1.1"), False + .parent.joinpath( + "test_data/gitlab/temp_vulntotal_gitlab_datasource/gemnasium-db-master-pypi-Jinja2/pypi/Jinja2" ) + ) + yaml_files = [] + sorted_files = sorted(advisory_folder.iterdir(), key=lambda x: x.name) + for file in sorted_files: + if file.suffix == ".yml": + with open(file, "r") as f: + yaml_files.append(f.read()) + + mock_fetch_yaml.side_effect = yaml_files + + purl = PackageURL("generic", "namespace", "test", "0.1.1") + + yml_files = [ + {"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"}, + {"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"}, + {"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"}, + {"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"}, + {"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"}, ] + + results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(yml_files, purl)] + expected_file = self.get_test_loc("parsed_advisory-expected.json", must_exist=False) util_tests.check_results_against_json(results, expected_file)