From a568c633f500ff9853cf27b984962271a48c2310 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 12 Jun 2024 08:48:41 +0000 Subject: [PATCH] chore(iast): redaction algorithms refactor II [backport 2.9] (#9509) Backport 8d678697b35e8206237f134c697a45572df80d78 from #9163 to 2.9. # Summary Refactor of the IAST redaction system. The old algorithms had several problems. ## Description This PR continues the work started in https://github.com/DataDog/dd-trace-py/pull/9126 - Migrate SQL Injection to the new algorithm - Remove deprecated code ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. - [x] If change touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. 
## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) Co-authored-by: Alberto Vara --- .../_evidence_redaction/_sensitive_handler.py | 7 +- .../sql_sensitive_analyzer.py | 70 ++++ ddtrace/appsec/_iast/_taint_dict.py | 21 -- .../appsec/_iast/_taint_tracking/aspects.py | 1 - ddtrace/appsec/_iast/_taint_utils.py | 4 +- ddtrace/appsec/_iast/_utils.py | 90 +----- ddtrace/appsec/_iast/constants.py | 14 +- ddtrace/appsec/_iast/reporter.py | 20 +- ddtrace/appsec/_iast/taint_sinks/_base.py | 159 +-------- .../_iast/taint_sinks/command_injection.py | 3 - .../_iast/taint_sinks/header_injection.py | 14 - .../_iast/taint_sinks/insecure_cookie.py | 4 - .../appsec/_iast/taint_sinks/sql_injection.py | 164 ---------- ddtrace/appsec/_iast/taint_sinks/ssrf.py | 3 - .../appsec/_iast/taint_sinks/weak_cipher.py | 2 - ddtrace/appsec/_iast/taint_sinks/weak_hash.py | 2 - .../_iast/taint_sinks/weak_randomness.py | 2 - ddtrace/contrib/dbapi/__init__.py | 6 +- ddtrace/contrib/dbapi_async/__init__.py | 6 +- .../test_header_injection_redacted.py | 61 ++-- .../test_path_traversal_redacted.py | 83 +++-- .../iast/taint_sinks/test_sql_injection.py | 29 +- .../test_sql_injection_redacted.py | 305 ++++++++++-------- tests/appsec/iast/test_taint_utils.py | 16 +- tests/contrib/dbapi/test_dbapi_appsec.py | 2 +- 
.../contrib/django/django_app/appsec_urls.py | 4 +- .../contrib/django/test_django_appsec_iast.py | 180 +++++------ tests/contrib/flask/test_flask_appsec_iast.py | 11 +- 28 files changed, 476 insertions(+), 807 deletions(-) create mode 100644 ddtrace/appsec/_iast/_evidence_redaction/sql_sensitive_analyzer.py delete mode 100644 ddtrace/appsec/_iast/_taint_dict.py diff --git a/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py b/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py index b76ad6c96b1..c41e56ca1c3 100644 --- a/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py +++ b/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py @@ -3,11 +3,14 @@ from ddtrace.internal.logger import get_logger from ddtrace.settings.asm import config as asm_config +from .._utils import _get_source_index from ..constants import VULN_CMDI from ..constants import VULN_HEADER_INJECTION +from ..constants import VULN_SQL_INJECTION from ..constants import VULN_SSRF from .command_injection_sensitive_analyzer import command_injection_sensitive_analyzer from .header_injection_sensitive_analyzer import header_injection_sensitive_analyzer +from .sql_sensitive_analyzer import sql_sensitive_analyzer from .url_sensitive_analyzer import url_sensitive_analyzer @@ -27,7 +30,7 @@ def __init__(self): self._sensitive_analyzers = { VULN_CMDI: command_injection_sensitive_analyzer, - # SQL_INJECTION: sql_sensitive_analyzer, + VULN_SQL_INJECTION: sql_sensitive_analyzer, VULN_SSRF: url_sensitive_analyzer, VULN_HEADER_INJECTION: header_injection_sensitive_analyzer, } @@ -178,7 +181,7 @@ def to_redacted_json(self, evidence_value, sensitive, tainted_ranges, sources): if next_tainted and next_tainted["start"] == i: self.write_value_part(value_parts, evidence_value[start:i], source_index) - source_index = next_tainted_index + source_index = _get_source_index(sources, next_tainted["source"]) while next_sensitive and self._contains(next_tainted, next_sensitive): 
redaction_start = next_sensitive["start"] - next_tainted["start"] diff --git a/ddtrace/appsec/_iast/_evidence_redaction/sql_sensitive_analyzer.py b/ddtrace/appsec/_iast/_evidence_redaction/sql_sensitive_analyzer.py new file mode 100644 index 00000000000..7410ec46b4a --- /dev/null +++ b/ddtrace/appsec/_iast/_evidence_redaction/sql_sensitive_analyzer.py @@ -0,0 +1,70 @@ +import re + +from ddtrace.appsec._iast.constants import DBAPI_MARIADB +from ddtrace.appsec._iast.constants import DBAPI_MYSQL +from ddtrace.appsec._iast.constants import DBAPI_PSYCOPG +from ddtrace.appsec._iast.constants import DBAPI_SQLITE +from ddtrace.internal.logger import get_logger + + +log = get_logger(__name__) + + +STRING_LITERAL = r"'(?:''|[^'])*'" +POSTGRESQL_ESCAPED_LITERAL = r"\$([^$]*)\$.*?\$\1\$" +MYSQL_STRING_LITERAL = r'"(?:\\\\"|[^"])*"|\'(?:\\\\\'|[^\'])*\'' +LINE_COMMENT = r"--.*$" +BLOCK_COMMENT = r"/\*[\s\S]*?\*/" +EXPONENT = r"(?:E[-+]?\\d+[fd]?)?" +INTEGER_NUMBER = r"(? start + 1: + next_char = evidence.value[start + 1] + if start_char == "/" and next_char == "*": + start += 2 + end -= 2 + elif start_char == "-" and start_char == next_char: + start += 2 + elif start_char.lower() == "q" and next_char == "'": + start += 3 + end -= 2 + elif start_char == "$": + match = regex_result.group(0) + size = match.find("$", 1) + 1 + if size > 1: + start += size + end -= size + tokens.append({"start": start, "end": end}) + regex_result = pattern.search(evidence.value, regex_result.end()) + return tokens diff --git a/ddtrace/appsec/_iast/_taint_dict.py b/ddtrace/appsec/_iast/_taint_dict.py deleted file mode 100644 index 97df240d4d0..00000000000 --- a/ddtrace/appsec/_iast/_taint_dict.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python3 -# -from typing import TYPE_CHECKING # noqa:F401 - - -if TYPE_CHECKING: - from typing import Dict # noqa:F401 - from typing import Tuple # noqa:F401 - - from ._taint_tracking import Source # noqa:F401 - -_IAST_TAINT_DICT = {} # type: Dict[int, 
Tuple[Tuple[Source, int, int],...]] - - -def get_taint_dict(): # type: () -> Dict[int, Tuple[Tuple[Source, int, int],...]] - return _IAST_TAINT_DICT - - -def clear_taint_mapping(): # type: () -> None - global _IAST_TAINT_DICT - _IAST_TAINT_DICT = {} diff --git a/ddtrace/appsec/_iast/_taint_tracking/aspects.py b/ddtrace/appsec/_iast/_taint_tracking/aspects.py index e8bd7610d87..e34245d7944 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/aspects.py +++ b/ddtrace/appsec/_iast/_taint_tracking/aspects.py @@ -506,7 +506,6 @@ def format_value_aspect( if options == 115: new_text = str_aspect(str, 0, element) elif options == 114: - # TODO: use our repr once we have implemented it new_text = repr_aspect(repr, 0, element) elif options == 97: new_text = ascii(element) diff --git a/ddtrace/appsec/_iast/_taint_utils.py b/ddtrace/appsec/_iast/_taint_utils.py index 8f91c44ff5c..07e5199cb01 100644 --- a/ddtrace/appsec/_iast/_taint_utils.py +++ b/ddtrace/appsec/_iast/_taint_utils.py @@ -5,11 +5,11 @@ from typing import Optional from typing import Union +from ddtrace.appsec._iast.constants import DBAPI_INTEGRATIONS from ddtrace.internal.logger import get_logger from ddtrace.settings.asm import config as asm_config -DBAPI_INTEGRATIONS = ("sqlite", "psycopg", "mysql", "mariadb") DBAPI_PREFIXES = ("django-",) log = get_logger(__name__) @@ -529,7 +529,7 @@ def supported_dbapi_integration(integration_name): return integration_name in DBAPI_INTEGRATIONS or integration_name.startswith(DBAPI_PREFIXES) -def check_tainted_args(args, kwargs, tracer, integration_name, method): +def check_tainted_dbapi_args(args, kwargs, tracer, integration_name, method): if supported_dbapi_integration(integration_name) and method.__name__ == "execute": from ._taint_tracking import is_pyobject_tainted diff --git a/ddtrace/appsec/_iast/_utils.py b/ddtrace/appsec/_iast/_utils.py index 7272abb9016..c1b72f28f04 100644 --- a/ddtrace/appsec/_iast/_utils.py +++ b/ddtrace/appsec/_iast/_utils.py @@ -1,19 +1,10 @@ 
-import re -import string import sys -from typing import TYPE_CHECKING # noqa:F401 +from typing import List from ddtrace.internal.logger import get_logger from ddtrace.settings.asm import config as asm_config -if TYPE_CHECKING: - from typing import Any # noqa:F401 - from typing import List # noqa:F401 - from typing import Set # noqa:F401 - from typing import Tuple # noqa:F401 - - def _is_python_version_supported(): # type: () -> bool # IAST supports Python versions 3.6 to 3.12 return (3, 6, 0) <= sys.version_info < (3, 13, 0) @@ -31,78 +22,13 @@ def _is_iast_enabled(): return True -# Used to cache the compiled regular expression -_SOURCE_NAME_SCRUB = None -_SOURCE_VALUE_SCRUB = None -_SOURCE_NUMERAL_SCRUB = None - - -def _has_to_scrub(s): # type: (str) -> bool - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - global _SOURCE_NAME_SCRUB - global _SOURCE_VALUE_SCRUB - global _SOURCE_NUMERAL_SCRUB - - if _SOURCE_NAME_SCRUB is None: - _SOURCE_NAME_SCRUB = re.compile(asm_config._iast_redaction_name_pattern) - _SOURCE_VALUE_SCRUB = re.compile(asm_config._iast_redaction_value_pattern) - _SOURCE_NUMERAL_SCRUB = re.compile(asm_config._iast_redaction_numeral_pattern) - - return ( - _SOURCE_NAME_SCRUB.match(s) is not None - or _SOURCE_VALUE_SCRUB.match(s) is not None - or _SOURCE_NUMERAL_SCRUB.match(s) is not None - ) - - -def _is_numeric(s): - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. 
- global _SOURCE_NUMERAL_SCRUB - - if _SOURCE_NUMERAL_SCRUB is None: - _SOURCE_NUMERAL_SCRUB = re.compile(asm_config._iast_redaction_numeral_pattern) - - return _SOURCE_NUMERAL_SCRUB.match(s) is not None - - -_REPLACEMENTS = string.ascii_letters -_LEN_REPLACEMENTS = len(_REPLACEMENTS) - - -def _scrub(s, has_range=False): # type: (str, bool) -> str - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - if has_range: - return "".join([_REPLACEMENTS[i % _LEN_REPLACEMENTS] for i in range(len(s))]) - return "*" * len(s) - - -def _is_evidence_value_parts(value): # type: (Any) -> bool - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - return isinstance(value, (set, list)) - - -def _scrub_get_tokens_positions(text, tokens): - # type: (str, Set[str]) -> List[Tuple[int, int]] - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. 
- token_positions = [] - - for token in tokens: - position = text.find(token) - if position != -1: - token_positions.append((position, position + len(token))) - - token_positions.sort() - return token_positions +def _get_source_index(sources: List, source) -> int: + i = 0 + for source_ in sources: + if hash(source_) == hash(source): + return i + i += 1 + return -1 def _get_patched_code(module_path, module_name): # type: (str, str) -> str diff --git a/ddtrace/appsec/_iast/constants.py b/ddtrace/appsec/_iast/constants.py index 17981bccbcc..60c864f59ec 100644 --- a/ddtrace/appsec/_iast/constants.py +++ b/ddtrace/appsec/_iast/constants.py @@ -16,15 +16,6 @@ VULNERABILITY_TOKEN_TYPE = Dict[int, Dict[str, Any]] -EVIDENCE_ALGORITHM_TYPE = "ALGORITHM" -EVIDENCE_SQL_INJECTION = "SQL_INJECTION" -EVIDENCE_PATH_TRAVERSAL = "PATH_TRAVERSAL" -EVIDENCE_WEAK_RANDOMNESS = "WEAK_RANDOMNESS" -EVIDENCE_COOKIE = "COOKIE" -EVIDENCE_CMDI = "COMMAND" -EVIDENCE_HEADER_INJECTION = "HEADER_INJECTION" -EVIDENCE_SSRF = "SSRF" - HEADER_NAME_VALUE_SEPARATOR = ": " MD5_DEF = "md5" @@ -91,3 +82,8 @@ "tarfile": {"open"}, "zipfile": {"ZipFile"}, } +DBAPI_SQLITE = "sqlite" +DBAPI_PSYCOPG = "psycopg" +DBAPI_MYSQL = "mysql" +DBAPI_MARIADB = "mariadb" +DBAPI_INTEGRATIONS = (DBAPI_SQLITE, DBAPI_PSYCOPG, DBAPI_MYSQL, DBAPI_MARIADB) diff --git a/ddtrace/appsec/_iast/reporter.py b/ddtrace/appsec/_iast/reporter.py index fa2cc8ae96c..90d334277bd 100644 --- a/ddtrace/appsec/_iast/reporter.py +++ b/ddtrace/appsec/_iast/reporter.py @@ -13,6 +13,7 @@ import attr from ddtrace.appsec._iast._evidence_redaction import sensitive_handler +from ddtrace.appsec._iast._utils import _get_source_index from ddtrace.appsec._iast.constants import VULN_INSECURE_HASHING_TYPE from ddtrace.appsec._iast.constants import VULN_WEAK_CIPHER_TYPE from ddtrace.appsec._iast.constants import VULN_WEAK_RANDOMNESS @@ -26,8 +27,12 @@ def _only_if_true(value): return value if value else None +ATTRS_TO_SKIP = frozenset({"_ranges", 
"_evidences_with_no_sources", "dialect"}) + + @attr.s(eq=False, hash=False) class Evidence(object): + dialect = attr.ib(type=str, default=None) # type: Optional[str] value = attr.ib(type=str, default=None) # type: Optional[str] _ranges = attr.ib(type=dict, default={}) # type: Any valueParts = attr.ib(type=list, default=None) # type: Any @@ -143,14 +148,6 @@ def add_ranges_to_evidence_and_extract_sources(self, vuln): if source not in self.sources: self.sources = self.sources + [source] - def _get_source_index(self, sources: List[Source], source: Source) -> int: - i = 0 - for source_ in sources: - if hash(source_) == hash(source): - return i - i += 1 - return -1 - def build_and_scrub_value_parts(self) -> Dict[str, Any]: """ Builds and scrubs value parts of vulnerabilities. @@ -197,7 +194,7 @@ def get_unredacted_value_parts(self, evidence_value: str, ranges: List[dict], so if from_index < range_["start"]: value_parts.append({"value": evidence_value[from_index : range_["start"]]}) - source_index = self._get_source_index(sources, range_["source"]) + source_index = _get_source_index(sources, range_["source"]) value_parts.append( {"value": evidence_value[range_["start"] : range_["end"]], "source": source_index} # type: ignore[dict-item] @@ -217,7 +214,10 @@ def _to_dict(self) -> Dict[str, Any]: Returns: - Dict[str, Any]: Dictionary representation of the IAST span reporter. 
""" - return attr.asdict(self, filter=lambda attr, x: x is not None and attr.name != "_ranges") + return attr.asdict( + self, + filter=lambda attr, x: x is not None and attr.name not in ATTRS_TO_SKIP, + ) def _to_str(self) -> str: """ diff --git a/ddtrace/appsec/_iast/taint_sinks/_base.py b/ddtrace/appsec/_iast/taint_sinks/_base.py index 215e2abb208..50e025f393c 100644 --- a/ddtrace/appsec/_iast/taint_sinks/_base.py +++ b/ddtrace/appsec/_iast/taint_sinks/_base.py @@ -1,20 +1,18 @@ import os -from typing import TYPE_CHECKING # noqa:F401 -from typing import cast # noqa:F401 +from typing import Any +from typing import Callable +from typing import Optional +from typing import Text from ddtrace import tracer from ddtrace.appsec._constants import IAST from ddtrace.internal import core from ddtrace.internal.logger import get_logger from ddtrace.internal.utils.cache import LFUCache -from ddtrace.settings.asm import config as asm_config from ..._deduplications import deduplication from .._overhead_control_engine import Operation from .._stacktrace import get_info_frame -from .._utils import _has_to_scrub -from .._utils import _is_evidence_value_parts -from .._utils import _scrub from ..processor import AppSecIastSpanProcessor from ..reporter import Evidence from ..reporter import IastSpanReporter @@ -22,16 +20,6 @@ from ..reporter import Vulnerability -if TYPE_CHECKING: # pragma: no cover - from typing import Any # noqa:F401 - from typing import Callable # noqa:F401 - from typing import Dict # noqa:F401 - from typing import List # noqa:F401 - from typing import Optional # noqa:F401 - from typing import Set # noqa:F401 - from typing import Text # noqa:F401 - from typing import Union # noqa:F401 - log = get_logger(__name__) CWD = os.path.abspath(os.getcwd()) @@ -39,8 +27,8 @@ class taint_sink_deduplication(deduplication): def _extract(self, args): - # we skip 0, 1 and last position because its the cls, span and sources respectively - return args[2:-1] + # We skip positions 0 
and 1 because they represent the 'cls' and 'span' respectively + return args[2:] def _check_positions_contained(needle, container): @@ -57,7 +45,6 @@ def _check_positions_contained(needle, container): class VulnerabilityBase(Operation): vulnerability_type = "" - evidence_type = "" _redacted_report_cache = LFUCache() @classmethod @@ -66,10 +53,8 @@ def _reset_cache_for_testing(cls): cls._redacted_report_cache.clear() @classmethod - def wrap(cls, func): - # type: (Callable) -> Callable - def wrapper(wrapped, instance, args, kwargs): - # type: (Callable, Any, Any, Any) -> Any + def wrap(cls, func: Callable) -> Callable: + def wrapper(wrapped: Callable, instance: Any, args: Any, kwargs: Any) -> Any: """Get the current root Span and attach it to the wrapped function. We need the span to report the vulnerability and update the context with the report information. """ @@ -83,7 +68,7 @@ def wrapper(wrapped, instance, args, kwargs): @classmethod @taint_sink_deduplication - def _prepare_report(cls, span, vulnerability_type, evidence, file_name, line_number, sources): + def _prepare_report(cls, span, vulnerability_type, evidence, file_name, line_number): if line_number is not None and (line_number == 0 or line_number < -1): line_number = -1 @@ -99,19 +84,12 @@ def _prepare_report(cls, span, vulnerability_type, evidence, file_name, line_num report = IastSpanReporter(vulnerabilities={vulnerability}) report.add_ranges_to_evidence_and_extract_sources(vulnerability) - if getattr(cls, "redact_report", False): - redacted_report = cls._redacted_report_cache.get( - hash(report), lambda x: cls._redact_report(cast(IastSpanReporter, report)) - ) - else: - redacted_report = report - core.set_item(IAST.CONTEXT_KEY, redacted_report, span=span) + core.set_item(IAST.CONTEXT_KEY, report, span=span) return True @classmethod - def report(cls, evidence_value="", value_parts=None, sources=None): - # type: (Any, Any, Optional[List[Any]]) -> None + def report(cls, evidence_value: Text = "", dialect: 
Optional[Text] = None) -> None: """Build a IastSpanReporter instance to report it in the `AppSecIastSpanProcessor` as a string JSON""" # TODO: type of evidence_value will be Text. We wait to finish the redaction refactor. if cls.acquire_quota(): @@ -146,122 +124,15 @@ def report(cls, evidence_value="", value_parts=None, sources=None): if not cls.is_not_reported(file_name, line_number): return - - # TODO: This function is deprecated, but we need to migrate all vulnerabilities first before deleting it - if _is_evidence_value_parts(evidence_value) or _is_evidence_value_parts(value_parts): - evidence = Evidence(value=evidence_value, valueParts=value_parts) # Evidence is a string in weak cipher, weak hash and weak randomness - elif isinstance(evidence_value, (str, bytes, bytearray)): - evidence = Evidence(value=evidence_value) # type: ignore + if isinstance(evidence_value, (str, bytes, bytearray)): + evidence = Evidence(value=evidence_value, dialect=dialect) else: log.debug("Unexpected evidence_value type: %s", type(evidence_value)) - evidence = Evidence(value="") + evidence = Evidence(value="", dialect=dialect) - result = cls._prepare_report(span, cls.vulnerability_type, evidence, file_name, line_number, sources) + result = cls._prepare_report(span, cls.vulnerability_type, evidence, file_name, line_number) # If result is None that's mean deduplication raises and no vulnerability wasn't reported, with that, # we need to restore the quota if not result: cls.increment_quota() - - @classmethod - def _extract_sensitive_tokens(cls, report): - # type: (Dict[Vulnerability, str]) -> Dict[int, Dict[str, Any]] - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. 
- log.debug("Base class VulnerabilityBase._extract_sensitive_tokens called") - return {} - - @classmethod - def _get_vulnerability_text(cls, vulnerability): - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - if vulnerability and vulnerability.evidence.value is not None: - return vulnerability.evidence.value - - if vulnerability.evidence.valueParts is not None: - return "".join( - [ - (part.get("value", "") if type(part) is not str else part) - for part in vulnerability.evidence.valueParts - ] - ) - - return "" - - @classmethod - def replace_tokens( - cls, - vuln, - vulns_to_tokens, - has_range=False, - ): - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - ret = vuln.evidence.value - replaced = False - - for token in vulns_to_tokens[hash(vuln)]["tokens"]: - ret = ret.replace(token, _scrub(token, has_range)) - replaced = True - - return ret, replaced - - @classmethod - def _custom_edit_valueparts(cls, vuln): - # Subclasses could optionally implement this to add further processing to the - # vulnerability valueParts - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - return - - @classmethod - def _redact_report(cls, report): # type: (IastSpanReporter) -> IastSpanReporter - # TODO: This function is deprecated. - # Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. 
- if not asm_config._iast_redaction_enabled: - return report - - # See if there is a match on either any of the sources or value parts of the report - already_scrubbed = {} - - sources_values_to_scrubbed = {} - vulns_to_text = {vuln: cls._get_vulnerability_text(vuln) for vuln in report.vulnerabilities} - vulns_to_tokens = cls._extract_sensitive_tokens(vulns_to_text) - - for source in report.sources: - # Join them so we only run the regexps once for each source - # joined_fields = "%s%s" % (source.name, source.value) - if _has_to_scrub(source.name) or _has_to_scrub(source.value): # type: ignore - scrubbed = _scrub(source.value, has_range=True) # type: ignore - already_scrubbed[source.value] = scrubbed - source.redacted = True - sources_values_to_scrubbed[source.value] = scrubbed - source.pattern = scrubbed - source.value = None - - already_scrubbed_set = set(already_scrubbed.keys()) - for vuln in report.vulnerabilities: - if vuln.evidence.value is not None: - pattern, replaced = cls.replace_tokens(vuln, vulns_to_tokens, hasattr(vuln.evidence.value, "source")) - if replaced: - vuln.evidence.value = None - - if vuln.evidence.valueParts is None: - continue - for part in vuln.evidence.valueParts: - part_value = part.get("value") - if not part_value: - continue - - if part_value in already_scrubbed_set: - part["pattern"] = already_scrubbed[part["value"]] - part["redacted"] = True - del part["value"] - - cls._custom_edit_valueparts(vuln) - return report diff --git a/ddtrace/appsec/_iast/taint_sinks/command_injection.py b/ddtrace/appsec/_iast/taint_sinks/command_injection.py index b5a3df94422..e369576b16d 100644 --- a/ddtrace/appsec/_iast/taint_sinks/command_injection.py +++ b/ddtrace/appsec/_iast/taint_sinks/command_injection.py @@ -70,9 +70,6 @@ def _iast_cmdi_subprocess_init(wrapped, instance, args, kwargs): @oce.register class CommandInjection(VulnerabilityBase): vulnerability_type = VULN_CMDI - # TODO: Redaction migrated to 
`ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - redact_report = False def _iast_report_cmdi(shell_args: Union[str, List[str]]) -> None: diff --git a/ddtrace/appsec/_iast/taint_sinks/header_injection.py b/ddtrace/appsec/_iast/taint_sinks/header_injection.py index 1ce8a52d5e4..7fa0b6111dd 100644 --- a/ddtrace/appsec/_iast/taint_sinks/header_injection.py +++ b/ddtrace/appsec/_iast/taint_sinks/header_injection.py @@ -1,5 +1,3 @@ -import re - from ddtrace.internal.logger import get_logger from ddtrace.settings.asm import config as asm_config @@ -19,15 +17,6 @@ log = get_logger(__name__) -_HEADERS_NAME_REGEXP = re.compile( - r"(?:p(?:ass)?w(?:or)?d|pass(?:_?phrase)?|secret|(?:api_?|private_?|public_?|access_?|secret_?)key(?:_?id)?|token|consumer_?(?:id|key|secret)|sign(?:ed|ature)?|auth(?:entication|orization)?)", - re.IGNORECASE, -) -_HEADERS_VALUE_REGEXP = re.compile( - r"(?:bearer\\s+[a-z0-9\\._\\-]+|glpat-[\\w\\-]{20}|gh[opsu]_[0-9a-zA-Z]{36}|ey[I-L][\\w=\\-]+\\.ey[I-L][\\w=\\-]+(?:\\.[\\w.+/=\\-]+)?|(?:[\\-]{5}BEGIN[a-z\\s]+PRIVATE\\sKEY[\\-]{5}[^\\-]+[\\-]{5}END[a-z\\s]+PRIVATE\\sKEY[\\-]{5}|ssh-rsa\\s*[a-z0-9/\\.+]{100,}))", - re.IGNORECASE, -) - def get_version(): # type: () -> str @@ -103,9 +92,6 @@ def _iast_h(wrapped, instance, args, kwargs): @oce.register class HeaderInjection(VulnerabilityBase): vulnerability_type = VULN_HEADER_INJECTION - # TODO: Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. - redact_report = False def _iast_report_header_injection(headers_args) -> None: diff --git a/ddtrace/appsec/_iast/taint_sinks/insecure_cookie.py b/ddtrace/appsec/_iast/taint_sinks/insecure_cookie.py index 989f647055d..41f0a9c93f9 100644 --- a/ddtrace/appsec/_iast/taint_sinks/insecure_cookie.py +++ b/ddtrace/appsec/_iast/taint_sinks/insecure_cookie.py @@ -4,7 +4,6 @@ from .. 
import oce from .._metrics import _set_metric_iast_executed_sink from .._metrics import increment_iast_span_metric -from ..constants import EVIDENCE_COOKIE from ..constants import VULN_INSECURE_COOKIE from ..constants import VULN_NO_HTTPONLY_COOKIE from ..constants import VULN_NO_SAMESITE_COOKIE @@ -19,7 +18,6 @@ @oce.register class InsecureCookie(VulnerabilityBase): vulnerability_type = VULN_INSECURE_COOKIE - evidence_type = EVIDENCE_COOKIE scrub_evidence = False skip_location = True @@ -27,14 +25,12 @@ class InsecureCookie(VulnerabilityBase): @oce.register class NoHttpOnlyCookie(VulnerabilityBase): vulnerability_type = VULN_NO_HTTPONLY_COOKIE - evidence_type = EVIDENCE_COOKIE skip_location = True @oce.register class NoSameSite(VulnerabilityBase): vulnerability_type = VULN_NO_SAMESITE_COOKIE - evidence_type = EVIDENCE_COOKIE skip_location = True diff --git a/ddtrace/appsec/_iast/taint_sinks/sql_injection.py b/ddtrace/appsec/_iast/taint_sinks/sql_injection.py index 68d5a289c01..f671d92c387 100644 --- a/ddtrace/appsec/_iast/taint_sinks/sql_injection.py +++ b/ddtrace/appsec/_iast/taint_sinks/sql_injection.py @@ -1,172 +1,8 @@ -import re -from typing import TYPE_CHECKING # noqa:F401 - from .. 
import oce -from .._taint_tracking import taint_ranges_as_evidence_info -from .._utils import _has_to_scrub -from .._utils import _is_numeric -from .._utils import _scrub_get_tokens_positions -from ..constants import EVIDENCE_SQL_INJECTION from ..constants import VULN_SQL_INJECTION from ._base import VulnerabilityBase -if TYPE_CHECKING: - from typing import Any # noqa:F401 - from typing import Dict # noqa:F401 - - from .reporter import Vulnerability # noqa:F401 - -from sqlparse import parse -from sqlparse import tokens - - -_TEXT_TOKENS_REGEXP = re.compile(r"\b\w+\b") - - @oce.register class SqlInjection(VulnerabilityBase): vulnerability_type = VULN_SQL_INJECTION - evidence_type = EVIDENCE_SQL_INJECTION - redact_report = True - - @classmethod - def report(cls, evidence_value=None, sources=None): - value_parts = [] - if isinstance(evidence_value, (str, bytes, bytearray)): - value_parts, sources = taint_ranges_as_evidence_info(evidence_value) - super(SqlInjection, cls).report(evidence_value=evidence_value, value_parts=value_parts, sources=sources) - - @classmethod - def _extract_sensitive_tokens(cls, vulns_to_text): - # type: (Dict[Vulnerability, str]) -> Dict[int, Dict[str, Any]] - ret = {} # type: Dict[int, Dict[str, Any]] - for vuln, text in vulns_to_text.items(): - vuln_hash = hash(vuln) - ret[vuln_hash] = { - "tokens": set(_TEXT_TOKENS_REGEXP.findall(text)), - } - ret[vuln_hash]["token_positions"] = _scrub_get_tokens_positions(text, ret[vuln_hash]["tokens"]) - - return ret - - @classmethod - def _custom_edit_valueparts(cls, vuln): - def _maybe_with_source(source, value): - if source is not None: - return {"value": value, "source": source} - return {"value": value} - - new_valueparts = [] - - in_singleline_comment = False - - for part in vuln.evidence.valueParts: - source = part.get("source") - value = part.get("value") - - if not value or part.get("redacted"): - new_valueparts.append(part) - continue - - parsed = parse(value)[0].flatten() - out = [] - - for item 
in parsed: - if item.ttype == tokens.Whitespace.Newline: - in_singleline_comment = False - - elif in_singleline_comment: - # Skip all tokens after a -- comment until newline - continue - - if item.ttype in { - tokens.Literal.String.Single, - tokens.Literal.String.Double, - tokens.Literal.String.Symbol, - tokens.Literal.Number.Integer, - tokens.Literal.Number.Float, - tokens.Literal.Number.Hexadecimal, - tokens.Comment.Single, - tokens.Comment.Multiline, - tokens.Name, - }: - redact_fully = False - add_later = None - sitem = str(item) - - if _is_numeric(sitem): - redact_fully = True - elif item.ttype == tokens.Literal.String.Single or ( - item.ttype == tokens.Literal.String.Symbol and "'" in str(item) - ): - out.append("'") - add_later = "'" - str_item = sitem.replace("'", "") - if _is_numeric(str_item): - redact_fully = True - elif item.ttype == tokens.Literal.String.Double or ( - item.ttype == tokens.Literal.String.Symbol and '"' in str(item) - ): - out.append('"') - add_later = '"' - str_item = sitem.replace('"', "") - if _is_numeric(str_item): - redact_fully = True - elif item.ttype == tokens.Comment.Single: - out.append("--") - add_later = "" - redact_fully = True - in_singleline_comment = True - elif item.ttype == tokens.Comment.Multiline: - out.append("/*") - add_later = "*/" - redact_fully = True - elif item.ttype in (tokens.Number.Integer, tokens.Number.Float, tokens.Number.Hexadecimal): - redact_fully = True - else: - out.append(sitem) - continue - - if len(out): - new_valueparts.append(_maybe_with_source(source, "".join(out))) - - if redact_fully: - # Comments are totally redacted - new_valueparts.append({"redacted": True}) - else: - new_valueparts.append(_maybe_with_source(source, str_item)) - - if add_later: - out = [add_later] - else: - out = [] - else: - out.append(str(item)) - - if len(out): - new_valueparts.append(_maybe_with_source(source, "".join(out))) - - # Scrub as needed - idx = 0 - len_parts = len(new_valueparts) - while idx < len_parts: - 
value = new_valueparts[idx].get("value") - if value and _has_to_scrub(value) and idx < (len_parts - 1) and "redacted" not in new_valueparts[idx + 1]: - # Scrub the value, which is the next one, except when the previous was a LIKE or an assignment - # in which case this is the value to scrub - prev_valuepart = new_valueparts[idx - 1].get("value", "").strip().lower() - if len(prev_valuepart) and (" like " in prev_valuepart or prev_valuepart[-1] == "="): - new_valueparts[idx] = {"redacted": True} - else: # scrub the next non empty quote value - for part in new_valueparts[idx + 1 :]: - idx += 1 - next_valuepart = part.get("value", "").strip() - if not len(next_valuepart) or next_valuepart in ("'", '"'): - continue - - new_valueparts[idx] = {"redacted": True} - break - idx += 1 - - vuln.evidence.valueParts = new_valueparts diff --git a/ddtrace/appsec/_iast/taint_sinks/ssrf.py b/ddtrace/appsec/_iast/taint_sinks/ssrf.py index 7a070cf5425..3e0103a70b1 100644 --- a/ddtrace/appsec/_iast/taint_sinks/ssrf.py +++ b/ddtrace/appsec/_iast/taint_sinks/ssrf.py @@ -16,9 +16,6 @@ @oce.register class SSRF(VulnerabilityBase): vulnerability_type = VULN_SSRF - # TODO: Redaction migrated to `ddtrace.appsec._iast._evidence_redaction._sensitive_handler` but we need to migrate - # all vulnerabilities to use it first. 
- redact_report = False def _iast_report_ssrf(func: Callable, *args, **kwargs): diff --git a/ddtrace/appsec/_iast/taint_sinks/weak_cipher.py b/ddtrace/appsec/_iast/taint_sinks/weak_cipher.py index 3199528ef03..21a494edf3b 100644 --- a/ddtrace/appsec/_iast/taint_sinks/weak_cipher.py +++ b/ddtrace/appsec/_iast/taint_sinks/weak_cipher.py @@ -15,7 +15,6 @@ from ..constants import BLOWFISH_DEF from ..constants import DEFAULT_WEAK_CIPHER_ALGORITHMS from ..constants import DES_DEF -from ..constants import EVIDENCE_ALGORITHM_TYPE from ..constants import RC2_DEF from ..constants import RC4_DEF from ..constants import VULN_WEAK_CIPHER_TYPE @@ -44,7 +43,6 @@ def get_weak_cipher_algorithms(): @oce.register class WeakCipher(VulnerabilityBase): vulnerability_type = VULN_WEAK_CIPHER_TYPE - evidence_type = EVIDENCE_ALGORITHM_TYPE def unpatch_iast(): diff --git a/ddtrace/appsec/_iast/taint_sinks/weak_hash.py b/ddtrace/appsec/_iast/taint_sinks/weak_hash.py index 9bebaf805be..0932cc9fb05 100644 --- a/ddtrace/appsec/_iast/taint_sinks/weak_hash.py +++ b/ddtrace/appsec/_iast/taint_sinks/weak_hash.py @@ -14,7 +14,6 @@ from .._patch import try_unwrap from .._patch import try_wrap_function_wrapper from ..constants import DEFAULT_WEAK_HASH_ALGORITHMS -from ..constants import EVIDENCE_ALGORITHM_TYPE from ..constants import MD5_DEF from ..constants import SHA1_DEF from ..constants import VULN_INSECURE_HASHING_TYPE @@ -42,7 +41,6 @@ def get_weak_hash_algorithms(): @oce.register class WeakHash(VulnerabilityBase): vulnerability_type = VULN_INSECURE_HASHING_TYPE - evidence_type = EVIDENCE_ALGORITHM_TYPE def unpatch_iast(): diff --git a/ddtrace/appsec/_iast/taint_sinks/weak_randomness.py b/ddtrace/appsec/_iast/taint_sinks/weak_randomness.py index bd7fc6e5051..a9fc130b39f 100644 --- a/ddtrace/appsec/_iast/taint_sinks/weak_randomness.py +++ b/ddtrace/appsec/_iast/taint_sinks/weak_randomness.py @@ -1,5 +1,4 @@ from .. 
import oce -from ..constants import EVIDENCE_WEAK_RANDOMNESS from ..constants import VULN_WEAK_RANDOMNESS from ._base import VulnerabilityBase @@ -7,7 +6,6 @@ @oce.register class WeakRandomness(VulnerabilityBase): vulnerability_type = VULN_WEAK_RANDOMNESS - evidence_type = EVIDENCE_WEAK_RANDOMNESS @classmethod def report(cls, evidence_value=None, sources=None): diff --git a/ddtrace/contrib/dbapi/__init__.py b/ddtrace/contrib/dbapi/__init__.py index 3fb5d6cbe1c..b0e4d2aec81 100644 --- a/ddtrace/contrib/dbapi/__init__.py +++ b/ddtrace/contrib/dbapi/__init__.py @@ -105,13 +105,13 @@ def _trace_method(self, method, name, resource, extra_tags, dbm_propagator, *arg if _is_iast_enabled(): try: from ddtrace.appsec._iast._metrics import _set_metric_iast_executed_sink - from ddtrace.appsec._iast._taint_utils import check_tainted_args + from ddtrace.appsec._iast._taint_utils import check_tainted_dbapi_args from ddtrace.appsec._iast.taint_sinks.sql_injection import SqlInjection increment_iast_span_metric(IAST_SPAN_TAGS.TELEMETRY_EXECUTED_SINK, SqlInjection.vulnerability_type) _set_metric_iast_executed_sink(SqlInjection.vulnerability_type) - if check_tainted_args(args, kwargs, pin.tracer, self._self_config.integration_name, method): - SqlInjection.report(evidence_value=args[0]) + if check_tainted_dbapi_args(args, kwargs, pin.tracer, self._self_config.integration_name, method): + SqlInjection.report(evidence_value=args[0], dialect=self._self_config.integration_name) except Exception: log.debug("Unexpected exception while reporting vulnerability", exc_info=True) diff --git a/ddtrace/contrib/dbapi_async/__init__.py b/ddtrace/contrib/dbapi_async/__init__.py index c37638fdf67..abbae3b77a3 100644 --- a/ddtrace/contrib/dbapi_async/__init__.py +++ b/ddtrace/contrib/dbapi_async/__init__.py @@ -77,13 +77,13 @@ async def _trace_method(self, method, name, resource, extra_tags, dbm_propagator if _is_iast_enabled(): from ddtrace.appsec._iast._metrics import _set_metric_iast_executed_sink - 
from ddtrace.appsec._iast._taint_utils import check_tainted_args + from ddtrace.appsec._iast._taint_utils import check_tainted_dbapi_args from ddtrace.appsec._iast.taint_sinks.sql_injection import SqlInjection increment_iast_span_metric(IAST_SPAN_TAGS.TELEMETRY_EXECUTED_SINK, SqlInjection.vulnerability_type) _set_metric_iast_executed_sink(SqlInjection.vulnerability_type) - if check_tainted_args(args, kwargs, pin.tracer, self._self_config.integration_name, method): - SqlInjection.report(evidence_value=args[0]) + if check_tainted_dbapi_args(args, kwargs, pin.tracer, self._self_config.integration_name, method): + SqlInjection.report(evidence_value=args[0], dialect=self._self_config.integration_name) # set analytics sample rate if enabled but only for non-FetchTracedCursor if not isinstance(self, FetchTracedAsyncCursor): diff --git a/tests/appsec/iast/taint_sinks/test_header_injection_redacted.py b/tests/appsec/iast/taint_sinks/test_header_injection_redacted.py index db9272e1625..6861d28edbf 100644 --- a/tests/appsec/iast/taint_sinks/test_header_injection_redacted.py +++ b/tests/appsec/iast/taint_sinks/test_header_injection_redacted.py @@ -1,14 +1,17 @@ +from mock.mock import ANY import pytest from ddtrace.appsec._constants import IAST +from ddtrace.appsec._iast._taint_tracking import OriginType from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted from ddtrace.appsec._iast._taint_tracking import origin_to_str from ddtrace.appsec._iast._taint_tracking import str_to_origin +from ddtrace.appsec._iast._taint_tracking import taint_pyobject +from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect from ddtrace.appsec._iast.constants import VULN_HEADER_INJECTION from ddtrace.appsec._iast.reporter import Evidence from ddtrace.appsec._iast.reporter import IastSpanReporter from ddtrace.appsec._iast.reporter import Location -from ddtrace.appsec._iast.reporter import Source from ddtrace.appsec._iast.reporter import Vulnerability from 
ddtrace.appsec._iast.taint_sinks.header_injection import HeaderInjection from ddtrace.internal import core @@ -24,20 +27,25 @@ ], ) def test_header_injection_redact_excluded(header_name, header_value): - ev = Evidence( - valueParts=[ - {"value": header_name + ": "}, - {"value": header_value, "source": 0}, - ] - ) + header_value_tainted = taint_pyobject(pyobject=header_value, source_name="SomeName", source_value=header_value) + ev = Evidence(value=add_aspect(header_name, add_aspect(": ", header_value_tainted))) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_HEADER_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value=header_value) - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) report.add_ranges_to_evidence_and_extract_sources(v) - redacted_report = HeaderInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [{"value": header_name + ": "}, {"source": 0, "value": header_value}] + result = report.build_and_scrub_value_parts() + + assert result == { + "sources": [{"name": "SomeName", "origin": OriginType.PARAMETER, "value": header_value}], + "vulnerabilities": [ + { + "evidence": {"valueParts": [{"value": header_name + ": "}, {"source": 0, "value": header_value}]}, + "hash": ANY, + "location": {"line": ANY, "path": "foobar.py", "spanId": ANY}, + "type": VULN_HEADER_INJECTION, + } + ], + } @pytest.mark.parametrize( @@ -46,7 +54,7 @@ def test_header_injection_redact_excluded(header_name, header_value): ( "WWW-Authenticate", 'Basic realm="api"', - [{"value": "WWW-Authenticate: "}, {"source": 0, "value": 'Basic realm="api"'}], + [{"value": "WWW-Authenticate: "}, {"pattern": "abcdefghijklmnopq", "redacted": True, "source": 0}], ), ( "Authorization", @@ -63,20 +71,25 @@ def test_header_injection_redact_excluded(header_name, header_value): ], ) def test_common_django_header_injection_redact(header_name, 
header_value, value_part): - ev = Evidence( - valueParts=[ - {"value": header_name + ": "}, - {"value": header_value, "source": 0}, - ] - ) + header_value_tainted = taint_pyobject(pyobject=header_value, source_name="SomeName", source_value=header_value) + ev = Evidence(value=add_aspect(header_name, add_aspect(": ", header_value_tainted))) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_HEADER_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value=header_value) - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) report.add_ranges_to_evidence_and_extract_sources(v) - redacted_report = HeaderInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == value_part + result = report.build_and_scrub_value_parts() + + assert result == { + "sources": [{"name": "SomeName", "origin": OriginType.PARAMETER, "pattern": ANY, "redacted": True}], + "vulnerabilities": [ + { + "evidence": {"valueParts": value_part}, + "hash": ANY, + "location": {"line": ANY, "path": "foobar.py", "spanId": ANY}, + "type": VULN_HEADER_INJECTION, + } + ], + } @pytest.mark.parametrize( diff --git a/tests/appsec/iast/taint_sinks/test_path_traversal_redacted.py b/tests/appsec/iast/taint_sinks/test_path_traversal_redacted.py index ccd88c0ce11..aacaae0a156 100644 --- a/tests/appsec/iast/taint_sinks/test_path_traversal_redacted.py +++ b/tests/appsec/iast/taint_sinks/test_path_traversal_redacted.py @@ -1,14 +1,15 @@ import os +from mock.mock import ANY import pytest +from ddtrace.appsec._iast._taint_tracking import OriginType +from ddtrace.appsec._iast._taint_tracking import taint_pyobject from ddtrace.appsec._iast.constants import VULN_PATH_TRAVERSAL from ddtrace.appsec._iast.reporter import Evidence from ddtrace.appsec._iast.reporter import IastSpanReporter from ddtrace.appsec._iast.reporter import Location -from ddtrace.appsec._iast.reporter import 
Source from ddtrace.appsec._iast.reporter import Vulnerability -from ddtrace.appsec._iast.taint_sinks.path_traversal import PathTraversal ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -29,19 +30,25 @@ ], ) def test_path_traversal_redact_exclude(file_path): - ev = Evidence( - valueParts=[ - {"value": file_path, "source": 0}, - ] - ) + file_path = taint_pyobject(pyobject=file_path, source_name="path_traversal", source_value=file_path) + ev = Evidence(value=file_path) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_PATH_TRAVERSAL, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = PathTraversal._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [{"source": 0, "value": file_path}] + assert result == { + "sources": [{"name": "path_traversal", "origin": OriginType.PARAMETER, "value": file_path}], + "vulnerabilities": [ + { + "evidence": {"valueParts": [{"source": 0, "value": file_path}]}, + "hash": ANY, + "location": {"line": ANY, "path": "foobar.py", "spanId": ANY}, + "type": VULN_PATH_TRAVERSAL, + } + ], + } @pytest.mark.parametrize( @@ -75,33 +82,45 @@ def test_path_traversal_redact_exclude(file_path): ], ) def test_path_traversal_redact_rel_paths(file_path): - ev = Evidence( - valueParts=[ - {"value": file_path, "source": 0}, - ] - ) + file_path = taint_pyobject(pyobject=file_path, source_name="path_traversal", source_value=file_path) + ev = Evidence(value=file_path) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_PATH_TRAVERSAL, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = 
IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = PathTraversal._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [{"source": 0, "value": file_path}] + assert result == { + "sources": [{"name": "path_traversal", "origin": OriginType.PARAMETER, "value": file_path}], + "vulnerabilities": [ + { + "evidence": {"valueParts": [{"source": 0, "value": file_path}]}, + "hash": ANY, + "location": {"line": ANY, "path": "foobar.py", "spanId": ANY}, + "type": VULN_PATH_TRAVERSAL, + } + ], + } def test_path_traversal_redact_abs_paths(): file_path = os.path.join(ROOT_DIR, "../fixtures", "taint_sinks", "path_traversal_test_file.txt") - ev = Evidence( - valueParts=[ - {"value": file_path, "source": 0}, - ] - ) + file_path = taint_pyobject(pyobject=file_path, source_name="path_traversal", source_value=file_path) + ev = Evidence(value=file_path) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_PATH_TRAVERSAL, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = PathTraversal._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [{"source": 0, "value": file_path}] + assert result == { + "sources": [{"name": "path_traversal", "origin": OriginType.PARAMETER, "value": file_path}], + "vulnerabilities": [ + { + "evidence": {"valueParts": [{"source": 0, "value": file_path}]}, + "hash": ANY, + "location": {"line": ANY, "path": "foobar.py", "spanId": ANY}, + "type": VULN_PATH_TRAVERSAL, + } + ], + } diff --git a/tests/appsec/iast/taint_sinks/test_sql_injection.py 
b/tests/appsec/iast/taint_sinks/test_sql_injection.py index 54efea82ffe..85f0e8e123e 100644 --- a/tests/appsec/iast/taint_sinks/test_sql_injection.py +++ b/tests/appsec/iast/taint_sinks/test_sql_injection.py @@ -41,26 +41,25 @@ def test_sql_injection(fixture_path, fixture_module, iast_span_defaults): mod.sqli_simple(table) span_report = core.get_item(IAST.CONTEXT_KEY, span=iast_span_defaults) assert span_report - - assert len(span_report.vulnerabilities) == 1 - vulnerability = list(span_report.vulnerabilities)[0] - source = span_report.sources[0] - assert vulnerability.type == VULN_SQL_INJECTION - assert vulnerability.evidence.valueParts == [ + data = span_report.build_and_scrub_value_parts() + vulnerability = data["vulnerabilities"][0] + source = data["sources"][0] + assert vulnerability["type"] == VULN_SQL_INJECTION + assert vulnerability["evidence"]["valueParts"] == [ {"value": "SELECT "}, {"redacted": True}, {"value": " FROM "}, {"value": "students", "source": 0}, ] - assert vulnerability.evidence.value is None - assert source.name == "test_ossystem" - assert source.origin == OriginType.PARAMETER - assert source.value == "students" + assert "value" not in vulnerability["evidence"].keys() + assert source["name"] == "test_ossystem" + assert source["origin"] == OriginType.PARAMETER + assert source["value"] == "students" line, hash_value = get_line_and_hash("test_sql_injection", VULN_SQL_INJECTION, filename=fixture_path) - assert vulnerability.location.line == line - assert vulnerability.location.path == fixture_path - assert vulnerability.hash == hash_value + assert vulnerability["location"]["path"] == fixture_path + assert vulnerability["location"]["line"] == line + assert vulnerability["hash"] == hash_value @pytest.mark.parametrize("fixture_path,fixture_module", DDBBS) @@ -80,6 +79,6 @@ def test_sql_injection_deduplication(fixture_path, fixture_module, iast_span_ded span_report = core.get_item(IAST.CONTEXT_KEY, span=iast_span_deduplication_enabled) assert 
span_report - - assert len(span_report.vulnerabilities) == 1 + data = span_report.build_and_scrub_value_parts() + assert len(data["vulnerabilities"]) == 1 VulnerabilityBase._prepare_report._reset_cache() diff --git a/tests/appsec/iast/taint_sinks/test_sql_injection_redacted.py b/tests/appsec/iast/taint_sinks/test_sql_injection_redacted.py index 4122b53d402..a4d1da049f8 100644 --- a/tests/appsec/iast/taint_sinks/test_sql_injection_redacted.py +++ b/tests/appsec/iast/taint_sinks/test_sql_injection_redacted.py @@ -1,13 +1,16 @@ import pytest from ddtrace.appsec._constants import IAST +from ddtrace.appsec._iast._taint_tracking import OriginType from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted +from ddtrace.appsec._iast._taint_tracking import origin_to_str from ddtrace.appsec._iast._taint_tracking import str_to_origin +from ddtrace.appsec._iast._taint_tracking import taint_pyobject +from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect from ddtrace.appsec._iast.constants import VULN_SQL_INJECTION from ddtrace.appsec._iast.reporter import Evidence from ddtrace.appsec._iast.reporter import IastSpanReporter from ddtrace.appsec._iast.reporter import Location -from ddtrace.appsec._iast.reporter import Source from ddtrace.appsec._iast.reporter import Vulnerability from ddtrace.appsec._iast.taint_sinks.sql_injection import SqlInjection from ddtrace.internal import core @@ -18,33 +21,7 @@ # FIXME: ideally all these should pass, through the key is that we don't leak any potential PII -_ignore_list = { - 13, - 14, - 15, - 16, - 17, - 18, - 19, - 20, # unsupported weird strings - 23, - 28, - 31, - 33, - 34, # difference in numerics parsing (e.g. 
sign in the previous valuepart) - 40, - 41, - 42, - 43, - 44, # overlapping ":string", not supported by sqlparser, - 45, - 46, - 47, - 49, - 50, - 51, - 52, # slight differences in sqlparser parsing -} +_ignore_list = {46, 47} @pytest.mark.parametrize( @@ -74,164 +51,212 @@ def test_sqli_redaction_suite(evidence_input, sources_expected, vulnerabilities_ span_report = core.get_item(IAST.CONTEXT_KEY, span=iast_span_defaults) assert span_report - vulnerability = list(span_report.vulnerabilities)[0] + span_report.build_and_scrub_value_parts() + result = span_report._to_dict() + vulnerability = list(result["vulnerabilities"])[0] + source = list(result["sources"])[0] + source["origin"] = origin_to_str(source["origin"]) - assert vulnerability.type == VULN_SQL_INJECTION - assert vulnerability.evidence.valueParts == vulnerabilities_expected["evidence"]["valueParts"] + assert vulnerability["type"] == VULN_SQL_INJECTION + assert source == sources_expected -@pytest.mark.skip(reason="TODO: Currently replacing too eagerly here") def test_redacted_report_no_match(): - ev = Evidence(value="SomeEvidenceValue") - orig_ev = ev.value + string_evicence = taint_pyobject( + pyobject="SomeEvidenceValue", source_name="source_name", source_value="SomeEvidenceValue" + ) + ev = Evidence(value=string_evicence) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() + + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == {"valueParts": [{"source": 0, "value": "SomeEvidenceValue"}]} - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert not v.evidence.redacted - assert 
v.evidence.value == orig_ev + for v in result["sources"]: + assert v == {"name": "source_name", "origin": OriginType.PARAMETER, "value": "SomeEvidenceValue"} def test_redacted_report_source_name_match(): - ev = Evidence(value="'SomeEvidenceValue'") + string_evicence = taint_pyobject(pyobject="'SomeEvidenceValue'", source_name="secret", source_value="SomeValue") + ev = Evidence(value=string_evicence) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="secret", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert not v.evidence.value + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == {"valueParts": [{"pattern": "*******************", "redacted": True, "source": 0}]} + + for v in result["sources"]: + assert v == {"name": "secret", "origin": OriginType.PARAMETER, "pattern": "abcdefghi", "redacted": True} def test_redacted_report_source_value_match(): - ev = Evidence(value="'SomeEvidenceValue'") + string_evicence = taint_pyobject( + pyobject="'SomeEvidenceValue'", source_name="SomeName", source_value="somepassword" + ) + ev = Evidence(value=string_evicence) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="somepassword") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert not 
v.evidence.value + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == {"valueParts": [{"pattern": "*******************", "redacted": True, "source": 0}]} + + for v in result["sources"]: + assert v == {"name": "SomeName", "origin": OriginType.PARAMETER, "pattern": "abcdefghijkl", "redacted": True} def test_redacted_report_evidence_value_match_also_redacts_source_value(): - ev = Evidence(value="'SomeSecretPassword'") + string_evicence = taint_pyobject( + pyobject="'SomeSecretPassword'", source_name="SomeName", source_value="SomeSecretPassword" + ) + ev = Evidence(value=string_evicence) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeSecretPassword") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert not v.evidence.value - for s in redacted_report.sources: - assert s.redacted - assert s.pattern == "abcdefghijklmnopqr" - assert not s.value + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == {"valueParts": [{"pattern": "********************", "redacted": True, "source": 0}]} + + for v in result["sources"]: + assert v == { + "name": "SomeName", + "origin": OriginType.PARAMETER, + "pattern": "abcdefghijklmnopqr", + "redacted": True, + } def test_redacted_report_valueparts(): - ev = Evidence( - valueParts=[ - {"value": "SELECT * FROM users WHERE password = '"}, - {"value": "1234", "source": 0}, - {"value": ":{SHA1}'"}, - ] - ) + string_evicence = taint_pyobject(pyobject="1234", source_name="SomeName", source_value="SomeValue") + + ev = Evidence(value=add_aspect("SELECT * FROM users WHERE password = 
'", add_aspect(string_evicence, ":{SHA1}'"))) loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() + + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == { + "valueParts": [ + {"value": "SELECT * FROM users WHERE password = '"}, + {"pattern": "****", "redacted": True, "source": 0}, + {"redacted": True}, + {"value": "'"}, + ] + } - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [ - {"value": "SELECT * FROM users WHERE password = '"}, - {"redacted": True}, - {"value": ":{SHA1}'"}, - ] + for v in result["sources"]: + assert v == {"name": "SomeName", "origin": OriginType.PARAMETER, "pattern": "abcdefghi", "redacted": True} def test_redacted_report_valueparts_username_not_tainted(): - ev = Evidence( - valueParts=[ - {"value": "SELECT * FROM users WHERE username = '"}, - {"value": "pepito"}, - {"value": "' AND password = '"}, - {"value": "secret", "source": 0}, - {"value": "'"}, - ] + string_evicence = taint_pyobject(pyobject="secret", source_name="SomeName", source_value="SomeValue") + + string_tainted = add_aspect( + "SELECT * FROM users WHERE username = '", + add_aspect("pepito", add_aspect("' AND password = '", add_aspect(string_evicence, "'"))), ) + ev = Evidence(value=string_tainted, dialect="POSTGRES") loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) - - redacted_report = SqlInjection._redact_report(report) - for v in 
redacted_report.vulnerabilities: - assert v.evidence.valueParts == [ - {"value": "SELECT * FROM users WHERE username = '"}, - {"redacted": True}, - {"value": "'"}, - {"value": " AND password = "}, - {"value": "'"}, - {"redacted": True}, - {"value": "'"}, - ] + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() + + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == { + "valueParts": [ + {"value": "SELECT * FROM users WHERE username = '"}, + {"redacted": True}, + {"value": "' AND password = '"}, + {"pattern": "******", "redacted": True, "source": 0}, + {"value": "'"}, + ] + } + + for v in result["sources"]: + assert v == {"name": "SomeName", "origin": OriginType.PARAMETER, "pattern": "abcdefghi", "redacted": True} def test_redacted_report_valueparts_username_tainted(): - ev = Evidence( - valueParts=[ - {"value": "SELECT * FROM users WHERE username = '"}, - {"value": "pepito", "source": 0}, - {"value": "' AND password = '"}, - {"value": "secret", "source": 0}, - {"value": "'"}, - ] + string_evicence = taint_pyobject(pyobject="secret", source_name="SomeName", source_value="SomeValue") + + string_tainted = add_aspect( + "SELECT * FROM users WHERE username = '", + add_aspect(string_evicence, add_aspect("' AND password = '", add_aspect(string_evicence, "'"))), ) + ev = Evidence(value=string_tainted, dialect="POSTGRES") loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) - - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [ - {"value": "SELECT * FROM users WHERE username = '"}, - {"redacted": True}, - {"value": "'"}, - {"value": " AND password = "}, - {"value": "'"}, - 
{"redacted": True}, - {"value": "'"}, - ] + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() + + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == { + "valueParts": [ + {"value": "SELECT * FROM users WHERE username = '"}, + {"pattern": "******", "redacted": True, "source": 0}, + {"value": "' AND password = '"}, + {"pattern": "******", "redacted": True, "source": 0}, + {"value": "'"}, + ] + } + + for v in result["sources"]: + assert v == {"name": "SomeName", "origin": OriginType.PARAMETER, "pattern": "abcdefghi", "redacted": True} def test_regression_ci_failure(): - ev = Evidence( - valueParts=[ - {"value": "SELECT tbl_name FROM sqlite_"}, - {"value": "master", "source": 0}, - {"value": "WHERE tbl_name LIKE 'password'"}, - ] + string_evicence = taint_pyobject(pyobject="master", source_name="SomeName", source_value="master") + + string_tainted = add_aspect( + "SELECT tbl_name FROM sqlite_", add_aspect(string_evicence, "WHERE tbl_name LIKE 'password'") ) + ev = Evidence(value=string_tainted, dialect="POSTGRES") loc = Location(path="foobar.py", line=35, spanId=123) v = Vulnerability(type=VULN_SQL_INJECTION, evidence=ev, location=loc) - s = Source(origin="SomeOrigin", name="SomeName", value="SomeValue") - report = IastSpanReporter([s], {v}) - - redacted_report = SqlInjection._redact_report(report) - for v in redacted_report.vulnerabilities: - assert v.evidence.valueParts == [ - {"value": "SELECT tbl_name FROM sqlite_"}, - {"source": 0, "value": "master"}, - {"value": "WHERE tbl_name LIKE '"}, - {"redacted": True}, - {"value": "'"}, - ] + report = IastSpanReporter(vulnerabilities={v}) + report.add_ranges_to_evidence_and_extract_sources(v) + result = report.build_and_scrub_value_parts() + + assert result["vulnerabilities"] + + for v in result["vulnerabilities"]: + assert v["evidence"] == { + "valueParts": [ + {"value": 
"SELECT tbl_name FROM sqlite_"}, + {"source": 0, "value": "master"}, + {"value": "WHERE tbl_name LIKE '"}, + {"redacted": True}, + {"value": "'"}, + ] + } + + for v in result["sources"]: + assert v == {"name": "SomeName", "origin": OriginType.PARAMETER, "value": "master"} diff --git a/tests/appsec/iast/test_taint_utils.py b/tests/appsec/iast/test_taint_utils.py index 9ccf6df4507..a60cc2c547a 100644 --- a/tests/appsec/iast/test_taint_utils.py +++ b/tests/appsec/iast/test_taint_utils.py @@ -9,7 +9,7 @@ from ddtrace.appsec._iast._taint_tracking import taint_pyobject from ddtrace.appsec._iast._taint_utils import LazyTaintDict from ddtrace.appsec._iast._taint_utils import LazyTaintList -from ddtrace.appsec._iast._taint_utils import check_tainted_args +from ddtrace.appsec._iast._taint_utils import check_tainted_dbapi_args def setup(): @@ -192,17 +192,17 @@ def test_checked_tainted_args(): untainted_arg = "gallahad the pure" # Returns False: Untainted first argument - assert not check_tainted_args( + assert not check_tainted_dbapi_args( args=(untainted_arg,), kwargs=None, tracer=None, integration_name="sqlite", method=cursor.execute ) # Returns False: Untainted first argument - assert not check_tainted_args( + assert not check_tainted_dbapi_args( args=(untainted_arg, tainted_arg), kwargs=None, tracer=None, integration_name="sqlite", method=cursor.execute ) # Returns False: Integration name not in list - assert not check_tainted_args( + assert not check_tainted_dbapi_args( args=(tainted_arg,), kwargs=None, tracer=None, @@ -211,7 +211,7 @@ def test_checked_tainted_args(): ) # Returns False: Wrong function name - assert not check_tainted_args( + assert not check_tainted_dbapi_args( args=(tainted_arg,), kwargs=None, tracer=None, @@ -220,17 +220,17 @@ def test_checked_tainted_args(): ) # Returns True: - assert check_tainted_args( + assert check_tainted_dbapi_args( args=(tainted_arg, untainted_arg), kwargs=None, tracer=None, integration_name="sqlite", method=cursor.execute ) # 
Returns True: - assert check_tainted_args( + assert check_tainted_dbapi_args( args=(tainted_arg, untainted_arg), kwargs=None, tracer=None, integration_name="mysql", method=cursor.execute ) # Returns True: - assert check_tainted_args( + assert check_tainted_dbapi_args( args=(tainted_arg, untainted_arg), kwargs=None, tracer=None, integration_name="psycopg", method=cursor.execute ) diff --git a/tests/contrib/dbapi/test_dbapi_appsec.py b/tests/contrib/dbapi/test_dbapi_appsec.py index 819f969ede6..3ba165da8cf 100644 --- a/tests/contrib/dbapi/test_dbapi_appsec.py +++ b/tests/contrib/dbapi/test_dbapi_appsec.py @@ -36,7 +36,7 @@ def test_tainted_query(self): traced_cursor.execute(query) cursor.execute.assert_called_once_with(query) - mock_sql_injection_report.assert_called_once_with(evidence_value=query) + mock_sql_injection_report.assert_called_once_with(evidence_value=query, dialect="sqlite") @pytest.mark.skipif(not _is_python_version_supported(), reason="IAST compatible versions") def test_tainted_query_args(self): diff --git a/tests/contrib/django/django_app/appsec_urls.py b/tests/contrib/django/django_app/appsec_urls.py index 461ec6adba4..a4c4f2a60b5 100644 --- a/tests/contrib/django/django_app/appsec_urls.py +++ b/tests/contrib/django/django_app/appsec_urls.py @@ -99,10 +99,10 @@ def sqli_http_request_header_name(request): def sqli_http_request_header_value(request): value = [x for x in request.META.values() if x == "master"][0] - with connection.cursor() as cursor: + query = add_aspect("SELECT 1 FROM sqlite_", value) # label iast_enabled_sqli_http_request_header_value - cursor.execute(add_aspect("SELECT 1 FROM sqlite_", value)) + cursor.execute(query) return HttpResponse(request.META["HTTP_USER_AGENT"], status=200) diff --git a/tests/contrib/django/test_django_appsec_iast.py b/tests/contrib/django/test_django_appsec_iast.py index af9dbd7ba53..98039e94d55 100644 --- a/tests/contrib/django/test_django_appsec_iast.py +++ 
b/tests/contrib/django/test_django_appsec_iast.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- import json -import mock import pytest from ddtrace.appsec._constants import IAST @@ -28,9 +27,9 @@ def reset_context(): from ddtrace.appsec._iast._taint_tracking import create_context from ddtrace.appsec._iast._taint_tracking import reset_context - yield - reset_context() - _ = create_context() + yield + reset_context() + _ = create_context() def _aux_appsec_get_root_span( @@ -66,7 +65,7 @@ def _aux_appsec_get_root_span( @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_weak_hash(client, test_spans, tracer): - with override_global_config(dict(_asm_enabled=True, _iast_enabled=True)), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_asm_enabled=True, _iast_enabled=True, _deduplication_enabled=False)): oce.reconfigure() patch_iast({"weak_hash": True}) root_span, _ = _aux_appsec_get_root_span(client, test_spans, tracer, url="/appsec/weak-hash/") @@ -79,10 +78,7 @@ def test_django_weak_hash(client, test_spans, tracer): @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), override_env({"DD_IAST_ENABLED": "True"}): - oce.reconfigure() - tracer._iast_enabled = True - + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -99,7 +95,7 @@ def test_django_tainted_user_agent_iast_enabled(client, test_spans, tracer): @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_disabled(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), override_env({"DD_IAST_ENABLED": "True"}): + with 
override_global_config(dict(_iast_enabled=False, _deduplication_enabled=False)): oce.reconfigure() root_span, response = _aux_appsec_get_root_span( @@ -121,9 +117,7 @@ def test_django_tainted_user_agent_iast_disabled(client, test_spans, tracer): @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -144,14 +138,20 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter(clie line, hash_value = get_line_and_hash("iast_enabled_sqli_http_request_parameter", vuln_type, filename=TEST_FILE) assert loaded["sources"] == [ - {"origin": "http.request.parameter", "name": "q", "value": "SELECT 1 FROM sqlite_master"} + { + "name": "q", + "origin": "http.request.parameter", + "pattern": "abcdefghijklmnopqrstuvwxyzA", + "redacted": True, + } ] + assert loaded["vulnerabilities"][0]["type"] == vuln_type assert loaded["vulnerabilities"][0]["evidence"] == { "valueParts": [ - {"value": "SELECT ", "source": 0}, - {"redacted": True}, - {"value": " FROM sqlite_master", "source": 0}, + {"source": 0, "value": "SELECT "}, + {"pattern": "h", "redacted": True, "source": 0}, + {"source": 0, "value": " FROM sqlite_master"}, ] } assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE @@ -162,9 +162,7 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter(clie @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def 
test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -175,37 +173,34 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_value(c headers={"HTTP_USER_AGENT": "master"}, ) - vuln_type = "SQL_INJECTION" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert response.status_code == 200 + assert response.content == b"master" - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_header_value", vuln_type, filename=TEST_FILE - ) + loaded = json.loads(root_span.get_tag(IAST.JSON)) assert loaded["sources"] == [{"origin": "http.request.header", "name": "HTTP_USER_AGENT", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION assert loaded["vulnerabilities"][0]["evidence"] == { "valueParts": [ {"value": "SELECT "}, {"redacted": True}, {"value": " FROM sqlite_"}, - {"value": "master", "source": 0}, + {"source": 0, "value": "master"}, ] } + + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_header_value", VULN_SQL_INJECTION, filename=TEST_FILE + ) assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE assert loaded["vulnerabilities"][0]["location"]["line"] == line - - assert response.status_code == 200 - assert response.content == b"master" + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def 
test_django_tainted_user_agent_iast_disabled_sqli_http_request_header_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -225,9 +220,7 @@ def test_django_tainted_user_agent_iast_disabled_sqli_http_request_header_value( @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -238,17 +231,13 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_name(cl headers={"master": "test/1.2.3"}, ) - vuln_type = "SQL_INJECTION" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_header_name", vuln_type, filename=TEST_FILE - ) - assert loaded["sources"] == [{"origin": "http.request.header.name", "name": "master", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION assert loaded["vulnerabilities"][0]["evidence"] == { "valueParts": [ {"value": "SELECT "}, @@ -257,19 +246,19 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_request_header_name(cl {"value": 
"master", "source": 0}, ] } + + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_header_name", VULN_SQL_INJECTION, filename=TEST_FILE + ) assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE assert loaded["vulnerabilities"][0]["location"]["line"] == line - - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_disabled_sqli_http_request_header_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -289,9 +278,7 @@ def test_django_tainted_user_agent_iast_disabled_sqli_http_request_header_name(c @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_iast_enabled_full_sqli_http_path_parameter(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -299,19 +286,15 @@ def test_django_iast_enabled_full_sqli_http_path_parameter(client, test_spans, t url="/appsec/sqli_http_path_parameter/sqlite_master/", headers={"HTTP_USER_AGENT": "test/1.2.3"}, ) - vuln_type = "SQL_INJECTION" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - 
"iast_enabled_full_sqli_http_path_parameter", vuln_type, filename=TEST_FILE - ) - assert loaded["sources"] == [ {"origin": "http.request.path.parameter", "name": "q_http_path_parameter", "value": "sqlite_master"} ] - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION assert loaded["vulnerabilities"][0]["evidence"] == { "valueParts": [ {"value": "SELECT "}, @@ -320,19 +303,18 @@ def test_django_iast_enabled_full_sqli_http_path_parameter(client, test_spans, t {"value": "sqlite_master", "source": 0}, ] } + line, hash_value = get_line_and_hash( + "iast_enabled_full_sqli_http_path_parameter", VULN_SQL_INJECTION, filename=TEST_FILE + ) assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE assert loaded["vulnerabilities"][0]["location"]["line"] == line - - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_iast_disabled_full_sqli_http_path_parameter(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -350,9 +332,7 @@ def test_django_iast_disabled_full_sqli_http_path_parameter(client, test_spans, @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - 
"ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -360,13 +340,11 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_name(client, t url="/appsec/sqli_http_request_cookie_name/", cookies={"master": "test/1.2.3"}, ) + assert response.status_code == 200 + assert response.content == b"test/1.2.3" loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_cookies_name", VULN_SQL_INJECTION, filename=TEST_FILE - ) - vulnerability = False for vuln in loaded["vulnerabilities"]: if vuln["type"] == VULN_SQL_INJECTION: @@ -375,7 +353,6 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_name(client, t assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) assert loaded["sources"] == [{"origin": "http.request.cookie.name", "name": "master", "value": "master"}] - assert vulnerability["hash"] == hash_value assert vulnerability["evidence"] == { "valueParts": [ {"value": "SELECT "}, @@ -384,19 +361,18 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_name(client, t {"value": "master", "source": 0}, ] } + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_cookies_name", VULN_SQL_INJECTION, filename=TEST_FILE + ) assert vulnerability["location"]["path"] == TEST_FILE assert vulnerability["location"]["line"] == line - - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert vulnerability["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_iast_disabled_sqli_http_cookies_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - 
"ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -414,9 +390,7 @@ def test_django_tainted_iast_disabled_sqli_http_cookies_name(client, test_spans, @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=True - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -424,10 +398,10 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_value(client, url="/appsec/sqli_http_request_cookie_value/", cookies={"master": "master"}, ) - vuln_type = "SQL_INJECTION" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert response.status_code == 200 + assert response.content == b"master" - line, hash_value = get_line_and_hash("iast_enabled_sqli_http_cookies_value", vuln_type, filename=TEST_FILE) + loaded = json.loads(root_span.get_tag(IAST.JSON)) vulnerability = False for vuln in loaded["vulnerabilities"]: @@ -437,7 +411,7 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_value(client, assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) assert loaded["sources"] == [{"origin": "http.request.cookie.value", "name": "master", "value": "master"}] assert vulnerability["type"] == "SQL_INJECTION" - assert vulnerability["hash"] == hash_value + assert vulnerability["evidence"] == { "valueParts": [ {"value": "SELECT "}, @@ -446,19 +420,19 @@ def 
test_django_tainted_user_agent_iast_enabled_sqli_http_cookies_value(client, {"value": "master", "source": 0}, ] } + + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_cookies_value", VULN_SQL_INJECTION, filename=TEST_FILE + ) assert vulnerability["location"]["line"] == line assert vulnerability["location"]["path"] == TEST_FILE - - assert response.status_code == 200 - assert response.content == b"master" + assert vulnerability["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_iast_disabled_sqli_http_cookies_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -483,9 +457,7 @@ def test_django_tainted_iast_disabled_sqli_http_cookies_value(client, test_spans @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_body(client, test_spans, tracer, payload, content_type): - with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)), override_env( - dict(DD_IAST_ENABLED="True") - ), mock.patch("ddtrace.contrib.dbapi._is_iast_enabled", return_value=True): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -519,9 +491,7 @@ def test_django_tainted_user_agent_iast_enabled_sqli_http_body(client, test_span @pytest.mark.django_db() @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_tainted_iast_disabled_sqli_http_body(client, test_spans, 
tracer): - with override_global_config(dict(_iast_enabled=False)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=False)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -539,9 +509,7 @@ def test_django_tainted_iast_disabled_sqli_http_body(client, test_spans, tracer) @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_querydict_django_with_iast(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)), mock.patch( - "ddtrace.contrib.dbapi._is_iast_enabled", return_value=False - ), override_env({"DD_IAST_ENABLED": "True"}): + with override_global_config(dict(_iast_enabled=True)): root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -559,9 +527,7 @@ def test_querydict_django_with_iast(client, test_spans, tracer): @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_command_injection(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)), override_env( - dict(DD_IAST_ENABLED="True") - ): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): oce.reconfigure() patch_iast({"command_injection": True}) root_span, _ = _aux_appsec_get_root_span( @@ -591,9 +557,7 @@ def test_django_command_injection(client, test_spans, tracer): @pytest.mark.skipif(not python_supported_by_iast(), reason="Python version not supported by IAST") def test_django_header_injection(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)), override_env( - dict(DD_IAST_ENABLED="True") - ): + with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): oce.reconfigure() patch_iast({"header_injection": True}) root_span, _ = 
_aux_appsec_get_root_span( diff --git a/tests/contrib/flask/test_flask_appsec_iast.py b/tests/contrib/flask/test_flask_appsec_iast.py index 3dbd11a767d..d197d639fd5 100644 --- a/tests/contrib/flask/test_flask_appsec_iast.py +++ b/tests/contrib/flask/test_flask_appsec_iast.py @@ -47,7 +47,6 @@ def setUp(self): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, _deduplication_enabled=False, ) ), override_env(IAST_ENV): @@ -80,7 +79,7 @@ def sqli_1(param_str): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, + _deduplication_enabled=False, ) ): resp = self.client.post("/sqli/sqlite_master/", data={"name": "test"}) @@ -131,7 +130,7 @@ def sqli_2(param_str): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, + _deduplication_enabled=False, ) ): resp = self.client.post( @@ -380,7 +379,6 @@ def sqli_7(): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, _deduplication_enabled=False, ) ), override_env(IAST_ENV): @@ -447,7 +445,7 @@ def sqli_8(): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, + _deduplication_enabled=False, ) ): if tuple(map(int, werkzeug_version.split("."))) >= (2, 3): @@ -508,6 +506,7 @@ def sqli_9(): with override_global_config( dict( _iast_enabled=True, + _deduplication_enabled=False, ) ): resp = self.client.get("/sqli/parameter/?table=sqlite_master") @@ -613,7 +612,7 @@ def header_injection(): with override_global_config( dict( _iast_enabled=True, - _asm_enabled=True, + _deduplication_enabled=False, ) ): resp = self.client.post("/header_injection/", data={"name": "test"})