diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 620ee1b8..a2f7b7b2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,7 +4,8 @@ Changelog *unreleased* ~~~~~~~~~~~~ -No unreleased changes. +* Added the ``packaging.wheelfile`` module for reading and creating wheel files + (:issue:`697`) 24.1 - 2024-06-10 ~~~~~~~~~~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index e658ec08..594dd9ba 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -29,6 +29,7 @@ The ``packaging`` library uses calendar-based versioning (``YY.N``). requirements metadata tags + wheelfile utils .. toctree:: diff --git a/docs/wheelfile.rst b/docs/wheelfile.rst new file mode 100644 index 00000000..f9d1cd84 --- /dev/null +++ b/docs/wheelfile.rst @@ -0,0 +1,4 @@ +Wheel Files +=========== + +.. currentmodule:: packaging.wheelfile diff --git a/src/packaging/utils.py b/src/packaging/utils.py index d33da5bb..72cf90fd 100644 --- a/src/packaging/utils.py +++ b/src/packaging/utils.py @@ -5,6 +5,7 @@ from __future__ import annotations import re +from collections.abc import Collection from typing import NewType, Tuple, Union, cast from .tags import Tag, parse_tag @@ -40,6 +41,7 @@ class InvalidSdistFilename(ValueError): _normalized_regex = re.compile(r"^([a-z0-9]|[a-z0-9]([a-z0-9-](?!--))*[a-z0-9])$") # PEP 427: The build number must start with a digit. _build_tag_regex = re.compile(r"(\d+)(.*)") +_dist_name_re = re.compile(r"[^a-z0-9.]+", re.IGNORECASE) def canonicalize_name(name: str, *, validate: bool = False) -> NormalizedName: @@ -102,6 +104,27 @@ def canonicalize_version( return "".join(parts) +def make_wheel_filename( + name: str, + version: str | Version, + tags: Collection[Tag], + *, + build_tag: BuildTag | None = None, +) -> str: + if not tags: + raise ValueError("At least one tag is required") + + name = canonicalize_name(name).replace("-", "_").lower() + filename = f"{name}-{version}" + if build_tag: + filename = f"{filename}-{build_tag[0]}{build_tag[1]}" + + interpreter_tags = ".".join(tag.interpreter for tag in tags) + abi_tags = ".".join(tag.abi for tag in tags) + platform_tags = ".".join(tag.platform for tag in tags) + return f"{filename}-{interpreter_tags}-{abi_tags}-{platform_tags}.whl" + + def parse_wheel_filename( filename: str, ) -> tuple[NormalizedName, Version, BuildTag, frozenset[Tag]]: diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py new file mode 100644 index 00000000..224c80cc --- /dev/null +++ b/src/packaging/wheelfile.py @@ -0,0 +1,553 @@ +from __future__ import annotations + +__all__ = [ + "WheelMetadata", + "WheelRecordEntry", + "WheelContentElement", + "WheelError", + "WheelArchiveFile", + "WheelReader", + "write_wheelfile", + "WheelWriter", +] + +import csv +import hashlib +import os.path +import stat +import time +from base64 import urlsafe_b64decode, urlsafe_b64encode +from collections import OrderedDict +from collections.abc import Iterable, Iterator +from contextlib import ExitStack +from datetime import datetime, timezone +from email.message import Message +from email.policy import EmailPolicy +from io import BytesIO, StringIO, UnsupportedOperation +from os import PathLike +from pathlib import Path, PurePath +from types import TracebackType +from typing import IO, NamedTuple +from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile, ZipInfo + +from .tags import Tag +from .utils import ( + BuildTag, + InvalidWheelFilename, + NormalizedName, + parse_wheel_filename, +) +from .version import Version + +_exclude_filenames = ("RECORD", "RECORD.jws", "RECORD.p7s") 
+_default_timestamp = datetime(1980, 1, 1, tzinfo=timezone.utc) +_email_policy = EmailPolicy(max_line_length=0, mangle_from_=False, utf8=True) + + +class WheelMetadata(NamedTuple): + name: NormalizedName + version: Version + build_tag: BuildTag + tags: frozenset[Tag] + + @classmethod + def from_filename(cls, fname: str) -> WheelMetadata: + name, version, build, tags = parse_wheel_filename(fname) + return cls(name, version, build, tags) + + +class WheelRecordEntry(NamedTuple): + hash_algorithm: str + hash_value: bytes + filesize: int + + +class WheelContentElement(NamedTuple): + path: PurePath + hash_value: bytes + size: int + stream: IO[bytes] + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self.path)!r}, size={self.size!r})" + + +def _encode_hash_value(hash_value: bytes) -> str: + return urlsafe_b64encode(hash_value).rstrip(b"=").decode("ascii") + + +def _decode_hash_value(encoded_hash: str) -> bytes: + pad = b"=" * (4 - (len(encoded_hash) & 3)) + return urlsafe_b64decode(encoded_hash.encode("ascii") + pad) + + +class WheelError(Exception): + pass + + +class WheelArchiveFile: + def __init__( + self, fp: IO[bytes], arcname: str, record_entry: WheelRecordEntry | None + ): + self._fp = fp + self._arcname = arcname + self._record_entry = record_entry + if record_entry: + self._hash = hashlib.new(record_entry.hash_algorithm) + self._num_bytes_read = 0 + + def read(self, amount: int = -1) -> bytes: + data = self._fp.read(amount) + if self._record_entry is None: + return data + + if data: + self._hash.update(data) + self._num_bytes_read += len(data) + + if amount < 0 or len(data) < amount: + # The file has been read in full – check that hash and file size match + # with the entry in RECORD + if self._num_bytes_read != self._record_entry.filesize: + raise WheelError( + f"{self._arcname}: file size mismatch: " + f"{self._record_entry.filesize} bytes in RECORD, " + f"{self._num_bytes_read} bytes in archive" + ) + elif self._hash.digest() != self._record_entry.hash_value: + raise WheelError( + f"{self._arcname}: hash mismatch: " + f"{self._record_entry.hash_value.hex()} in RECORD, " + f"{self._hash.hexdigest()} in archive" + ) + + return data + + def __enter__(self) -> WheelArchiveFile: + return self + + def __exit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + self._fp.close() + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._arcname!r})" + + +class WheelReader: + name: NormalizedName + version: Version + _zip: ZipFile + _dist_info_dir: str + _data_dir: str + _record_entries: OrderedDict[str, WheelRecordEntry] + + def __init__(self, path_or_fd: str | PathLike[str] | IO[bytes]): + self.path_or_fd = path_or_fd + + if isinstance(path_or_fd, (str, PathLike)): + fname = Path(path_or_fd).name + try: + self.name, self.version = parse_wheel_filename(fname)[:2] + except InvalidWheelFilename as exc: + raise WheelError(str(exc)) from None + + def __enter__(self) -> WheelReader: + self._zip = ZipFile(self.path_or_fd, "r") + + # See if the expected .dist-info directory is in place by searching for RECORD + # in the expected directory. Wheels made with older versions of "wheel" did not + # properly normalize the names, so the name of the .dist-info directory does not + # match the expectation there. 
+        dist_info_dir: str | None = None
+        if hasattr(self, "name"):
+            dist_info_dir = f"{self.name}-{self.version}.dist-info"
+            try:
+                self._zip.getinfo(f"{dist_info_dir}/RECORD")
+            except KeyError:
+                dist_info_dir = None
+            else:
+                self._dist_info_dir = dist_info_dir
+                self._data_dir = f"{self.name}-{self.version}.data"
+
+        # If the .dist-info directory could not be found yet, resort to scanning the
+        # archive's file names for any .dist-info directory containing a RECORD file.
+        if dist_info_dir is None:
+            try:
+                for zinfo in reversed(self._zip.infolist()):
+                    if zinfo.filename.endswith(".dist-info/RECORD"):
+                        dist_info_dir = zinfo.filename.rsplit("/", 1)[0]
+                        namever = dist_info_dir.rsplit(".", 1)[0]
+                        name, version = namever.rpartition("-")[::2]
+                        if name and version:
+                            self.name = NormalizedName(name)
+                            self.version = Version(version)
+                            self._dist_info_dir = dist_info_dir
+                            self._data_dir = dist_info_dir.replace(
+                                ".dist-info", ".data"
+                            )
+                            break
+                else:
+                    raise WheelError(
+                        "Cannot find a valid .dist-info directory. "
+                        "Is this really a wheel file?"
+                    )
+            except BaseException:
+                self._zip.close()
+                raise
+
+        self._record_entries = self._read_record()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException],
+        exc_val: BaseException,
+        exc_tb: TracebackType,
+    ) -> None:
+        self._zip.close()
+        self._record_entries.clear()
+        del self._zip
+
+    def _read_record(self) -> OrderedDict[str, WheelRecordEntry]:
+        entries = OrderedDict()
+        try:
+            contents = self.read_dist_info("RECORD")
+        except WheelError:
+            raise WheelError(f"Missing {self._dist_info_dir}/RECORD file") from None
+
+        reader = csv.reader(
+            contents.strip().split("\n"),
+            delimiter=",",
+            quotechar='"',
+            lineterminator="\n",
+        )
+        for row in reader:
+            if not row:
+                break
+
+            path, hash_digest, filesize = row
+            if hash_digest:
+                algorithm, hash_digest = hash_digest.split("=")
+                try:
+                    hashlib.new(algorithm)
+                except ValueError:
+                    raise WheelError(
+                        f"Unsupported hash algorithm: {algorithm}"
+                    ) from None
+
+                if algorithm.lower() in {"md5", "sha1"}:
+                    raise WheelError(
+                        f"Weak hash algorithm ({algorithm}) is not permitted by PEP 427"
+                    )
+
+                entries[path] = WheelRecordEntry(
+                    algorithm, _decode_hash_value(hash_digest), int(filesize)
+                )
+
+        return entries
+
+    @property
+    def dist_info_dir(self) -> str:
+        return self._dist_info_dir
+
+    @property
+    def data_dir(self) -> str:
+        return self._data_dir
+
+    @property
+    def dist_info_filenames(self) -> list[PurePath]:
+        return [
+            PurePath(fname)
+            for fname in self._zip.namelist()
+            if fname.startswith(self._dist_info_dir)
+        ]
+
+    @property
+    def filenames(self) -> list[PurePath]:
+        return [PurePath(fname) for fname in self._zip.namelist()]
+
+    def read_dist_info(self, filename: str) -> str:
+        filename = self.dist_info_dir + "/" + filename
+        try:
+            contents = self._zip.read(filename)
+        except KeyError:
+            raise WheelError(f"File {filename!r} not found") from None
+
+        return contents.decode("utf-8")
+
+    def iterate_contents(self) -> Iterator[WheelContentElement]:
+        for fname, entry in self._record_entries.items():
+            with self._zip.open(fname, "r") as stream:
+                yield WheelContentElement(
+                    PurePath(fname), entry.hash_value, entry.filesize, stream
+                )
+
+    def validate_record(self) -> None:
+        """Verify the integrity of the contained files."""
+        for zinfo in self._zip.infolist():
+            # Ignore signature files
+            basename = os.path.basename(zinfo.filename)
+            if basename in _exclude_filenames:
+                continue
+
+            with self.open(zinfo.filename) as fp:
+                while True:
+                    if not fp.read(65536):
+ break + + def extractall(self, base_path: str | PathLike[str]) -> None: + basedir = Path(base_path) + if not basedir.exists(): + raise WheelError(f"{basedir} does not exist") + elif not basedir.is_dir(): + raise WheelError(f"{basedir} is not a directory") + + for fname in self._zip.namelist(): + target_path = basedir.joinpath(fname) + target_path.parent.mkdir(0o755, True, True) + with self.open(fname) as infile, target_path.open("wb") as outfile: + while True: + data = infile.read(65536) + if not data: + break + + outfile.write(data) + + def open(self, archive_name: str) -> WheelArchiveFile: + basename = os.path.basename(archive_name) + if basename in _exclude_filenames: + record_entry = None + else: + try: + record_entry = self._record_entries[archive_name] + except KeyError: + raise WheelError(f"No hash found for file {archive_name!r}") from None + + return WheelArchiveFile( + self._zip.open(archive_name), archive_name, record_entry + ) + + def read_file(self, archive_name: str) -> bytes: + with self.open(archive_name) as fp: + return fp.read() + + def read_data_file(self, filename: str) -> bytes: + archive_path = self._data_dir + "/" + filename.strip("/") + return self.read_file(archive_path) + + def read_distinfo_file(self, filename: str) -> bytes: + archive_path = self._dist_info_dir + "/" + filename.strip("/") + return self.read_file(archive_path) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.path_or_fd})" + + +def write_wheelfile( + fp: IO[bytes], /, *, generator: str, metadata: WheelMetadata, root_is_purelib: bool +) -> None: + msg = Message(policy=_email_policy) + msg["Wheel-Version"] = "1.0" # of the spec + msg["Generator"] = generator + msg["Root-Is-Purelib"] = str(root_is_purelib).lower() + if metadata.build_tag: + msg["Build"] = str(metadata.build_tag[0]) + metadata.build_tag[1] + + for tag in sorted(metadata.tags, key=lambda t: (t.interpreter, t.abi, t.platform)): + msg["Tag"] = f"{tag.interpreter}-{tag.abi}-{tag.platform}" + + fp.write(msg.as_bytes()) + + +class WheelWriter: + def __init__( + self, + path_or_fd: str | PathLike[str] | IO[bytes], + /, + *, + generator: str, + metadata: WheelMetadata | None = None, + root_is_purelib: bool = True, + compress: bool = True, + hash_algorithm: str = "sha256", + ): + self.path_or_fd = path_or_fd + self.generator = generator + self.root_is_purelib = root_is_purelib + self.hash_algorithm = hash_algorithm + self._compress_type = ZIP_DEFLATED if compress else ZIP_STORED + + if metadata: + self.metadata = metadata + elif isinstance(path_or_fd, (str, PathLike)): + filename = Path(path_or_fd).name + self.metadata = WheelMetadata.from_filename(filename) + else: + raise WheelError("path_or_fd is not a path, and metadata was not provided") + + if hash_algorithm not in hashlib.algorithms_available: + raise ValueError(f"Hash algorithm {hash_algorithm!r} is not available") + elif hash_algorithm in ("md5", "sha1"): + raise ValueError( + f"Weak hash algorithm ({hash_algorithm}) is not permitted by PEP 427" + ) + + self._dist_info_dir = f"{self.metadata.name}-{self.metadata.version}.dist-info" + self._data_dir = f"{self.metadata.name}-{self.metadata.version}.data" + self._record_path = f"{self._dist_info_dir}/RECORD" + self._record_entries: dict[str, WheelRecordEntry] = OrderedDict() + + def __enter__(self) -> WheelWriter: + self._zip = ZipFile(self.path_or_fd, "w", compression=self._compress_type) + return self + + def __exit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, 
+ ) -> None: + try: + if not exc_type: + if f"{self._dist_info_dir}/WHEEL" not in self._record_entries: + self._write_wheelfile() + + self._write_record() + finally: + self._zip.close() + + def _write_record(self) -> None: + data = StringIO() + writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") + writer.writerows( + [ + ( + fname, + entry.hash_algorithm + "=" + _encode_hash_value(entry.hash_value), + entry.filesize, + ) + for fname, entry in self._record_entries.items() + ] + ) + writer.writerow((self._record_path, "", "")) + self.write_distinfo_file("RECORD", data.getvalue()) + + def _write_wheelfile(self) -> None: + buffer = BytesIO() + write_wheelfile( + buffer, + generator=self.generator, + metadata=self.metadata, + root_is_purelib=self.root_is_purelib, + ) + self.write_distinfo_file("WHEEL", buffer.getvalue()) + + def write_metadata(self, items: Iterable[tuple[str, str]]) -> None: + msg = Message(policy=_email_policy) + for key, value in items: + key = key.title() + if key == "Description": + msg.set_payload(value.encode("utf-8")) + else: + msg.add_header(key, value) + + if "Metadata-Version" not in msg: + msg["Metadata-Version"] = "2.3" + if "Name" not in msg: + msg["Name"] = self.metadata.name + if "Version" not in msg: + msg["Version"] = str(self.metadata.version) + + self.write_distinfo_file("METADATA", msg.as_bytes()) + + def write_file( + self, + name: str | PurePath, + contents: bytes | str | PathLike[str] | IO[bytes], + *, + timestamp: datetime = _default_timestamp, + ) -> None: + arcname = PurePath(name).as_posix() + gmtime = time.gmtime(timestamp.timestamp()) + zinfo = ZipInfo(arcname, gmtime[:6]) + zinfo.compress_type = self._compress_type + zinfo.external_attr = 0o664 << 16 + with ExitStack() as exit_stack: + fp = exit_stack.enter_context(self._zip.open(zinfo, "w")) + if isinstance(contents, str): + contents = contents.encode("utf-8") + elif isinstance(contents, PathLike): + contents = exit_stack.enter_context(Path(contents).open("rb")) + + if isinstance(contents, bytes): + file_size = len(contents) + fp.write(contents) + hash_ = hashlib.new(self.hash_algorithm, contents) + else: + try: + st = os.stat(contents.fileno()) + except (AttributeError, UnsupportedOperation): + pass + else: + zinfo.external_attr = ( + stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode) + ) << 16 + + hash_ = hashlib.new(self.hash_algorithm) + while True: + buffer = contents.read(65536) + if not buffer: + file_size = contents.tell() + break + + hash_.update(buffer) + fp.write(buffer) + + self._record_entries[arcname] = WheelRecordEntry( + self.hash_algorithm, hash_.digest(), file_size + ) + + def write_files_from_directory(self, directory: str | PathLike[str]) -> None: + basedir = Path(directory) + if not basedir.exists(): + raise WheelError(f"{basedir} does not exist") + elif not basedir.is_dir(): + raise WheelError(f"{basedir} is not a directory") + + for root, _dirs, files in os.walk(basedir): + for fname in files: + path = Path(root) / fname + relative = path.relative_to(basedir) + if relative.as_posix() != self._record_path: + self.write_file(relative, path) + + def write_data_file( + self, + filename: str, + contents: bytes | str | PathLike[str] | IO[bytes], + *, + timestamp: datetime = _default_timestamp, + ) -> None: + archive_path = self._data_dir + "/" + filename.strip("/") + self.write_file(archive_path, contents, timestamp=timestamp) + + def write_distinfo_file( + self, + filename: str, + contents: bytes | str | IO[bytes], + *, + timestamp: datetime = 
_default_timestamp, + ) -> None: + archive_path = self._dist_info_dir + "/" + filename.strip() + self.write_file(archive_path, contents, timestamp=timestamp) + + def __repr__(self) -> str: + return ( + f"{self.__class__.__name__}({self.path_or_fd}, " + f"generator={self.generator!r})" + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 87c86eef..733494e3 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,17 +1,20 @@ # This file is dual licensed under the terms of the Apache License, Version # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. +from __future__ import annotations import pytest from packaging.tags import Tag from packaging.utils import ( + BuildTag, InvalidName, InvalidSdistFilename, InvalidWheelFilename, canonicalize_name, canonicalize_version, is_normalized_name, + make_wheel_filename, parse_sdist_filename, parse_wheel_filename, ) @@ -92,6 +95,56 @@ def test_canonicalize_version_no_strip_trailing_zero(version): assert canonicalize_version(version, strip_trailing_zero=False) == version +@pytest.mark.parametrize( + ("expected_filename", "name", "version", "build", "tags"), + [ + pytest.param( + "foo-1.0-py3-none-any.whl", + "foo", + Version("1.0"), + (), + {Tag("py3", "none", "any")}, + id="simple", + ), + pytest.param( + "some_pack_age-1.0-py3-none-any.whl", + "some-PACK.AGE", + Version("1.0"), + (), + {Tag("py3", "none", "any")}, + id="normalizename", + ), + pytest.param( + "foo-1.0-1000-py3-none-any.whl", + "foo", + Version("1.0"), + (1000, ""), + {Tag("py3", "none", "any")}, + id="numericbuildtag", + ), + pytest.param( + "foo-1.0-1000abc-py3-none-any.whl", + "foo", + Version("1.0"), + (1000, "abc"), + {Tag("py3", "none", "any")}, + id="complexbuildtag", + ), + ], +) +def test_make_wheel_filename( + expected_filename: str, name: str, version: Version, build: BuildTag, tags: set[Tag] +) -> None: + assert ( + make_wheel_filename(name, version, tags, build_tag=build) == expected_filename + ) + + +def test_make_wheel_filename_no_tags() -> None: + with pytest.raises(ValueError, match="At least one tag is required"): + make_wheel_filename("foo", "1.0", []) + + @pytest.mark.parametrize( ("filename", "name", "version", "build", "tags"), [ diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py new file mode 100644 index 00000000..4b1b7389 --- /dev/null +++ b/tests/test_wheelfile.py @@ -0,0 +1,495 @@ +from __future__ import annotations + +import os.path +import sys +from io import BytesIO +from pathlib import Path, PurePath +from textwrap import dedent +from zipfile import ZIP_DEFLATED, ZipFile + +import pytest +from pytest import MonkeyPatch, TempPathFactory + +from packaging.utils import InvalidWheelFilename +from packaging.wheelfile import WheelError, WheelReader, WheelWriter + + +@pytest.fixture +def wheel_path(tmp_path: Path) -> Path: + return tmp_path / "test-1.0-py2.py3-none-any.whl" + + +class TestWheelReader: + @pytest.fixture(scope="class") + def valid_wheel(self, tmp_path_factory: TempPathFactory) -> Path: + path = tmp_path_factory.mktemp("reader") / "test-1.0-py2.py3-none-any.whl" + with ZipFile(path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + return path + + def test_properties(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as reader: + assert reader.dist_info_dir == "test-1.0.dist-info" + assert reader.data_dir 
== "test-1.0.data" + assert reader.dist_info_filenames == [PurePath("test-1.0.dist-info/RECORD")] + + def test_bad_wheel_filename(self) -> None: + with pytest.raises(WheelError, match="Invalid wheel filename"): + WheelReader("badname") + + def test_str_filename(self, valid_wheel: Path) -> None: + reader = WheelReader(str(valid_wheel)) + assert reader.path_or_fd == str(valid_wheel) + + def test_pathlike_filename(self, valid_wheel: Path) -> None: + class Foo: + def __fspath__(self) -> str: + return str(valid_wheel) + + foo = Foo() + with WheelReader(foo) as reader: + assert reader.path_or_fd is foo + + def test_pass_open_file(self, valid_wheel: Path) -> None: + with valid_wheel.open("rb") as fp, WheelReader(fp) as reader: + assert reader.path_or_fd is fp + + def test_missing_record(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + + with pytest.raises( + WheelError, + match=( + r"^Cannot find a valid .dist-info directory. Is this really a wheel " + r"file\?$" + ), + ): + with WheelReader(wheel_path): + pass + + def test_unsupported_hash_algorithm(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha000=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with pytest.raises(WheelError, match="^Unsupported hash algorithm: sha000$"): + with WheelReader(wheel_path): + pass + + @pytest.mark.parametrize( + "algorithm, digest", + [ + pytest.param("md5", "4J-scNa2qvSgy07rS4at-Q", id="md5"), + pytest.param("sha1", "QjCnGu5Qucb6-vir1a6BVptvOA4", id="sha1"), + ], + ) + def test_weak_hash_algorithm( + self, wheel_path: Path, algorithm: str, digest: str + ) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with pytest.raises( + WheelError, + match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", + ): + with WheelReader(wheel_path): + pass + + @pytest.mark.parametrize( + "algorithm, digest", + [ + ("sha256", "bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo"), + ( + "sha384", + "cDXriAy_7i02kBeDkN0m2RIDz85w6pwuHkt2PZ4VmT2PQc1TZs8Ebvf6eKDFcD_S", + ), + ( + "sha512", + "kdX9CQlwNt4FfOpOKO_X0pn_v1opQuksE40SrWtMyP1NqooWVWpzCE3myZTfpy8g2azZON_" + "iLNpWVxTwuDWqBQ", + ), + ], + ids=["sha256", "sha384", "sha512"], + ) + def test_validate_record( + self, wheel_path: Path, algorithm: str, digest: str + ) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with WheelReader(wheel_path) as wf: + wf.validate_record() + + def test_validate_record_missing_hash(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", "") + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match("^No hash found for file 'hello/héllö.py'$") + + def test_validate_record_bad_hash(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + 
"hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match( + "hello/héllö.py: hash mismatch: " + "6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570a in " + "RECORD, " + "1eac82375d38fdb8a4c653c6c2b3c363058d5c193cf24bafcd1df040d344597e in " + "archive$" + ) + + def test_unnormalized_wheel(self, tmp_path: Path) -> None: + # Previous versions of "wheel" did not correctly normalize the names; test that + # we can still read such wheels + wheel_path = tmp_path / "Test_foo_bar-1.0.0-py3-none-any.whl" + with ZipFile(wheel_path, "w") as zf: + zf.writestr( + "Test_foo_bar-1.0.0.dist-info/RECORD", + "Test_foo_bar-1.0.0.dist-info/RECORD,,\n", + ) + + with WheelReader(wheel_path): + pass + + def test_read_file(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + contents = wf.read_file("hello/héllö.py") + + assert contents == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + + @pytest.mark.parametrize( + "amount", + [ + pytest.param(-1, id="oneshot"), + pytest.param(2, id="gradual"), + ], + ) + def test_read_file_bad_hash(self, wheel_path: Path, amount: int) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with pytest.raises( + WheelError, + match=( + "^hello/héllö.py: hash mismatch: " + "6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570a in " + "RECORD, " + "1eac82375d38fdb8a4c653c6c2b3c363058d5c193cf24bafcd1df040d344597e in " + "archive$" + ), + ), WheelReader(wheel_path) as wf, wf.open("hello/héllö.py") as f: + assert repr(f) == "WheelArchiveFile('hello/héllö.py')" + while f.read(amount): + pass + + @pytest.mark.parametrize( + "amount", + [ + pytest.param(-1, id="oneshot"), + pytest.param(2, id="gradual"), + ], + ) + def test_read_file_bad_size(self, wheel_path: Path, amount: int) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,24", + ) + + with pytest.raises( + WheelError, + match=( + "^hello/héllö.py: file size mismatch: 24 bytes in RECORD, 25 bytes in " + "archive$" + ), + ), WheelReader(wheel_path) as wf, wf.open("hello/héllö.py") as f: + while f.read(amount): + pass + + def test_read_data_file(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("test-1.0.data/héllö.py", 'print("Héllö, world!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "test-1.0.data/héllö.py," + "sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with WheelReader(wheel_path) as wf: + contents = wf.read_data_file("héllö.py") + + assert contents == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + + def test_read_distinfo_file(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + contents = wf.read_distinfo_file("RECORD") + + assert ( + contents == b"hello/h\xc3\xa9ll\xc3\xb6.py," + b"sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25" + ) + + def test_iterate_contents(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + for element in wf.iterate_contents(): + assert element.path == PurePath("hello", "héllö.py") + assert element.size == 25 + assert ( + element.hash_value.hex() + == 
"6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570" + "a" + ) + assert ( + element.stream.read() == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + ) + assert repr(element) == "WheelContentElement('hello/héllö.py', size=25)" + + def test_extractall( + self, valid_wheel: Path, tmp_path_factory: TempPathFactory + ) -> None: + dest_dir = tmp_path_factory.mktemp("wheel_contents") + with WheelReader(valid_wheel) as wf: + wf.extractall(dest_dir) + + iterator = os.walk(dest_dir) + dirpath, dirnames, filenames = next(iterator) + assert dirnames == ["hello", "test-1.0.dist-info"] + assert not filenames + + dirpath, dirnames, filenames = next(iterator) + assert dirpath.endswith("hello") + assert filenames == ["héllö.py"] + assert ( + Path(dirpath).joinpath(filenames[0]).read_text() + == 'print("Héllö, world!")\n' + ) + + dirpath, dirnames, filenames = next(iterator) + assert dirpath.endswith("test-1.0.dist-info") + assert filenames == ["RECORD"] + assert Path(dirpath).joinpath(filenames[0]).read_text() == ( + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25" + ) + + +class TestWheelWriter: + @pytest.mark.parametrize( + "filename, reason", + [ + pytest.param("test.whl", "wrong number of parts"), + pytest.param("test-1.0.whl", "wrong number of parts"), + pytest.param("test-1.0-py2.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none-any", "extension must be '.whl'"), + pytest.param( + "test-1.0-py 2-none-any.whl", + "bad file name", + marks=[ + pytest.mark.xfail( + reason="parse_wheel_filename() does not fail this yet" + ) + ], + ), + ], + ) + def test_bad_wheel_filename(self, filename: str, reason: str) -> None: + basename = ( + os.path.splitext(filename)[0] if filename.endswith(".whl") else filename + ) + with pytest.raises( + InvalidWheelFilename, + match=rf"^Invalid wheel filename \({reason}\): {basename}$", + ): + WheelWriter(filename, generator="foo") + + def test_unavailable_hash_algorithm(self, wheel_path: Path) -> None: + with pytest.raises( + ValueError, + match=r"^Hash algorithm 'sha000' is not available$", + ): + WheelWriter(wheel_path, generator="generator 1.0", hash_algorithm="sha000") + + @pytest.mark.parametrize( + "algorithm", + [ + pytest.param("md5"), + pytest.param("sha1"), + ], + ) + def test_weak_hash_algorithm(self, wheel_path: Path, algorithm: str) -> None: + with pytest.raises( + ValueError, + match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", + ): + WheelWriter(wheel_path, generator="generator 1.0", hash_algorithm=algorithm) + + def test_write_files(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_file("hello/héllö.py", 'print("Héllö, world!")\n') + wf.write_file("hello/h,ll,.py", 'print("Héllö, world!")\n') + wf.write_data_file("mydata.txt", "Dummy") + wf.write_distinfo_file("LICENSE.txt", "License text") + + with ZipFile(wheel_path, "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 6 + assert infolist[0].filename == "hello/héllö.py" + assert infolist[0].file_size == 25 + assert infolist[1].filename == "hello/h,ll,.py" + assert infolist[1].file_size == 25 + assert infolist[2].filename == "test-1.0.data/mydata.txt" + assert infolist[2].file_size == 5 + assert infolist[3].filename == "test-1.0.dist-info/LICENSE.txt" + assert infolist[4].filename == "test-1.0.dist-info/WHEEL" + assert infolist[5].filename == "test-1.0.dist-info/RECORD" + + record = 
zf.read("test-1.0.dist-info/RECORD") + assert record.decode("utf-8") == ( + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n" + '"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,' + "25\n" + "test-1.0.data/mydata.txt," + "sha256=0mB6s81UJCwa14-jUFK6fIqv1PR4FQPyJ0wxBjqF9WA,5\n" + "test-1.0.dist-info/LICENSE.txt," + "sha256=Bk_bWStYk3YYSmcUeZRgnr3cqIs1oJW485Zb_XBvOgM,12\n" + "test-1.0.dist-info/WHEEL," + "sha256=KzXSdMADLwiK8h1P5UAQ76v3nVuO2ZRU8e9GCHCC6Qs,103\n" + "test-1.0.dist-info/RECORD,,\n" + ) + + def test_write_metadata(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_metadata( + [ + ("Foo", "Bar"), + ("Description", "Long description\nspanning\nthree rows"), + ] + ) + + with ZipFile(wheel_path, "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 3 + assert infolist[0].filename == "test-1.0.dist-info/METADATA" + assert infolist[1].filename == "test-1.0.dist-info/WHEEL" + assert infolist[2].filename == "test-1.0.dist-info/RECORD" + + metadata = zf.read("test-1.0.dist-info/METADATA") + assert metadata.decode("utf-8") == dedent( + """\ + Foo: Bar + Metadata-Version: 2.3 + Name: test + Version: 1.0 + + Long description + spanning + three rows""" + ) + + def test_timestamp( + self, + tmp_path_factory: TempPathFactory, + wheel_path: Path, + monkeypatch: MonkeyPatch, + ) -> None: + # An environment variable can be used to influence the timestamp on + # TarInfo objects inside the zip. See issue #143. + build_dir = tmp_path_factory.mktemp("build") + for filename in ("one", "two", "three"): + build_dir.joinpath(filename).write_text(filename + "\n") + + # The earliest date representable in TarInfos, 1980-01-01 + monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060") + + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for info in zf.infolist(): + assert info.date_time == (1980, 1, 1, 0, 0, 0) + assert info.compress_type == ZIP_DEFLATED + + @pytest.mark.skipif( + sys.platform == "win32", reason="Windows does not support UNIX-like permissions" + ) + def test_attributes( + self, tmp_path_factory: TempPathFactory, wheel_path: Path + ) -> None: + # With the change from ZipFile.write() to .writestr(), we need to manually + # set member attributes. 
+ build_dir = tmp_path_factory.mktemp("build") + files = (("foo", 0o644), ("bar", 0o755)) + for filename, mode in files: + path = build_dir / filename + path.write_text(filename + "\n") + path.chmod(mode) + + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for filename, mode in files: + info = zf.getinfo(filename) + assert info.external_attr == (mode | 0o100000) << 16 + assert info.compress_type == ZIP_DEFLATED + + info = zf.getinfo("test-1.0.dist-info/RECORD") + permissions = (info.external_attr >> 16) & 0o777 + assert permissions == 0o664 + + def test_write_file_from_bytesio(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + buffer = BytesIO(b"test content") + wf.write_file("test", buffer) + + with ZipFile(wheel_path, "r") as zf: + assert zf.open("test", "r").read() == b"test content" + + def test_write_files_from_dir_source_nonexistent( + self, wheel_path: Path, tmp_path: Path + ) -> None: + source_dir = tmp_path / "nonexistent" + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + with pytest.raises(WheelError, match=f"{source_dir} does not exist"): + wf.write_files_from_directory(source_dir) + + def test_write_files_from_dir_source_not_dir( + self, wheel_path: Path, tmp_path: Path + ) -> None: + source_dir = tmp_path / "file" + source_dir.touch() + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + with pytest.raises(WheelError, match=f"{source_dir} is not a directory"): + wf.write_files_from_directory(source_dir) + + def test_repr(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + assert repr(wf) == f"WheelWriter({wheel_path}, generator='generator 1.0')"
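
A minimal usage sketch of the API added above, based only on the names this diff
introduces (``make_wheel_filename``, ``WheelWriter``, ``WheelReader``); the project
name, generator string, and file contents are illustrative, not part of the patch::

    from pathlib import Path
    from tempfile import TemporaryDirectory

    from packaging.tags import Tag
    from packaging.utils import make_wheel_filename
    from packaging.wheelfile import WheelReader, WheelWriter

    with TemporaryDirectory() as tmp:
        # Compose a PEP 427 file name for the (illustrative) "demo" project.
        filename = make_wheel_filename("demo", "1.0", [Tag("py3", "none", "any")])
        wheel_path = Path(tmp) / filename  # demo-1.0-py3-none-any.whl

        # Write a trivial wheel; WHEEL and RECORD are generated on context exit.
        with WheelWriter(wheel_path, generator="demo-builder 1.0") as writer:
            writer.write_file("demo/__init__.py", 'print("hello")\n')
            writer.write_metadata([("Summary", "Demonstration package")])

        # Read it back, check the RECORD hashes, and inspect the payload.
        with WheelReader(wheel_path) as reader:
            reader.validate_record()
            print(reader.dist_info_dir)                   # demo-1.0.dist-info
            print(reader.read_file("demo/__init__.py"))   # b'print("hello")\n'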