From 3997adaeaaf6b4bfd88f26684f2faae15f59843f Mon Sep 17 00:00:00 2001 From: phlax Date: Wed, 25 Aug 2021 15:59:10 +0100 Subject: [PATCH] libs: Add envoy.gpg.sign (#33) Signed-off-by: Ryan Northey --- .github/workflows/ci.yml | 2 + envoy.gpg.sign/README.rst | 5 + envoy.gpg.sign/VERSION | 1 + envoy.gpg.sign/envoy/gpg/sign/__init__.py | 18 + envoy.gpg.sign/envoy/gpg/sign/cmd.py | 24 ++ envoy.gpg.sign/envoy/gpg/sign/deb.py | 106 ++++++ envoy.gpg.sign/envoy/gpg/sign/exceptions.py | 4 + envoy.gpg.sign/envoy/gpg/sign/py.typed | 0 envoy.gpg.sign/envoy/gpg/sign/rpm.py | 87 +++++ envoy.gpg.sign/envoy/gpg/sign/rpm_macro.tmpl | 5 + envoy.gpg.sign/envoy/gpg/sign/runner.py | 145 +++++++ envoy.gpg.sign/envoy/gpg/sign/util.py | 84 ++++ envoy.gpg.sign/setup.py | 63 +++ envoy.gpg.sign/tests/test_deb.py | 204 ++++++++++ envoy.gpg.sign/tests/test_rpm.py | 215 +++++++++++ envoy.gpg.sign/tests/test_runner.py | 381 +++++++++++++++++++ envoy.gpg.sign/tests/test_util.py | 213 +++++++++++ 17 files changed, 1557 insertions(+) create mode 100644 envoy.gpg.sign/README.rst create mode 100644 envoy.gpg.sign/VERSION create mode 100644 envoy.gpg.sign/envoy/gpg/sign/__init__.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/cmd.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/deb.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/exceptions.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/py.typed create mode 100644 envoy.gpg.sign/envoy/gpg/sign/rpm.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/rpm_macro.tmpl create mode 100644 envoy.gpg.sign/envoy/gpg/sign/runner.py create mode 100644 envoy.gpg.sign/envoy/gpg/sign/util.py create mode 100644 envoy.gpg.sign/setup.py create mode 100644 envoy.gpg.sign/tests/test_deb.py create mode 100644 envoy.gpg.sign/tests/test_rpm.py create mode 100644 envoy.gpg.sign/tests/test_runner.py create mode 100644 envoy.gpg.sign/tests/test_util.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 188893360..8a978b96c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,6 +21,7 @@ jobs: - envoy.github.abstract - envoy.github.release - envoy.gpg.identity + - envoy.gpg.sign - mypy-abstracts - pytest-patches python-version: @@ -100,6 +101,7 @@ jobs: - envoy.github.abstract - envoy.github.release - envoy.gpg.identity + - envoy.gpg.sign - mypy-abstracts - pytest-patches steps: diff --git a/envoy.gpg.sign/README.rst b/envoy.gpg.sign/README.rst new file mode 100644 index 000000000..848f84f41 --- /dev/null +++ b/envoy.gpg.sign/README.rst @@ -0,0 +1,5 @@ + +envoy.gpg.sign +============== + +GPG signing util used in Envoy proxy's CI diff --git a/envoy.gpg.sign/VERSION b/envoy.gpg.sign/VERSION new file mode 100644 index 000000000..8acdd82b7 --- /dev/null +++ b/envoy.gpg.sign/VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/envoy.gpg.sign/envoy/gpg/sign/__init__.py b/envoy.gpg.sign/envoy/gpg/sign/__init__.py new file mode 100644 index 000000000..ceca75bf7 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/__init__.py @@ -0,0 +1,18 @@ + +from .exceptions import SigningError +from .util import DirectorySigningUtil +from .deb import DebChangesFiles, DebSigningUtil +from .rpm import RPMMacro, RPMSigningUtil +from .runner import PackageSigningRunner +from .cmd import cmd + + +__all__ = ( + "cmd", + "DebChangesFiles", + "DebSigningUtil", + "DirectorySigningUtil", + "PackageSigningRunner", + "RPMMacro", + "RPMSigningUtil", + "SigningError") diff --git a/envoy.gpg.sign/envoy/gpg/sign/cmd.py b/envoy.gpg.sign/envoy/gpg/sign/cmd.py new file mode 100644 index 000000000..14a060278 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/cmd.py @@ -0,0 +1,24 @@ + +import sys + +from .runner import PackageSigningRunner +from .deb import DebSigningUtil +from .rpm import RPMSigningUtil + + +def _register_utils() -> None: + PackageSigningRunner.register_util("deb", DebSigningUtil) + PackageSigningRunner.register_util("rpm", RPMSigningUtil) + + +def main(*args) -> int: + _register_utils() + return PackageSigningRunner(*args).run() + + +def cmd(): + sys.exit(main(*sys.argv[1:])) + + +if __name__ == "__main__": + cmd() diff --git a/envoy.gpg.sign/envoy/gpg/sign/deb.py b/envoy.gpg.sign/envoy/gpg/sign/deb.py new file mode 100644 index 000000000..8e0605cb5 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/deb.py @@ -0,0 +1,106 @@ + +import pathlib +from functools import cached_property +from itertools import chain +from typing import Iterator, Type + +from .exceptions import SigningError +from .util import DirectorySigningUtil + + +class DebChangesFiles(object): + """Creates a set of `changes` files for specific distros from a src + `changes` file. + + eg, if src changes file is `envoy_1.100.changes` and `Distribution:` + field is `buster bullseye`, it creates: + + `envoy_1.100.changes` -> `envoy_1.100.buster.changes` + `envoy_1.100.changes` -> `envoy_1.100.bullseye.changes` + + while replacing any instances of the original distribution name in + the respective changes files, eg: + + `buster bullseye` -> `buster` + `buster bullseye` -> `bullseye` + + finally, it removes the src changes file. + """ + + def __init__(self, src): + self.src = src + + def __iter__(self) -> Iterator[pathlib.Path]: + """Iterate the required changes files, creating them, yielding the paths + of the newly created files, and deleting the original + """ + for path in self.files: + yield path + self.src.unlink() + + @cached_property + def distributions(self) -> str: + """Find and parse the `Distributions` header in the `changes` file""" + with open(self.src) as f: + line = f.readline() + while line: + if not line.startswith("Distribution:"): + line = f.readline() + continue + return line.split(":")[1].strip() + raise SigningError( + f"Did not find Distribution field in changes file {self.src}") + + @property + def files(self) -> Iterator[pathlib.Path]: + """Create changes files for each distro, yielding the paths""" + for distro in self.distributions.split(): + yield self.changes_file(distro) + + def changes_file(self, distro: str) -> pathlib.Path: + """Create a `changes` file for a specific distro""" + target = self.changes_file_path(distro) + target.write_text( + self.src.read_text().replace( + self.distributions, + distro)) + return target + + def changes_file_path(self, distro: str) -> pathlib.Path: + """Path to write the new changes file to""" + return self.src.with_suffix(f".{distro}.changes") + + +class DebSigningUtil(DirectorySigningUtil): + """Sign all `changes` packages in a given directory + + the `.changes` spec allows a single `.changes` file to have multiple + `Distributions` listed. + + but, most package repos require a single signed `.change` file per + distribution, with only one distribution listed. + + this extracts the `.changes` files to -> per-distro + `filename.distro.changes`, and removes the original, before signing the + files. + """ + + command_name = "debsign" + ext = "changes" + _package_type = "deb" + + @cached_property + def command_args(self) -> tuple: + return ("-k", self.maintainer.fingerprint) + + @property + def changes_files(self) -> Type[DebChangesFiles]: + return DebChangesFiles + + @cached_property + def pkg_files(self) -> tuple: + """Mangled .changes paths""" + return tuple( + chain.from_iterable( + self.changes_files(src) + for src in super().pkg_files)) diff --git a/envoy.gpg.sign/envoy/gpg/sign/exceptions.py b/envoy.gpg.sign/envoy/gpg/sign/exceptions.py new file mode 100644 index 000000000..f1d66fef6 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/exceptions.py @@ -0,0 +1,4 @@ + + +class SigningError(Exception): + pass diff --git a/envoy.gpg.sign/envoy/gpg/sign/py.typed b/envoy.gpg.sign/envoy/gpg/sign/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/envoy.gpg.sign/envoy/gpg/sign/rpm.py b/envoy.gpg.sign/envoy/gpg/sign/rpm.py new file mode 100644 index 000000000..f70f9520a --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/rpm.py @@ -0,0 +1,87 @@ + +import pathlib +from functools import cached_property +from typing import Type, Union + +from .exceptions import SigningError +from .util import DirectorySigningUtil + + +class RPMMacro: + """`.rpmmacros` configuration for rpmsign""" + + _macro_filename = ".rpmmacros" + + def __init__( + self, + home: Union[pathlib.Path, str], + overwrite: bool = False, **kwargs): + self._home = home + self.overwrite = bool(overwrite) + self.kwargs = kwargs + + @property + def home(self) -> pathlib.Path: + return pathlib.Path(self._home) + + @property + def path(self) -> pathlib.Path: + return self.home.joinpath(self._macro_filename) + + @property + def macro(self) -> str: + macro = self.template + for k, v in self.kwargs.items(): + macro = macro.replace(f"__{k.upper()}__", str(v)) + return macro + + @property + def template(self) -> str: + return pathlib.Path( + __file__).parent.joinpath( + "rpm_macro.tmpl").read_text() + + def write(self) -> None: + if not self.overwrite and self.path.exists(): + return + self.path.write_text(self.macro) + + +class RPMSigningUtil(DirectorySigningUtil): + """Sign all RPM packages in a given directory""" + + command_name = "rpmsign" + ext = "rpm" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.setup() + + @cached_property + def command(self) -> str: + gpg2_available = ( + self.maintainer.gpg_bin + and self.maintainer.gpg_bin.name == "gpg2") + if not gpg2_available: + raise SigningError("GPG2 is required to sign RPM packages") + return super().command + + @cached_property + def command_args(self) -> tuple: + return ("--key-id", self.maintainer.fingerprint, "--addsign") + + @property + def rpmmacro(self) -> Type[RPMMacro]: + return RPMMacro + + def setup(self) -> None: + """Create the .rpmmacros file if it doesn't exist""" + self.rpmmacro( + self.maintainer.home, + maintainer=self.maintainer.name, + gpg_bin=self.maintainer.gpg_bin, + gpg_config=self.maintainer.gnupg_home).write() + + def sign_pkg(self, pkg_file: pathlib.Path) -> None: + pkg_file.chmod(0o755) + super().sign_pkg(pkg_file) diff --git a/envoy.gpg.sign/envoy/gpg/sign/rpm_macro.tmpl b/envoy.gpg.sign/envoy/gpg/sign/rpm_macro.tmpl new file mode 100644 index 000000000..efe19b205 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/rpm_macro.tmpl @@ -0,0 +1,5 @@ +%_signature gpg +%_gpg_path __GPG_CONFIG__ +%_gpg_name __MAINTAINER__ +%_gpgbin __GPG_BIN__ +%__gpg_sign_cmd %{__gpg} gpg --force-v3-sigs --batch --verbose --no-armor --no-secmem-warning -u "%{_gpg_name}" -sbo %{__signature_filename} --digest-algo sha256 %{__plaintext_filename}' diff --git a/envoy.gpg.sign/envoy/gpg/sign/runner.py b/envoy.gpg.sign/envoy/gpg/sign/runner.py new file mode 100644 index 000000000..70cf2f558 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/runner.py @@ -0,0 +1,145 @@ + +import argparse +import pathlib +import tarfile +from functools import cached_property +from typing import Type, Union + +from envoy.base import runner, utils +from envoy.gpg import identity + +from .exceptions import SigningError +from .util import DirectorySigningUtil + + +class PackageSigningRunner(runner.Runner): + """For a given `package_type` and `path` this will run the relevant signing + util for the packages they contain. + """ + + _signing_utils = () + + @classmethod + def register_util( + cls, + name: str, + util: Type[DirectorySigningUtil]) -> None: + """Register util for signing a package type""" + cls._signing_utils = getattr(cls, "_signing_utils") + ((name, util),) + + @property + def extract(self) -> bool: + return self.args.extract + + @cached_property + def maintainer(self) -> identity.GPGIdentity: + """A representation of the maintainer with GPG capabilities""" + return self.maintainer_class( + self.maintainer_name, + self.maintainer_email, + self.log) + + @property + def maintainer_class(self) -> Type[identity.GPGIdentity]: + return identity.GPGIdentity + + @property + def maintainer_email(self) -> str: + """Email of the maintainer if set""" + return self.args.maintainer_email + + @property + def maintainer_name(self) -> str: + """Name of the maintainer if set""" + return self.args.maintainer_name + + @property + def package_type(self) -> str: + """Package type - eg deb/rpm""" + return self.args.package_type + + @property + def path(self) -> pathlib.Path: + """Path to the packages directory""" + return pathlib.Path(self.args.path) + + @property + def tar(self) -> str: + return self.args.tar + + @cached_property + def signing_utils(self) -> dict: + """Configured signing utils - eg `DebSigningUtil`, `RPMSigningUtil`""" + return dict(getattr(self, "_signing_utils")) + + def add_arguments(self, parser: argparse.ArgumentParser) -> None: + super().add_arguments(parser) + parser.add_argument( + "path", + default="", + help="Path to the directory containing packages to sign") + parser.add_argument( + "--extract", + action="store_true", + help=( + "If set, treat the path as a tarball containing directories " + "according to package_type")) + parser.add_argument( + "--tar", + help="Path to save the signed packages as tar file") + parser.add_argument( + "--type", + default="", + choices=[c for c in self.signing_utils] + [""], + help="Package type to sign") + parser.add_argument( + "--maintainer-name", + default="", + help=( + "Maintainer name to match when searching for a GPG key " + "to match with")) + parser.add_argument( + "--maintainer-email", + default="", + help=( + "Maintainer email to match when searching for a GPG key " + "to match with")) + + def archive(self, path: Union[pathlib.Path, str]) -> None: + with tarfile.open(self.tar, "w") as tar: + tar.add(path, arcname=".") + + def get_signing_util(self, path: pathlib.Path) -> DirectorySigningUtil: + return self.signing_utils[path.name](path, self.maintainer, self.log) + + @runner.catches((identity.GPGError, SigningError)) + def run(self) -> None: + if self.extract: + self.sign_tarball() + else: + self.sign_directory() + self.log.success("Successfully signed packages") + + def sign(self, path: pathlib.Path) -> None: + self.log.notice( + f"Signing {path.name}s ({self.maintainer}) {str(path)}") + self.get_signing_util(path).sign() + + def sign_all(self, path: pathlib.Path) -> None: + for directory in path.glob("*"): + if directory.name in self.signing_utils: + self.sign(directory) + + def sign_directory(self) -> None: + self.sign(self.path) + if self.tar: + self.archive(self.path) + + def sign_tarball(self) -> None: + if not self.tar: + raise SigningError( + "You must set a `--tar` file to save to " + "when `--extract` is set") + with utils.untar(self.path) as tardir: + self.sign_all(tardir) + self.archive(tardir) diff --git a/envoy.gpg.sign/envoy/gpg/sign/util.py b/envoy.gpg.sign/envoy/gpg/sign/util.py new file mode 100644 index 000000000..43f4116f0 --- /dev/null +++ b/envoy.gpg.sign/envoy/gpg/sign/util.py @@ -0,0 +1,84 @@ + +import pathlib +import shutil +import subprocess +from functools import cached_property +from typing import Optional, Tuple, Union + +import verboselogs # type:ignore + +from envoy.gpg import identity + +from .exceptions import SigningError + + +class DirectorySigningUtil: + """Base class for signing utils - eg for deb or rpm packages""" + + command_name = "" + _package_type = "" + ext = "" + + def __init__( + self, + path: Union[pathlib.Path, str], + maintainer: identity.GPGIdentity, + log: verboselogs.VerboseLogger, + command: Optional[str] = ""): + self._path = path + self.maintainer = maintainer + self.log = log + self._command = command + + @cached_property + def command(self) -> str: + """Provided command name/path or path to available system version""" + command = self._command or shutil.which(self.command_name) + if command: + return command + raise SigningError( + "Signing software missing " + f"({self.package_type}): {self.command_name}") + + @property + def command_args(self) -> tuple: + return () + + @property + def package_type(self) -> str: + return self._package_type or self.ext + + @property + def path(self) -> pathlib.Path: + return pathlib.Path(self._path) + + @property + def pkg_files(self) -> Tuple[pathlib.Path, ...]: + """Tuple of paths to package files to sign""" + # TODO?(phlax): check maintainer/packager field matches key id + return tuple( + pkg_file + for pkg_file + in self.path.glob("*") + if pkg_file.name.endswith(f".{self.ext}")) + + def sign(self) -> None: + """Sign the packages""" + for pkg in self.pkg_files: + self.sign_pkg(pkg) + + def sign_command(self, pkg_file: pathlib.Path) -> tuple: + """Tuple of command parts to sign a specific package""" + return (self.command,) + self.command_args + (str(pkg_file),) + + def sign_pkg(self, pkg_file: pathlib.Path) -> None: + """Sign a specific package file""" + self.log.notice(f"Sign package ({self.package_type}): {pkg_file.name}") + response = subprocess.run( + self.sign_command(pkg_file), capture_output=True, encoding="utf-8") + + if response.returncode: + raise SigningError(response.stdout + response.stderr) + + self.log.success( + f"Signed package ({self.package_type}): {pkg_file.name}") diff --git a/envoy.gpg.sign/setup.py b/envoy.gpg.sign/setup.py new file mode 100644 index 000000000..e789438e2 --- /dev/null +++ b/envoy.gpg.sign/setup.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python + +import os +import codecs +from setuptools import find_namespace_packages, setup # type:ignore + + +def read(fname): + file_path = os.path.join(os.path.dirname(__file__), fname) + return codecs.open(file_path, encoding='utf-8').read() + + +setup( + name='envoy.gpg.sign', + version=read("VERSION"), + author='Ryan Northey', + author_email='ryan@synca.io', + maintainer='Ryan Northey', + maintainer_email='ryan@synca.io', + license='Apache Software License 2.0', + url='https://github.com/envoyproxy/pytooling/envoy.gpg.sign', + description="GPG signing util used in Envoy proxy's CI", + long_description=read('README.rst'), + py_modules=['envoy.gpg.sign'], + packages=find_namespace_packages(), + package_data={ + 'envoy.gpg.sign': [ + "rpm_macro.tmpl", + 'py.typed']}, + python_requires='>=3.5', + extras_require={ + "test": [ + "pytest", + "pytest-coverage", + "pytest-patches"], + "lint": ['flake8'], + "types": [ + 'mypy'], + "publish": ['wheel'], + }, + install_requires=[ + "envoy.base.utils", + "envoy.base.runner", + "envoy.gpg.identity", + ], + classifiers=[ + 'Development Status :: 4 - Beta', + 'Framework :: Pytest', + 'Intended Audience :: Developers', + 'Topic :: Software Development :: Testing', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3 :: Only', + 'Programming Language :: Python :: Implementation :: CPython', + 'Operating System :: OS Independent', + 'License :: OSI Approved :: Apache Software License', + ], + entry_points={ + 'console_scripts': ['gpg-sign=envoy.gpg.sign.cmd:cmd'], + } +) diff --git a/envoy.gpg.sign/tests/test_deb.py b/envoy.gpg.sign/tests/test_deb.py new file mode 100644 index 000000000..34499c105 --- /dev/null +++ b/envoy.gpg.sign/tests/test_deb.py @@ -0,0 +1,204 @@ + +import types +from unittest.mock import MagicMock, PropertyMock + +import pytest + +from envoy.gpg import identity, sign + + +# DebChangesFiles + +def test_changes_constructor(): + changes = sign.DebChangesFiles("SRC") + assert changes.src == "SRC" + + +def test_changes_dunder_iter(patches): + path = MagicMock() + changes = sign.DebChangesFiles(path) + + patched = patches( + ("DebChangesFiles.files", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.deb") + _files = ["FILE1", "FILE2", "FILE3"] + + with patched as (m_files, ): + m_files.return_value = _files + result = changes.__iter__() + assert list(result) == _files + + assert isinstance(result, types.GeneratorType) + assert ( + list(path.unlink.call_args) + == [(), {}]) + + +@pytest.mark.parametrize( + "lines", + [([], None), + (["FOO", "BAR"], None), + (["FOO", "BAR", + "Distribution: distro1"], "distro1"), + (["FOO", "BAR", + "Distribution: distro1 distro2"], "distro1 distro2"), + (["FOO", "BAR", + "Distribution: distro1 distro2", "BAZ"], "distro1 distro2"), + (["FOO", "BAR", + "", "Distribution: distro1 distro2"], None)]) +def test_changes_distributions(patches, lines): + lines, expected = lines + changes = sign.DebChangesFiles("SRC") + patched = patches( + "open", + prefix="envoy.gpg.sign.deb") + + class DummyFile(object): + line = 0 + + def __init__(self, lines): + self.lines = lines + + def readline(self): + if len(self.lines) > self.line: + line = self.lines[self.line] + self.line += 1 + return line + + _file = DummyFile(lines) + + with patched as (m_open, ): + readline = m_open.return_value.__enter__.return_value.readline + readline.side_effect = _file.readline + if expected: + assert changes.distributions == expected + else: + with pytest.raises(sign.SigningError) as e: + changes.distributions + assert ( + e.value.args[0] + == "Did not find Distribution field in changes file SRC") + + if "" in lines: + lines = lines[:lines.index("")] + + if expected: + breakon = 0 + for line in lines: + if line.startswith("Distribution:"): + break + breakon += 1 + lines = lines[:breakon] + count = len(lines) + 1 + assert ( + list(list(c) + for c + in readline.call_args_list) + == [[(), {}]] * count) + + +def test_changes_files(patches): + changes = sign.DebChangesFiles("SRC") + + patched = patches( + "DebChangesFiles.changes_file", + ("DebChangesFiles.distributions", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.deb") + + with patched as (m_changes, m_distros): + m_distros.return_value = "DISTRO1 DISTRO2 DISTRO3" + result = changes.files + assert list(result) == [m_changes.return_value] * 3 + + assert isinstance(result, types.GeneratorType) + assert ( + list(list(c) for c in m_changes.call_args_list) + == [[('DISTRO1',), {}], + [('DISTRO2',), {}], + [('DISTRO3',), {}]]) + + +def test_changes_changes_file(patches): + path = MagicMock() + changes = sign.DebChangesFiles(path) + patched = patches( + "DebChangesFiles.changes_file_path", + ("DebChangesFiles.distributions", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.deb") + + with patched as (m_path, m_distros): + assert ( + changes.changes_file("DISTRO") + == m_path.return_value) + + assert ( + list(m_path.call_args) + == [('DISTRO',), {}]) + assert ( + list(m_path.return_value.write_text.call_args) + == [(path.read_text.return_value.replace.return_value,), {}]) + assert ( + list(path.read_text.call_args) + == [(), {}]) + assert ( + list(path.read_text.return_value.replace.call_args) + == [(m_distros.return_value, "DISTRO"), {}]) + + +def test_changes_file_path(): + path = MagicMock() + changes = sign.DebChangesFiles(path) + assert changes.changes_file_path("DISTRO") == path.with_suffix.return_value + assert ( + list(path.with_suffix.call_args) + == [('.DISTRO.changes',), {}]) + + +# DebSigningUtil + +@pytest.mark.parametrize("args", [(), ("ARG1", ), ("ARG2", )]) +def test_debsign_constructor(patches, args): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + debsign = sign.DebSigningUtil("PATH", maintainer, "LOG", *args) + + assert isinstance(debsign, sign.DirectorySigningUtil) + assert debsign.ext == "changes" + assert debsign.command_name == "debsign" + assert debsign._package_type == "deb" + assert debsign.changes_files == sign.DebChangesFiles + assert debsign._path == "PATH" + assert debsign.maintainer == maintainer + assert debsign.log == "LOG" + + +def test_debsign_command_args(patches): + maintainer = MagicMock() + debsign = sign.DebSigningUtil("PATH", maintainer, "LOG") + assert ( + debsign.command_args + == ("-k", maintainer.fingerprint)) + assert "command_args" in debsign.__dict__ + + +def test_debsign_pkg_files(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + debsign = sign.DebSigningUtil("PATH", maintainer, "LOG") + patched = patches( + "chain", + ("DirectorySigningUtil.pkg_files", dict(new_callable=PropertyMock)), + ("DebSigningUtil.changes_files", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.deb") + + with patched as (m_chain, m_pkg, m_changes): + m_pkg.return_value = ("FILE1", "FILE2", "FILE3") + m_chain.from_iterable.side_effect = lambda _iter: list(_iter) + assert ( + debsign.pkg_files + == (m_changes.return_value.return_value, ) * 3) + + assert m_chain.from_iterable.called + assert ( + list(list(c) for c in m_changes.return_value.call_args_list) + == [[('FILE1',), {}], [('FILE2',), {}], [('FILE3',), {}]]) diff --git a/envoy.gpg.sign/tests/test_rpm.py b/envoy.gpg.sign/tests/test_rpm.py new file mode 100644 index 000000000..01c07f3d9 --- /dev/null +++ b/envoy.gpg.sign/tests/test_rpm.py @@ -0,0 +1,215 @@ + +from unittest.mock import MagicMock, PropertyMock + +import pytest + +from envoy.gpg import identity, sign + + +# RPMMacro + +@pytest.mark.parametrize("overwrite", [[], None, True, False]) +@pytest.mark.parametrize("kwargs", [{}, dict(K1="V1", K2="V2")]) +def test_rpmmacro_constructor(patches, overwrite, kwargs): + rpmmacro = ( + sign.RPMMacro("HOME", overwrite=overwrite, **kwargs) + if overwrite != [] + else sign.RPMMacro("HOME", **kwargs)) + assert rpmmacro._macro_filename == ".rpmmacros" + assert rpmmacro._home == "HOME" + assert rpmmacro.overwrite == bool(overwrite or False) + assert rpmmacro.kwargs == kwargs + + +def test_rpmmacro_home(patches): + rpmmacro = sign.RPMMacro("HOME") + patched = patches( + "pathlib", + prefix="envoy.gpg.sign.rpm") + with patched as (m_plib, ): + assert rpmmacro.home == m_plib.Path.return_value + + assert ( + list(m_plib.Path.call_args) + == [(rpmmacro._home,), {}]) + + +def test_rpmmacro_path(patches): + rpmmacro = sign.RPMMacro("HOME") + patched = patches( + ("RPMMacro.home", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.rpm") + with patched as (m_home, ): + assert rpmmacro.path == m_home.return_value.joinpath.return_value + + assert ( + list(m_home.return_value.joinpath.call_args) + == [(rpmmacro._macro_filename, ), {}]) + + +@pytest.mark.parametrize("kwargs", [{}, dict(K1="V1", K2="V2")]) +def test_rpmmacro_macro(patches, kwargs): + rpmmacro = sign.RPMMacro("HOME", **kwargs) + patched = patches( + ("RPMMacro.template", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.rpm") + with patched as (m_template, ): + result = rpmmacro.macro + + expected = m_template.return_value + for k, v in kwargs.items(): + assert ( + list(expected.replace.call_args) + == [(f"__{k.upper()}__", v), {}]) + expected = expected.replace.return_value + + assert result == expected + assert "macro" not in rpmmacro.__dict__ + + +@pytest.mark.parametrize("overwrite", [True, False]) +@pytest.mark.parametrize("exists", [True, False]) +def test_rpmmacro_write(patches, overwrite, exists): + rpmmacro = sign.RPMMacro("HOME") + patched = patches( + ("RPMMacro.macro", dict(new_callable=PropertyMock)), + ("RPMMacro.path", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.rpm") + rpmmacro.overwrite = overwrite + + with patched as (m_macro, m_path): + m_path.return_value.exists.return_value = exists + assert not rpmmacro.write() + + if not overwrite: + assert ( + list(m_path.return_value.exists.call_args) + == [(), {}]) + else: + assert not m_path.return_value.exists.join.called + + if not overwrite and exists: + assert not m_path.return_value.write_text.called + return + + assert ( + list(m_path.return_value.write_text.call_args) + == [(m_macro.return_value,), {}]) + + +# RPMSigningUtil + +@pytest.mark.parametrize("args", [(), ("ARG1", "ARG2")]) +@pytest.mark.parametrize("kwargs", [{}, dict(K1="V1", K2="V2")]) +def test_rpmsign_constructor(patches, args, kwargs): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + patched = patches( + "RPMSigningUtil.setup", + "DirectorySigningUtil.__init__", + prefix="envoy.gpg.sign.rpm") + + with patched as (m_setup, m_super): + rpmsign = sign.RPMSigningUtil("PATH", maintainer, *args, **kwargs) + + assert isinstance(rpmsign, sign.DirectorySigningUtil) + assert rpmsign.ext == "rpm" + assert rpmsign.command_name == "rpmsign" + assert ( + list(m_setup.call_args) + == [(), {}]) + assert ( + list(m_super.call_args) + == [('PATH', maintainer) + args, kwargs]) + assert rpmsign.rpmmacro == sign.RPMMacro + + +@pytest.mark.parametrize("gpg2", [True, False]) +def test_rpmsign_command(patches, gpg2): + maintainer = MagicMock() + patched = patches( + "RPMSigningUtil.__init__", + ("DirectorySigningUtil.command", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.rpm") + + with patched as (m_init, m_super): + maintainer.gpg_bin.name = "gpg2" if gpg2 else "notgpg2" + m_init.return_value = None + rpmsign = sign.RPMSigningUtil("PATH", maintainer, "LOG") + rpmsign.maintainer = maintainer + + if gpg2: + assert rpmsign.command == m_super.return_value + else: + with pytest.raises(sign.SigningError) as e: + rpmsign.command + + assert ( + e.value.args[0] + == 'GPG2 is required to sign RPM packages') + + if gpg2: + assert "command" in rpmsign.__dict__ + else: + assert "command" not in rpmsign.__dict__ + + +def test_rpmsign_command_args(patches): + maintainer = MagicMock() + patched = patches( + "RPMSigningUtil.setup", + prefix="envoy.gpg.sign.rpm") + + with patched as (m_setup,): + rpmsign = sign.RPMSigningUtil("PATH", maintainer, "LOG") + assert ( + rpmsign.command_args + == ("--key-id", maintainer.fingerprint, + "--addsign")) + + assert "command_args" in rpmsign.__dict__ + + +class DummyRPMSigningUtil(sign.RPMSigningUtil): + + def __init__(self, path, maintainer): + self._path = path + self.maintainer = maintainer + + +def test_rpmsign_setup(patches): + maintainer = MagicMock() + rpmsign = DummyRPMSigningUtil("PATH", maintainer) + patched = patches( + ("RPMSigningUtil.rpmmacro", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.rpm") + + with patched as (m_macro, ): + assert not rpmsign.setup() + + assert ( + list(m_macro.return_value.call_args) + == [(maintainer.home,), + {'maintainer': maintainer.name, + 'gpg_bin': maintainer.gpg_bin, + 'gpg_config': maintainer.gnupg_home}]) + + +def test_rpmsign_sign_pkg(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + rpmsign = DummyRPMSigningUtil("PATH", maintainer) + patched = patches( + "DirectorySigningUtil.sign_pkg", + prefix="envoy.gpg.sign.rpm") + file = MagicMock() + + with patched as (m_sign, ): + assert not rpmsign.sign_pkg(file) + + assert ( + list(file.chmod.call_args) + == [(0o755, ), {}]) + assert ( + list(m_sign.call_args) + == [(file,), {}]) diff --git a/envoy.gpg.sign/tests/test_runner.py b/envoy.gpg.sign/tests/test_runner.py new file mode 100644 index 000000000..f7c89bf7b --- /dev/null +++ b/envoy.gpg.sign/tests/test_runner.py @@ -0,0 +1,381 @@ + +from unittest.mock import MagicMock, PropertyMock + +import pytest + +from envoy.base import runner +from envoy.gpg import identity, sign + + +def test_packager_constructor(): + packager = sign.PackageSigningRunner("x", "y", "z") + assert isinstance(packager, runner.Runner) + assert packager.maintainer_class == identity.GPGIdentity + assert packager._signing_utils == () + + +def test_packager_cls_register_util(): + assert sign.PackageSigningRunner._signing_utils == () + + class Util1(object): + pass + + class Util2(object): + pass + + sign.PackageSigningRunner.register_util("util1", Util1) + assert ( + sign.PackageSigningRunner._signing_utils + == (('util1', Util1),)) + + sign.PackageSigningRunner.register_util("util2", Util2) + assert ( + sign.PackageSigningRunner._signing_utils + == (('util1', Util1), + ('util2', Util2),)) + + +def test_packager_extract(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_args, ): + assert packager.extract == m_args.return_value.extract + + assert "extract" not in packager.__dict__ + + +def test_packager_maintainer(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + ("PackageSigningRunner.log", + dict(new_callable=PropertyMock)), + ("PackageSigningRunner.maintainer_class", + dict(new_callable=PropertyMock)), + ("PackageSigningRunner.maintainer_email", + dict(new_callable=PropertyMock)), + ("PackageSigningRunner.maintainer_name", + dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_log, m_class, m_email, m_name): + assert packager.maintainer == m_class.return_value.return_value + + assert ( + list(m_class.return_value.call_args) + == [(m_name.return_value, + m_email.return_value, + m_log.return_value), {}]) + + assert "maintainer" in packager.__dict__ + + +def test_packager_maintainer_email(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_args, ): + assert ( + packager.maintainer_email + == m_args.return_value.maintainer_email) + + assert "maintainer_email" not in packager.__dict__ + + +def test_packager_maintainer_name(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + + patched = patches( + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_args, ): + assert packager.maintainer_name == m_args.return_value.maintainer_name + + assert "maintainer_name" not in packager.__dict__ + + +def test_packager_package_type(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + + patched = patches( + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_args, ): + assert packager.package_type == m_args.return_value.package_type + + assert "package_type" not in packager.__dict__ + + +def test_packager_path(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "pathlib", + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_plib, m_args): + assert packager.path == m_plib.Path.return_value + + assert ( + list(m_plib.Path.call_args) + == [(m_args.return_value.path, ), {}]) + assert "path" not in packager.__dict__ + + +def test_packager_tar(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + ("PackageSigningRunner.args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_args, ): + assert packager.tar == m_args.return_value.tar + + assert "tar" not in packager.__dict__ + + +def test_packager_signing_utils(): + packager = sign.PackageSigningRunner("x", "y", "z") + _utils = (("NAME1", "UTIL1"), ("NAME2", "UTIL2")) + packager._signing_utils = _utils + assert packager.signing_utils == dict(_utils) + + +def test_packager_add_arguments(): + packager = sign.PackageSigningRunner("x", "y", "z") + parser = MagicMock() + packager.add_arguments(parser) + assert ( + list(list(c) for c in parser.add_argument.call_args_list) + == [[('--log-level', '-l'), + {'choices': ['debug', 'info', 'warn', 'error'], + 'default': 'info', + 'help': 'Log level to display'}], + [('path',), + {'default': '', + 'help': 'Path to the directory containing packages to sign'}], + [('--extract',), + {'action': 'store_true', + 'help': ( + 'If set, treat the path as a tarball containing directories ' + 'according to package_type')}], + [('--tar',), + {'help': 'Path to save the signed packages as tar file'}], + [('--type',), + {'choices': ['util1', 'util2', ''], + 'default': '', + 'help': 'Package type to sign'}], + [('--maintainer-name',), + {'default': '', + 'help': ( + 'Maintainer name to match when searching for a GPG key ' + 'to match with')}], + [('--maintainer-email',), + {'default': '', + 'help': ( + 'Maintainer email to match when searching for a GPG key ' + 'to match with')}]]) + + +def test_packager_archive(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "tarfile", + ("PackageSigningRunner.tar", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_tarfile, m_tar): + assert not packager.archive("PATH") + + assert ( + list(m_tarfile.open.call_args) + == [(m_tar.return_value, 'w'), {}]) + assert ( + list(m_tarfile.open.return_value.__enter__.return_value.add.call_args) + == [('PATH',), {'arcname': '.'}]) + + +def test_packager_get_signing_util(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + ("PackageSigningRunner.log", + dict(new_callable=PropertyMock)), + ("PackageSigningRunner.maintainer", + dict(new_callable=PropertyMock)), + ("PackageSigningRunner.signing_utils", + dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + path = MagicMock() + + with patched as (m_log, m_maintainer, m_utils): + assert ( + packager.get_signing_util(path) + == m_utils.return_value.__getitem__.return_value.return_value) + + assert ( + list(m_utils.return_value.__getitem__.call_args) + == [(path.name,), {}]) + assert ( + list(m_utils.return_value.__getitem__.return_value.call_args) + == [(path, m_maintainer.return_value, m_log.return_value), {}]) + + +@pytest.mark.parametrize("extract", [True, False]) +def test_packager_run(patches, extract): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "PackageSigningRunner.sign_tarball", + "PackageSigningRunner.sign_directory", + ("PackageSigningRunner.extract", dict(new_callable=PropertyMock)), + ("PackageSigningRunner.log", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + assert ( + packager.run.__wrapped__.__catches__ + == (identity.GPGError, sign.SigningError)) + + with patched as (m_tarb, m_dir, m_extract, m_log): + m_extract.return_value = extract + assert not packager.run() + + assert ( + list(m_log.return_value.success.call_args) + == [('Successfully signed packages',), {}]) + + if extract: + assert ( + list(m_tarb.call_args) + == [(), {}]) + assert not m_dir.called + return + assert not m_tarb.called + assert ( + list(m_dir.call_args) + == [(), {}]) + + +def test_packager_sign(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "PackageSigningRunner.get_signing_util", + ("PackageSigningRunner.log", dict(new_callable=PropertyMock)), + ("PackageSigningRunner.maintainer", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + path = MagicMock() + + with patched as (m_util, m_log, m_maintainer): + assert not packager.sign(path) + + assert ( + list(m_log.return_value.notice.call_args) + == [((f"Signing {path.name}s ({m_maintainer.return_value}) " + f"{path}"),), {}]) + assert ( + list(m_util.call_args) + == [(path, ), {}]) + assert ( + list(m_util.return_value.sign.call_args) + == [(), {}]) + + +@pytest.mark.parametrize("utils", [[], ["a", "b", "c"]]) +@pytest.mark.parametrize("listdir", [[], ["a", "b"], ["b", "c"], ["c", "d"]]) +def test_packager_sign_all(patches, listdir, utils): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "PackageSigningRunner.sign", + ("PackageSigningRunner.signing_utils", + dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + path = MagicMock() + + with patched as (m_sign, m_utils): + _glob = {} + + for _path in listdir: + _mock = MagicMock() + _mock.name = _path + _glob[_path] = _mock + path.glob.return_value = _glob.values() + m_utils.return_value = utils + assert not packager.sign_all(path) + + assert ( + list(path.glob.call_args) + == [('*',), {}]) + expected = [x for x in listdir if x in utils] + assert ( + list(list(c) for c in m_sign.call_args_list) + == [[(_glob[k], ), {}] for k in expected]) + + +@pytest.mark.parametrize("tar", [True, False]) +def test_packager_sign_directory(patches, tar): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "PackageSigningRunner.archive", + "PackageSigningRunner.sign", + ("PackageSigningRunner.path", dict(new_callable=PropertyMock)), + ("PackageSigningRunner.tar", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_archive, m_sign, m_path, m_tar): + m_tar.return_value = tar + assert not packager.sign_directory() + + assert ( + list(m_sign.call_args) + == [(m_path.return_value, ), {}]) + if not tar: + assert not m_archive.called + return + + assert ( + list(m_archive.call_args) + == [(m_path.return_value, ), {}]) + + +@pytest.mark.parametrize("tar", [True, False]) +def test_packager_sign_tarball(patches, tar): + packager = sign.PackageSigningRunner("x", "y", "z") + patched = patches( + "utils", + "PackageSigningRunner.archive", + "PackageSigningRunner.sign_all", + ("PackageSigningRunner.path", dict(new_callable=PropertyMock)), + ("PackageSigningRunner.tar", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.runner") + + with patched as (m_utils, m_archive, m_sign, m_path, m_tar): + m_tar.return_value = tar + if not tar: + with pytest.raises(sign.SigningError) as e: + packager.sign_tarball() + else: + assert not packager.sign_tarball() + + if not tar: + assert ( + e.value.args[0] + == ("You must set a `--tar` file to save to when " + "`--extract` is set")) + assert not m_utils.untar.called + assert not m_sign.called + assert not m_archive.called + return + + assert ( + list(m_utils.untar.call_args) + == [(m_path.return_value,), {}]) + assert ( + list(m_sign.call_args) + == [(m_utils.untar.return_value.__enter__.return_value,), {}]) + assert ( + list(m_archive.call_args) + == [(m_utils.untar.return_value.__enter__.return_value,), {}]) diff --git a/envoy.gpg.sign/tests/test_util.py b/envoy.gpg.sign/tests/test_util.py new file mode 100644 index 000000000..13e8c5243 --- /dev/null +++ b/envoy.gpg.sign/tests/test_util.py @@ -0,0 +1,213 @@ + +from unittest.mock import MagicMock, PropertyMock + +import pytest + +from envoy.gpg import identity, sign + + +@pytest.mark.parametrize("command", ["", None, "COMMAND", "OTHERCOMMAND"]) +def test_util_constructor(command): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + args = ("PATH", maintainer, "LOG") + if command is not None: + args += (command, ) + util = sign.DirectorySigningUtil(*args) + assert util._path == "PATH" + assert util.maintainer == maintainer + assert util.log == "LOG" + assert util._command == (command or "") + assert util.command_args == () + + +@pytest.mark.parametrize("command_name", ["", None, "CMD", "OTHERCMD"]) +@pytest.mark.parametrize("command", ["", None, "COMMAND", "OTHERCOMMAND"]) +@pytest.mark.parametrize("which", ["", None, "PATH", "OTHERPATH"]) +def test_util_command(patches, command_name, command, which): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil( + "PATH", maintainer, "LOG", command=command) + patched = patches( + "shutil", + ("DirectorySigningUtil.package_type", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.util") + if command_name is not None: + util.command_name = command_name + + with patched as (m_shutil, m_type): + m_shutil.which.return_value = which + + if not which and not command: + with pytest.raises(sign.SigningError) as e: + util.command + + assert ( + list(m_shutil.which.call_args) + == [(command_name or "",), {}]) + assert ( + e.value.args[0] + == (f"Signing software missing ({m_type.return_value}): " + f"{command_name or ''}")) + return + + result = util.command + + assert "command" in util.__dict__ + assert not m_type.called + + if command: + assert not m_shutil.which.called + assert result == command + return + + assert ( + list(m_shutil.which.call_args) + == [(command_name or "",), {}]) + assert result == m_shutil.which.return_value + + +def test_util_sign(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + patched = patches( + "DirectorySigningUtil.sign_pkg", + ("DirectorySigningUtil.pkg_files", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.util") + + with patched as (m_sign, m_pkgs): + m_pkgs.return_value = ("PKG1", "PKG2", "PKG3") + assert not util.sign() + + assert ( + list(list(c) for c in m_sign.call_args_list) + == [[('PKG1',), {}], + [('PKG2',), {}], + [('PKG3',), {}]]) + + +def test_util_sign_command(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + patched = patches( + ("DirectorySigningUtil.command", dict(new_callable=PropertyMock)), + ("DirectorySigningUtil.command_args", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.util") + + with patched as (m_command, m_args): + m_args.return_value = ("ARG1", "ARG2", "ARG3") + assert ( + util.sign_command("PACKAGE") + == ((m_command.return_value, ) + + m_args.return_value + ("PACKAGE", ))) + + +@pytest.mark.parametrize("returncode", [0, 1]) +def test_util_sign_pkg(patches, returncode): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + util.log = MagicMock() + pkg_file = MagicMock() + patched = patches( + "subprocess", + "DirectorySigningUtil.sign_command", + ("DirectorySigningUtil.package_type", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.util") + + with patched as (m_subproc, m_command, m_type): + m_subproc.run.return_value.returncode = returncode + if returncode: + with pytest.raises(sign.SigningError) as e: + util.sign_pkg(pkg_file) + else: + assert not util.sign_pkg(pkg_file) + + assert ( + list(util.log.notice.call_args) + == [(f"Sign package ({m_type.return_value}): {pkg_file.name}",), {}]) + assert ( + list(m_command.call_args) + == [(pkg_file,), {}]) + assert ( + list(m_subproc.run.call_args) + == [(m_command.return_value,), + {'capture_output': True, + 'encoding': 'utf-8'}]) + + if not returncode: + assert ( + list(util.log.success.call_args) + == [((f"Signed package ({m_type.return_value}): " + f"{pkg_file.name}"),), {}]) + return + assert ( + e.value.args[0] + == (m_subproc.run.return_value.stdout + + m_subproc.run.return_value.stderr)) + + +@pytest.mark.parametrize("ext", ["EXT1", "EXT2"]) +@pytest.mark.parametrize("package_type", [None, "", "TYPE1", "TYPE2"]) +def test_util_package_type(ext, package_type): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + util.ext = ext + util._package_type = package_type + assert util.package_type == package_type or ext + + +def test_util_path(patches): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + patched = patches( + "pathlib", + prefix="envoy.gpg.sign.util") + with patched as (m_plib, ): + assert util.path == m_plib.Path.return_value + + assert ( + list(m_plib.Path.call_args) + == [(util._path,), {}]) + + +@pytest.mark.parametrize( + "files", + [[], + ["abc", "xyz"], + ["abc.EXT", "xyz.EXT", "abc.FOO", "abc.BAR"], + ["abc.NOTEXT", "xyz.NOTEXT"]]) +def test_util_pkg_files(patches, files): + packager = sign.PackageSigningRunner("x", "y", "z") + maintainer = identity.GPGIdentity(packager) + util = sign.DirectorySigningUtil("PATH", maintainer, "LOG") + patched = patches( + ("DirectorySigningUtil.ext", dict(new_callable=PropertyMock)), + ("DirectorySigningUtil.path", dict(new_callable=PropertyMock)), + prefix="envoy.gpg.sign.util") + with patched as (m_ext, m_path): + _glob = {} + + for _path in files: + _mock = MagicMock() + _mock.name = _path + _glob[_path] = _mock + m_path.return_value.glob.return_value = _glob.values() + + m_ext.return_value = "EXT" + result = util.pkg_files + + expected = [fname for fname in files if fname.endswith(".EXT")] + + assert ( + list(m_path.return_value.glob.call_args) + == [("*",), {}]) + assert "pkg_files" not in util.__dict__ + assert ( + result + == tuple(_glob[k] for k in expected))