Skip to content

Commit

Permalink
fix: make Scannable.is_longer_than() compare to the correct size
Browse files Browse the repository at this point in the history
`Scannable.is_longer_than()` currently compares the size passed to it to
the decoded size, but what we actually want to compare it to is the
*UTF-8 encoded size*.

To implement this without reading the content multiple times,
`Scannable` implementations now store the UTF-8 encoded size, and
`Scannable` helper methods (`_is_file_longer_than()` and
`_decode_bytes()`) return the UTF-8 encoded size in addition to their
current return value.

When the content they work on is already UTF-8, the helper methods try
to avoid reading it until necessary.
  • Loading branch information
agateau-gg committed Jul 12, 2023
1 parent e631154 commit d822636
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Fixed

- `ggshield secret scan` no longer tries to scan files that are longer than the maximum document size (#561).
24 changes: 13 additions & 11 deletions ggshield/scan/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, path: str):
super().__init__()
self._path = Path(path)
self._content: Optional[str] = None
self._utf8_encoded_size: Optional[int] = None

@property
def url(self) -> str:
Expand All @@ -30,25 +31,26 @@ def filename(self) -> str:
def path(self) -> Path:
return self._path

def is_longer_than(self, size: int) -> bool:
if self._content:
# We already have the content, easy
return len(self._content) > size

if self.path.stat().st_size < size:
# Shortcut: if the byte size is smaller than `size`, we can be sure the
# decoded size will be smaller
return False
def is_longer_than(self, max_utf8_encoded_size: int) -> bool:
    """Return True if the UTF-8 encoded content is longer than
    `max_utf8_encoded_size` bytes, reading the file lazily via the shared
    `Scannable._is_file_longer_than()` helper."""
    cached_size = self._utf8_encoded_size
    if cached_size is not None:
        # Encoded size already known from a previous call: no file access needed.
        return cached_size > max_utf8_encoded_size

    # The helper answers the size question and, when it had to read the whole
    # file anyway, also hands back the decoded content and the encoded size so
    # we can cache them.
    with self.path.open("rb") as fp:
        is_too_long, content, encoded_size = Scannable._is_file_longer_than(
            fp, max_utf8_encoded_size
        )
    self._content = content
    self._utf8_encoded_size = encoded_size
    return is_too_long

@property
def content(self) -> str:
if self._content is None:
with self.path.open("rb") as f:
self._content = Scannable._decode_bytes(f.read())
self._content, self._utf8_encoded_size = Scannable._decode_bytes(
f.read()
)
return self._content


Expand Down
87 changes: 59 additions & 28 deletions ggshield/scan/scannable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import urllib.parse
from abc import ABC, abstractmethod
from io import SEEK_END, SEEK_SET
from pathlib import Path
from typing import BinaryIO, Callable, List, Optional, Tuple

Expand All @@ -14,6 +15,12 @@
logger = logging.getLogger(__name__)


# Our worst-case encoding (UTF-32) takes 4 bytes to encode a character that UTF-8 would
# encode in only 1. So if a file's byte size is more than
# max_utf8_encoded_size * UTF8_TO_WORSE_OTHER_ENCODING_RATIO, its UTF-8 encoded size is
# guaranteed to exceed the limit: no need to look into it, it's too big.
UTF8_TO_WORSE_OTHER_ENCODING_RATIO = 4


class DecodeError(Exception):
"""
Raised when a Scannable cannot determine the encoding of its content.
Expand Down Expand Up @@ -52,8 +59,9 @@ def path(self) -> Path:
raise NotImplementedError

@abstractmethod
def is_longer_than(self, size: int) -> bool:
"""Return true if the length of the *decoded* content is greater than `size`.
def is_longer_than(self, max_utf8_encoded_size: int) -> bool:
"""Return true if the length of the *utf-8 encoded* content is greater than
`max_utf8_encoded_size`.
When possible, implementations must try to answer this without reading all
content.
Raise `DecodeError` if the content cannot be decoded.
Expand All @@ -72,10 +80,12 @@ def __repr__(self) -> str:
@staticmethod
def _decode_bytes(
raw_document: bytes, charset_match: Optional[CharsetMatch] = None
) -> str:
) -> Tuple[str, int]:
"""Low level helper function to decode bytes using `charset_match`. If
`charset_match` is not provided, tries to determine it itself.
Returns a tuple of (decoded_content, utf8_encoded_size).
Raises DecodeError if the document cannot be decoded."""
if charset_match is None:
charset_match = charset_normalizer.from_bytes(raw_document).best()
Expand All @@ -89,42 +99,60 @@ def _decode_bytes(
codecs.BOM_UTF8
):
raw_document = raw_document[len(codecs.BOM_UTF8) :]
return raw_document.decode(charset_match.encoding, errors="replace")
content = raw_document.decode(charset_match.encoding, errors="replace")

if charset_match.encoding in {"utf_8", "ascii"}:
# The document is already in UTF-8, no need to encode it as UTF-8 to
# determine UTF-8 encoded size.
utf8_encoded_size = len(raw_document)
else:
utf8_encoded_size = len(content.encode(errors="replace"))

return content, utf8_encoded_size

@staticmethod
def _is_file_longer_than(fp: BinaryIO, size: int) -> Tuple[bool, Optional[str]]:
def _is_file_longer_than(
fp: BinaryIO, max_utf8_encoded_size: int
) -> Tuple[bool, Optional[str], Optional[int]]:
"""Helper function to implement is_longer_than() for file-based Scannable classes.
Returns a tuple of:
- True if file is longer than `size`, False otherwise
- The decoded content as a string, if it has been fully read, None otherwise
- The decoded content as a string if the file has been fully read, None otherwise
- The utf8-encoded size if we know it, None otherwise
Raises DecodeError if the file cannot be decoded.
"""
byte_content = b""
str_content = ""

# Get the byte size
assert fp.seekable()
byte_size = fp.seek(0, SEEK_END)

if byte_size > max_utf8_encoded_size * UTF8_TO_WORSE_OTHER_ENCODING_RATIO:
# Even if the file used the worst encoding (UTF-32), encoding the content of
# this file as UTF-8 would produce a file longer than
# `max_utf8_encoded_size`, so bail out
return True, None, None

# Determine the encoding
fp.seek(0, SEEK_SET)
charset_matches = charset_normalizer.from_fp(fp)
charset_match = charset_matches.best()
if charset_match is None:
raise DecodeError
fp.seek(0)
while True:
# Try to read more than the requested size:
# - If the file is smaller, that changes nothing
# - if the file is bigger, we potentially avoid a second read
byte_chunk = fp.read(size * 2)
if byte_chunk:
byte_content += byte_chunk
# Note: we decode `byte_content` and not `byte_chunk`: we can't
# decode just the chunk because we have no way to know if it starts
# and ends at complete code-point boundaries
str_content = Scannable._decode_bytes(byte_content, charset_match)
if len(str_content) > size:
return True, None
else:
# We read the whole file, keep it
return False, str_content

if charset_match.encoding in {"utf_8", "ascii"}:
# Shortcut: the content is already in UTF-8 (or ASCII, which is a subset of
# utf-8), no need to decode anything
return byte_size > max_utf8_encoded_size, None, byte_size

# We can't know if the file is longer without reading its content, do it now
fp.seek(0, SEEK_SET)
content, utf8_encoded_size = Scannable._decode_bytes(fp.read(), charset_match)
if utf8_encoded_size > max_utf8_encoded_size:
return True, None, utf8_encoded_size
else:
# We read the whole file, keep it
return False, content, utf8_encoded_size


class StringScannable(Scannable):
Expand All @@ -135,6 +163,7 @@ def __init__(self, url: str, content: str, filemode: Filemode = Filemode.FILE):
self._url = url
self._path: Optional[Path] = None
self._content = content
self._utf8_encoded_size = None

@property
def url(self) -> str:
Expand All @@ -151,8 +180,10 @@ def path(self) -> Path:
self._path = Path(result.path)
return self._path

def is_longer_than(self, size: int) -> bool:
return len(self._content) > size
def is_longer_than(self, max_utf8_encoded_size: int) -> bool:
    """Return True if the UTF-8 encoded content is longer than
    `max_utf8_encoded_size` bytes."""
    # The content is already in memory as a str; compute and memoize its
    # UTF-8 encoded byte length on first use.
    if self._utf8_encoded_size is None:
        encoded = self._content.encode(errors="replace")
        self._utf8_encoded_size = len(encoded)
    return self._utf8_encoded_size > max_utf8_encoded_size

@property
def content(self) -> str:
Expand Down
27 changes: 14 additions & 13 deletions ggshield/secret/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
self._tar_file = tar_file
self._tar_info = tar_info
self._content: Optional[str] = None
self._utf8_encoded_size: Optional[int] = None

@property
def url(self) -> str:
Expand All @@ -78,23 +79,22 @@ def filename(self) -> str:
def path(self) -> Path:
return Path("/", self._tar_info.name)

def is_longer_than(self, size: int) -> bool:
if self._content:
# We already have the content, easy
return len(self._content) > size

if self._tar_info.size < size:
# Shortcut: if the byte size is smaller than `size`, we can be sure the
# decoded size will be smaller
return False
def is_longer_than(self, max_utf8_encoded_size: int) -> bool:
if self._utf8_encoded_size is not None:
# We already have the encoded size, easy
return self._utf8_encoded_size > max_utf8_encoded_size

# We need to decode at least the beginning of the file to determine if it's
# small enough
fp = self._tar_file.extractfile(self._tar_info)
assert fp is not None
with fp:
result, self._content = Scannable._is_file_longer_than(
fp, size # type:ignore
(
result,
self._content,
self._utf8_encoded_size,
) = Scannable._is_file_longer_than(
fp, max_utf8_encoded_size # type:ignore
)
# mypy complains that fp is IO[bytes] but _is_file_longer_than() expects
# BinaryIO. They are compatible, ignore the error.
Expand All @@ -106,7 +106,9 @@ def content(self) -> str:
file = self._tar_file.extractfile(self._tar_info)
assert file is not None
byte_content = file.read()
self._content = Scannable._decode_bytes(byte_content)
self._content, self._utf8_encoded_size = Scannable._decode_bytes(
byte_content
)
return self._content


Expand Down Expand Up @@ -329,7 +331,6 @@ def docker_scan_archive(
scan_context: ScanContext,
ignored_detectors: Optional[Set[str]] = None,
) -> SecretScanCollection:

scanner = SecretScanner(
client=client,
cache=cache,
Expand Down
Loading

0 comments on commit d822636

Please sign in to comment.