Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor GitlabDataSource to work with browser extension #1524

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 29 additions & 58 deletions vulntotal/datasources/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
VendorData instance containing the advisory information for the package.
"""
package_slug = get_package_slug(purl)
location = download_subtree(package_slug, speculative_execution=True)
if not location:
clear_download(location)
directory_files = fetch_directory_contents(package_slug)
if not directory_files:
path = self.supported_ecosystem()[purl.type]
casesensitive_package_slug = get_casesensitive_slug(path, package_slug)
location = download_subtree(casesensitive_package_slug)
if location:
interesting_advisories = parse_interesting_advisories(
location, purl, delete_download=True
)
directory_files = fetch_directory_contents(casesensitive_package_slug)

if directory_files:
yml_files = [file for file in directory_files if file["name"].endswith(".yml")]

interesting_advisories = parse_interesting_advisories(yml_files, purl)
return interesting_advisories
clear_download(location)

@classmethod
def supported_ecosystem(cls):
Expand All @@ -68,6 +67,21 @@ def supported_ecosystem(cls):
}


def fetch_directory_contents(package_slug):
url = f"https://gitlab.com/api/v4/projects/12006272/repository/tree?path={package_slug}"
response = requests.get(url)
if response.status_code == 200:
return response.json()


def fetch_yaml(file_path):
response = requests.get(
f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/raw/master/{file_path}"
)
if response.status_code == 200:
return response.text


def get_package_slug(purl):
"""
Constructs a package slug from a given purl.
Expand All @@ -92,43 +106,6 @@ def get_package_slug(purl):
return f"{ecosystem}/{package_name}"


def download_subtree(package_slug: str, speculative_execution=False):
"""
Downloads and extracts a tar file from a given package slug.

Parameters:
package_slug: A string representing the package slug to query.
speculative_execution: A boolean indicating whether to log errors or not.

Returns:
A Path object representing the extracted location, or None if an error occurs.
"""
url = f"https://gitlab.com/gitlab-org/security-products/gemnasium-db/-/archive/master/gemnasium-db-master.tar.gz?path={package_slug}"
response = fetch(url)
if os.path.getsize(response.location) > 0:
extracted_location = Path(response.location).parent.joinpath(
"temp_vulntotal_gitlab_datasource"
)
with tarfile.open(response.location, "r") as file_obj:
file_obj.extractall(extracted_location)
os.remove(response.location)
return extracted_location
if not speculative_execution:
logger.error(f"{package_slug} doesn't exist")
os.remove(response.location)


def clear_download(location):
"""
Deletes a directory and its contents.

Parameters:
location: A Path object representing the directory to delete.
"""
if location:
shutil.rmtree(location)


def get_casesensitive_slug(path, package_slug):
payload = [
{
Expand Down Expand Up @@ -186,26 +163,22 @@ def get_casesensitive_slug(path, package_slug):
has_next = paginated_tree["pageInfo"]["hasNextPage"]


def parse_interesting_advisories(location, purl, delete_download=False) -> Iterable[VendorData]:
def parse_interesting_advisories(yml_files, purl) -> Iterable[VendorData]:
michaelehab marked this conversation as resolved.
Show resolved Hide resolved
"""
Parses advisories from YAML files in a given location that match a given version.

Parameters:
location: A Path object representing the location of the YAML files.
yml_files: An array having the paths of yml files to parse.
purl: PURL for the advisory.
version: A string representing the version to check against the affected range.
delete_download: A boolean indicating whether to delete the downloaded files after parsing.

Yields:
VendorData instance containing the advisory information for the package.
"""
version = purl.version
path = Path(location)
pattern = "**/*.yml"
files = [p for p in path.glob(pattern) if p.is_file()]
for file in sorted(files):
with open(file) as f:
gitlab_advisory = saneyaml.load(f)

for file in yml_files:
yml_data = fetch_yaml(file["path"])
gitlab_advisory = saneyaml.load(yml_data)
affected_range = gitlab_advisory["affected_range"]
if gitlab_constraints_satisfied(affected_range, version):
yield VendorData(
Expand All @@ -214,5 +187,3 @@ def parse_interesting_advisories(location, purl, delete_download=False) -> Itera
affected_versions=[affected_range],
fixed_versions=gitlab_advisory["fixed_versions"],
)
if delete_download:
clear_download(location)
35 changes: 27 additions & 8 deletions vulntotal/tests/test_gitlab.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
Expand All @@ -8,6 +7,7 @@
#

from pathlib import Path
from unittest import mock

from commoncode import testcase
from packageurl import PackageURL
Expand All @@ -32,17 +32,36 @@ def test_generate_package_advisory_url(self):
expected_file = self.get_test_loc("package_advisory_url-expected.json", must_exist=False)
util_tests.check_results_against_json(results, expected_file)

def test_parse_html_advisory(self):
@mock.patch("vulntotal.datasources.gitlab.fetch_yaml")
def test_parse_interesting_advisories(self, mock_fetch_yaml):
# Mock the yaml file responses
advisory_folder = (
Path(__file__)
.resolve()
.parent.joinpath("test_data/gitlab/temp_vulntotal_gitlab_datasource")
)
results = [
adv.to_dict()
for adv in gitlab.parse_interesting_advisories(
advisory_folder, PackageURL("generic", "namespace", "test", "0.1.1"), False
.parent.joinpath(
"test_data/gitlab/temp_vulntotal_gitlab_datasource/gemnasium-db-master-pypi-Jinja2/pypi/Jinja2"
)
)
yaml_files = []
sorted_files = sorted(advisory_folder.iterdir(), key=lambda x: x.name)
for file in sorted_files:
if file.suffix == ".yml":
with open(file, "r") as f:
yaml_files.append(f.read())

mock_fetch_yaml.side_effect = yaml_files

purl = PackageURL("generic", "namespace", "test", "0.1.1")

yml_files = [
{"name": "CVE-2014-1402.yml", "path": "path/to/CVE-2014-1402.yml"},
{"name": "CVE-2016-10745.yml", "path": "path/to/CVE-2016-10745.yml"},
{"name": "CVE-2019-10906.yml", "path": "path/to/CVE-2019-10906.yml"},
michaelehab marked this conversation as resolved.
Show resolved Hide resolved
{"name": "CVE-2019-8341.yml", "path": "path/to/CVE-2019-8341.yml"},
{"name": "CVE-2020-28493.yml", "path": "path/to/CVE-2020-28493.yml"},
]

results = [adv.to_dict() for adv in gitlab.parse_interesting_advisories(yml_files, purl)]

expected_file = self.get_test_loc("parsed_advisory-expected.json", must_exist=False)
util_tests.check_results_against_json(results, expected_file)
Loading