forked from aboutcode-org/vulnerablecode
-
Notifications
You must be signed in to change notification settings - Fork 1
/
helpers.py
158 lines (125 loc) · 5.46 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) nexB Inc. and others. All rights reserved.
# http://nexb.com and https://github.com/nexB/vulnerablecode/
# The VulnerableCode software is licensed under the Apache License version 2.0.
# Data generated with VulnerableCode require an acknowledgment.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# When you publish or redistribute any data created with VulnerableCode or any VulnerableCode
# derivative work, you must accompany this data with the following acknowledgment:
#
# Generated with VulnerableCode and provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# VulnerableCode should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
# VulnerableCode is a free software from nexB Inc. and others.
# Visit https://github.com/nexB/vulnerablecode/ for support and download.
import bisect
import dataclasses
import json
import re
from typing import Optional
import requests
import toml
import urllib3
import yaml
from packageurl import PackageURL
from univers.versions import version_class_by_package_type
# TODO add logging here
# Matcher for CVE identifiers such as "CVE-2021-12345" (case-insensitive).
# ``is_cve`` is the compiled pattern's bound ``match`` method: it returns a
# match object when the string *starts with* a CVE id and None otherwise.
# Text after the id is tolerated because only the start is anchored.
_CVE_ID_PATTERN = re.compile(r"CVE-\d{4}-\d{4,7}", flags=re.IGNORECASE)
is_cve = _CVE_ID_PATTERN.match
@dataclasses.dataclass(order=True, frozen=True)
class AffectedPackageWithPatchedPackage:
    """
    Pair a vulnerable package with the package that patches it, if any.

    Instances are immutable (frozen) and support ordering comparisons.
    """

    # Package URL of the package affected by the vulnerability.
    vulnerable_package: PackageURL
    # Package URL of the package that fixes it; defaults to None.
    patched_package: Optional[PackageURL] = dataclasses.field(default=None)
def load_yaml(path):
    """
    Return the data parsed from the YAML file at `path`.

    The file is parsed with ``yaml.safe_load``, so only plain YAML data is
    constructed.
    """
    # Decode explicitly as UTF-8: a bare open() uses the platform locale
    # encoding, which makes the parsed result depend on the machine.
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)
def load_json(path):
    """
    Return the data parsed from the JSON file at `path`.

    Raise json.JSONDecodeError if the file content is not valid JSON.
    """
    # JSON text is UTF-8; a bare open() would use the platform locale
    # encoding and could mis-decode non-ASCII content.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
def load_toml(path):
    """
    Return the data parsed from the TOML file at `path`.
    """
    # TOML documents are required to be UTF-8; a bare open() would use the
    # platform locale encoding instead.
    with open(path, encoding="utf-8") as f:
        return toml.load(f)
def fetch_yaml(url):
    """
    Return the data parsed from the YAML document fetched from `url`.

    Raise requests.HTTPError when the server answers with a 4xx or 5xx
    status, rather than parsing the error body as if it were YAML.
    """
    response = requests.get(url)
    # Fail loudly on HTTP errors: an error page would otherwise be fed to the
    # YAML parser and yield bogus data.
    response.raise_for_status()
    return yaml.safe_load(response.content)
# FIXME: this is NOT how etags work.
# We should instead send the proper HTTP header
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
# and integrate this finely in the processing, as this typically needs to use
# stream=True requests and proper handling of the HTTP return code.
# In all cases this ends up being a single request, not a HEAD followed
# by another real request.
def create_etag(data_src, url, etag_key):
    """
    Record the etag of `url` for the data source `data_src` and tell whether
    the url needs processing.

    The etag is read from the `etag_key` header of a HEAD request to `url`
    and kept in the `data_src.config.etags` mapping of url to etag.

    Return False only when the etag already stored for `url` equals the one
    just fetched. Return True otherwise: either the response carries no etag
    (nothing is stored), or the etag is new or changed (it is stored).
    """
    head_response = requests.head(url)
    etag = head_response.headers.get(etag_key)

    # Without an etag there is nothing to compare or store.
    if not etag:
        return True

    known_etags = data_src.config.etags
    if url in known_etags and known_etags[url] == etag:
        return False

    known_etags[url] = etag
    return True
def contains_alpha(string):
    """
    Return True if the input `string` contains at least one alphabetic
    character, False otherwise (including for an empty string).
    """
    # A generator expression lets any() stop at the first alphabetic
    # character instead of building a full list first.
    return any(c.isalpha() for c in string)
def requests_with_5xx_retry(max_retries=5, backoff_factor=0.5):
    """
    Return a requests Session whose http and https requests are retried on
    5xx responses.

    `max_retries` is the total number of retries allowed and `backoff_factor`
    is passed to urllib3's Retry to space out the attempts.
    """
    retry_policy = urllib3.util.Retry(
        total=max_retries,
        backoff_factor=backoff_factor,
        raise_on_status=True,
        # Every server-error status code: 500 through 599.
        status_forcelist=range(500, 600),
    )
    retrying_adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session
def nearest_patched_package(vulnerable_packages, resolved_packages):
    """
    Return a list of AffectedPackageWithPatchedPackage, one for each package
    of `vulnerable_packages`, in ascending version order.

    Each vulnerable package is paired with the package of `resolved_packages`
    that has the lowest version strictly greater than its own, or with None
    when there is no such package.
    """

    # This wrapper exists to get around the bisect module's lack of support
    # for a custom comparator. Get rid of it once we use Python 3.10, which
    # supports this. See https://github.com/python/cpython/pull/20556
    class VersionOrderedPackage:
        def __init__(self, package):
            self.package = package
            # Pick the version class matching the package type.
            version_class = version_class_by_package_type[package.type]
            self.version_object = version_class(package.version)

        def __eq__(self, other):
            return self.version_object == other.version_object

        def __lt__(self, other):
            return self.version_object < other.version_object

    affected = sorted(map(VersionOrderedPackage, vulnerable_packages))
    fixes = sorted(map(VersionOrderedPackage, resolved_packages))
    fix_count = len(fixes)

    pairs = []
    for candidate in affected:
        # bisect_right gives the index of the first fix whose version is
        # strictly greater than the candidate's version.
        position = bisect.bisect_right(fixes, candidate)
        nearest_fix = fixes[position].package if position < fix_count else None
        pairs.append(
            AffectedPackageWithPatchedPackage(
                vulnerable_package=candidate.package,
                patched_package=nearest_fix,
            )
        )
    return pairs