diff --git a/securesystemslib/gpg/common.py b/securesystemslib/gpg/common.py index ffd6d67b..e1236f9d 100644 --- a/securesystemslib/gpg/common.py +++ b/securesystemslib/gpg/common.py @@ -635,7 +635,7 @@ def get_pubkey_bundle(data, keyid): ): if public_key and public_key["keyid"].endswith(keyid.lower()): if idx > 1: - log.warning( + log.debug( "Exporting master key '{}' including subkeys '{}' for" # pylint: disable=logging-format-interpolation,consider-using-f-string " passed keyid '{}'.".format( master_public_key["keyid"], diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py index d5c816d1..37032e34 100644 --- a/securesystemslib/signer/__init__.py +++ b/securesystemslib/signer/__init__.py @@ -5,7 +5,7 @@ Some implementations are provided by default but more can be added by users. """ from securesystemslib.signer._gcp_signer import GCPSigner -from securesystemslib.signer._gpg_signer import GPGSigner +from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner from securesystemslib.signer._hsm_signer import HSMSigner from securesystemslib.signer._key import KEY_FOR_TYPE_AND_SCHEME, Key, SSlibKey from securesystemslib.signer._signature import Signature @@ -23,6 +23,7 @@ SSlibSigner.FILE_URI_SCHEME: SSlibSigner, GCPSigner.SCHEME: GCPSigner, HSMSigner.SCHEME: HSMSigner, + GPGSigner.SCHEME: GPGSigner, } ) @@ -47,5 +48,8 @@ ("rsa", "rsa-pkcs1v15-sha384"): SSlibKey, ("rsa", "rsa-pkcs1v15-sha512"): SSlibKey, ("sphincs", "sphincs-shake-128s"): SSlibKey, + ("rsa", "pgp+rsa-pkcsv1.5"): GPGKey, + ("dsa", "pgp+dsa-fips-180-2"): GPGKey, + ("eddsa", "pgp+eddsa-ed25519"): GPGKey, } ) diff --git a/securesystemslib/signer/_gpg_signer.py b/securesystemslib/signer/_gpg_signer.py index 6d48b0ee..5498609d 100644 --- a/securesystemslib/signer/_gpg_signer.py +++ b/securesystemslib/signer/_gpg_signer.py @@ -1,32 +1,107 @@ """Signer implementation for OpenPGP """ -from typing import Dict, Optional -import securesystemslib.gpg.functions as gpg +import logging +from typing import Any, Dict, Optional, Tuple +from urllib import parse + +from securesystemslib import exceptions, formats +from securesystemslib.gpg import exceptions as gpg_exceptions +from securesystemslib.gpg import functions as gpg from securesystemslib.signer._key import Key from securesystemslib.signer._signer import SecretsHandler, Signature, Signer +logger = logging.getLogger(__name__) + + +class GPGKey(Key): + """OpenPGP Key. + + *All parameters named below are not just constructor arguments but also + instance attributes.* + + Attributes: + keyid: Key identifier that is unique within the metadata it is used in. + It is also used to identify the GnuPG local user signing key. + ketytype: Key type, e.g. "rsa", "dsa" or "eddsa". + scheme: Signing schemes, e.g. "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2", + "pgp+eddsa-ed25519". + keyval: Opaque key content. + unrecognized_fields: Dictionary of all attributes that are not managed + by Securesystemslib + """ + + @classmethod + def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey": + keytype = key_dict.pop("keytype") + scheme = key_dict.pop("scheme") + keyval = key_dict.pop("keyval") + + return cls(keyid, keytype, scheme, keyval, key_dict) + + def to_dict(self) -> Dict: + return { + "keytype": self.keytype, + "scheme": self.scheme, + "keyval": self.keyval, + **self.unrecognized_fields, + } + + def verify_signature(self, signature: Signature, data: bytes) -> None: + try: + if not gpg.verify_signature( + GPGSigner._sig_to_legacy_dict( # pylint: disable=protected-access + signature + ), + GPGSigner._key_to_legacy_dict( # pylint: disable=protected-access + self + ), + data, + ): + raise exceptions.UnverifiedSignatureError( + f"Failed to verify signature by {self.keyid}" + ) + except ( + exceptions.FormatError, + exceptions.UnsupportedLibraryError, + ) as e: + logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) + raise exceptions.VerificationError( + f"Unknown failure to verify signature by {self.keyid}" + ) from e + class GPGSigner(Signer): """OpenPGP Signer - Runs command in ``GNUPG`` environment variable to sign, fallback commands are + Runs command in ``GNUPG`` environment variable to sign. Fallback commands are ``gpg2`` and ``gpg``. Supported signing schemes are: "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2" and "pgp+eddsa-ed25519", with SHA-256 hashing. + GPGSigner can be instantiated with Signer.from_priv_key_uri(). These private key URI + schemes are supported: + + * "gnupg:[]": + Signs with GnuPG key in keyring in home dir. The signing key is + identified with the keyid of the passed public key. If homedir is not + passed, the default homedir is used. Arguments: - keyid: GnuPG local user signing key id. If not passed, the default key is used. + public_key: The related public key instance. homedir: GnuPG home directory path. If not passed, the default homedir is used. """ + SCHEME = "gnupg" + def __init__( - self, keyid: Optional[str] = None, homedir: Optional[str] = None + self, + public_key: Key, + homedir: Optional[str] = None, ): - self.keyid = keyid self.homedir = homedir + self.public_key = public_key @classmethod def from_priv_key_uri( @@ -35,41 +110,123 @@ def from_priv_key_uri( public_key: Key, secrets_handler: Optional[SecretsHandler] = None, ) -> "GPGSigner": - raise NotImplementedError("Incompatible with private key URIs") + if not isinstance(public_key, GPGKey): + raise ValueError(f"expected GPGKey for {priv_key_uri}") + + uri = parse.urlparse(priv_key_uri) + + if uri.scheme != cls.SCHEME: + raise ValueError(f"GPGSigner does not support {priv_key_uri}") + + homedir = uri.path or None + + return cls(public_key, homedir) @staticmethod - def _to_gpg_sig(sig: Signature) -> Dict: - """Helper to convert Signature -> internal gpg signature format.""" + def _sig_to_legacy_dict(sig: Signature) -> Dict: + """Helper to convert Signature to internal gpg signature dict format.""" sig_dict = sig.to_dict() sig_dict["signature"] = sig_dict.pop("sig") return sig_dict @staticmethod - def _from_gpg_sig(sig_dict: Dict) -> Signature: - """Helper to convert internal gpg signature format -> Signature.""" + def _sig_from_legacy_dict(sig_dict: Dict) -> Signature: + """Helper to convert internal gpg signature format to Signature.""" sig_dict["sig"] = sig_dict.pop("signature") return Signature.from_dict(sig_dict) + @staticmethod + def _key_to_legacy_dict(key: GPGKey) -> Dict[str, Any]: + """Returns legacy dictionary representation of self.""" + return { + "keyid": key.keyid, + "type": key.keytype, + "method": key.scheme, + "hashes": [formats.GPG_HASH_ALGORITHM_STRING], + "keyval": key.keyval, + } + + @staticmethod + def _key_from_legacy_dict(key_dict: Dict[str, Any]) -> GPGKey: + """Create GPGKey from legacy dictionary representation.""" + keyid = key_dict["keyid"] + keytype = key_dict["type"] + scheme = key_dict["method"] + keyval = key_dict["keyval"] + + return GPGKey(keyid, keytype, scheme, keyval) + + @classmethod + def import_( + cls, keyid: str, homedir: Optional[str] = None + ) -> Tuple[str, Key]: + """Load key and signer details from GnuPG keyring. + + NOTE: Information about the key validity (expiration, revocation, etc.) + is discarded at import and not considered when verifying a signature. + + Args: + keyid: GnuPG local user signing key id. + homedir: GnuPG home directory path. If not passed, the default homedir is + used. + + Raises: + UnsupportedLibraryError: The gpg command or pyca/cryptography are + not available. + KeyNotFoundError: No key was found for the passed keyid. + + Returns: + Tuple of private key uri and the public key. + + """ + uri = f"{cls.SCHEME}:{homedir or ''}" + + raw_key = gpg.export_pubkey(keyid, homedir) + raw_keys = [raw_key] + list(raw_key.pop("subkeys", {}).values()) + keyids = [] + + for key in raw_keys: + if key["keyid"] == keyid: + # TODO: Raise here if key is expired, revoked, incapable, ... + public_key = cls._key_from_legacy_dict(key) + break + keyids.append(key["keyid"]) + + else: + raise gpg_exceptions.KeyNotFoundError( + f"No exact match found for passed keyid" + f" {keyid}, found: {keyids}." + ) + + return (uri, public_key) + def sign(self, payload: bytes) -> Signature: - """Signs payload with ``gpg``. + """Signs payload with GnuPG. Arguments: payload: bytes to be signed. Raises: - ValueError: The gpg command failed to create a valid signature. - OSError: the gpg command is not present or non-executable. - securesystemslib.exceptions.UnsupportedLibraryError: The gpg - command is not available, or the cryptography library is - not installed. - securesystemslib.gpg.exceptions.CommandError: The gpg command - returned a non-zero exit code. - securesystemslib.gpg.exceptions.KeyNotFoundError: The used gpg - version is not fully supported. + ValueError: gpg command failed to create a valid signature. + OSError: gpg command is not present or non-executable. + securesystemslib.exceptions.UnsupportedLibraryError: gpg command is not + available, or the cryptography library is not installed. + securesystemslib.gpg.exceptions.CommandError: gpg command returned a + non-zero exit code. + securesystemslib.gpg.exceptions.KeyNotFoundError: gpg version is not fully + supported. Returns: Signature. + """ - return self._from_gpg_sig( - gpg.create_signature(payload, self.keyid, self.homedir) + raw_sig = gpg.create_signature( + payload, self.public_key.keyid, self.homedir ) + if raw_sig["keyid"] != self.public_key.keyid: + raise ValueError( + f"The signing key {raw_sig['keyid']} does not" + f" match the attached public key {self.public_key.keyid}." + ) + + return self._sig_from_legacy_dict(raw_sig) diff --git a/tests/check_public_interfaces.py b/tests/check_public_interfaces.py index c35b2d24..9b195d20 100644 --- a/tests/check_public_interfaces.py +++ b/tests/check_public_interfaces.py @@ -41,6 +41,11 @@ import securesystemslib.gpg.util # pylint: disable=wrong-import-position import securesystemslib.interface # pylint: disable=wrong-import-position import securesystemslib.keys # pylint: disable=wrong-import-position +from securesystemslib.exceptions import ( + UnsupportedLibraryError, + VerificationError, +) +from securesystemslib.signer import GPGKey, Signature class TestPublicInterfaces( @@ -314,6 +319,14 @@ def test_gpg_functions(self): securesystemslib.gpg.functions.export_pubkey("f00") self.assertEqual(expected_error_msg, str(ctx.exception)) + def test_signer(self): + """Assert generic VerificationError from UnsupportedLibraryError.""" + key = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"}) + sig = Signature("aa", "aaaaaaa", {"other_headers": "aaaaaa"}) + with self.assertRaises(VerificationError) as ctx: + key.verify_signature(sig, b"data") + self.assertIsInstance(ctx.exception.__cause__, UnsupportedLibraryError) + if __name__ == "__main__": unittest.main(verbosity=1, buffer=True) diff --git a/tests/check_public_interfaces_gpg.py b/tests/check_public_interfaces_gpg.py index 3ed236e9..eeadf18a 100644 --- a/tests/check_public_interfaces_gpg.py +++ b/tests/check_public_interfaces_gpg.py @@ -34,6 +34,7 @@ export_pubkeys, verify_signature, ) +from securesystemslib.signer import GPGKey, GPGSigner, Signer class TestPublicInterfacesGPG( @@ -47,17 +48,28 @@ def setUpClass(cls): def test_gpg_functions(self): """Signing, key export and util functions must raise on missing gpg.""" - with self.assertRaises(UnsupportedLibraryError) as ctx: - create_signature("bar") - self.assertEqual(NO_GPG_MSG, str(ctx.exception)) - with self.assertRaises(UnsupportedLibraryError) as ctx: - export_pubkey("f00") - self.assertEqual(NO_GPG_MSG, str(ctx.exception)) - - with self.assertRaises(UnsupportedLibraryError) as ctx: - export_pubkeys(["f00"]) - self.assertEqual(NO_GPG_MSG, str(ctx.exception)) + # Hand-crafting a GPG public key and loading a signer works w/o gpg, but + # signing fails (see below). + mock_public_key = GPGKey( + "aa", + "rsa", + "pgp+rsa-pkcsv1.5", + {"public": {"key": "value"}}, + ) + signer = Signer.from_priv_key_uri("gnupg:?id=abcd", mock_public_key) + + # Run commands that require gpg and assert error plus message + for fn, args in ( + (create_signature, ("bar",)), + (export_pubkey, ("f00",)), + (export_pubkeys, (["f00"],)), + (GPGSigner.import_, ("keyid",)), + (signer.sign, (b"data",)), + ): + with self.assertRaises(UnsupportedLibraryError) as ctx: + fn(*args) + self.assertEqual(NO_GPG_MSG, str(ctx.exception)) def test_gpg_verify(self): """Signature verification does not require gpg to be installed on the host. @@ -139,6 +151,10 @@ def test_gpg_verify(self): for key, sig in key_signature_pairs: self.assertTrue(verify_signature(sig, key, data)) + # pylint: disable=protected-access + GPGSigner._key_from_legacy_dict(key).verify_signature( + GPGSigner._sig_from_legacy_dict(sig), data + ) if __name__ == "__main__": diff --git a/tests/test_signer.py b/tests/test_signer.py index db114fd0..23b6761c 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -15,13 +15,14 @@ FormatError, UnsupportedAlgorithmError, UnverifiedSignatureError, + VerificationError, ) from securesystemslib.gpg.constants import have_gpg -from securesystemslib.gpg.functions import export_pubkey -from securesystemslib.gpg.functions import verify_signature as verify_sig +from securesystemslib.gpg.exceptions import CommandError, KeyNotFoundError from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, SIGNER_FOR_URI_SCHEME, + GPGKey, GPGSigner, Key, SecretsHandler, @@ -42,6 +43,7 @@ def test_key_from_to_dict(self): "keytype": keytype, "scheme": scheme, "extra": "somedata", + "hashes": ["only recognized by GPGKey"], "keyval": { "public": "pubkeyval", "foo": "bar", @@ -372,8 +374,8 @@ class TestGPGRSA(unittest.TestCase): @classmethod def setUpClass(cls): - cls.default_keyid = "8465A1E2E0FB2B40ADB2478E18FB3F537E0C8A17" - cls.signing_subkey_keyid = "C5A0ABE6EC19D0D65F85E2C39BE9DF5131D924E9" + cls.default_keyid = "8465a1e2e0fb2b40adb2478e18fb3f537e0c8a17" + cls.signing_subkey_keyid = "c5a0abe6ec19d0d65f85e2c39be9df5131d924e9" # Create directory to run the tests without having everything blow up. cls.working_dir = os.getcwd() @@ -399,43 +401,113 @@ def tearDownClass(cls): os.chdir(cls.working_dir) shutil.rmtree(cls.test_dir) - def test_gpg_sign_and_verify_object_with_default_key(self): - """Create a signature using the default key on the keyring.""" - # pylint: disable=protected-access - signer = GPGSigner(homedir=self.gnupg_home) - signature = signer.sign(self.test_data) + def test_gpg_sign_and_verify_object(self): + """Create a signature using a specific key on the keyring.""" - signature_dict = GPGSigner._to_gpg_sig(signature) - key_data = export_pubkey(self.default_keyid, self.gnupg_home) + uri, public_key = GPGSigner.import_( + self.signing_subkey_keyid, self.gnupg_home + ) - self.assertTrue(verify_sig(signature_dict, key_data, self.test_data)) - self.assertFalse(verify_sig(signature_dict, key_data, self.wrong_data)) + signer = Signer.from_priv_key_uri(uri, public_key) + sig = signer.sign(self.test_data) - def test_gpg_sign_and_verify_object(self): - """Create a signature using a specific key on the keyring.""" - # pylint: disable=protected-access - signer = GPGSigner(self.signing_subkey_keyid, self.gnupg_home) - signature = signer.sign(self.test_data) + public_key.verify_signature(sig, self.test_data) + + with self.assertRaises(UnverifiedSignatureError): + public_key.verify_signature(sig, self.wrong_data) + + sig.keyid = 123456 + with self.assertRaises(VerificationError): + public_key.verify_signature(sig, self.test_data) + + def test_gpg_fail_sign_keyid_match(self): + """Fail signing because signature keyid does not match public key.""" + uri, public_key = GPGSigner.import_(self.default_keyid, self.gnupg_home) + signer = Signer.from_priv_key_uri(uri, public_key) + + # Fail because we imported main key, but gpg favors signing subkey + with self.assertRaises(ValueError): + signer.sign(self.test_data) + + def test_gpg_fail_import_keyid_match(self): + """Fail key import because passed keyid does not match returned key.""" + + # gpg exports the right key, but we require an exact keyid match + non_matching_keyid = self.default_keyid.upper() + with self.assertRaises(KeyNotFoundError): + GPGSigner.import_(non_matching_keyid, self.gnupg_home) - signature_dict = GPGSigner._to_gpg_sig(signature) - key_data = export_pubkey(self.signing_subkey_keyid, self.gnupg_home) + def test_gpg_fail_sign_expired_key(self): + """Signing fails with non-zero exit code if key is expired.""" + expired_key = "e8ac80c924116dabb51d4b987cb07d6d2c199c7c" - self.assertTrue(verify_sig(signature_dict, key_data, self.test_data)) - self.assertFalse(verify_sig(signature_dict, key_data, self.wrong_data)) + uri, public_key = GPGSigner.import_(expired_key, self.gnupg_home) + signer = Signer.from_priv_key_uri(uri, public_key) + with self.assertRaises(CommandError): + signer.sign(self.test_data) - def test_gpg_signature_data_structure(self): + def test_gpg_signer_load_with_bad_scheme(self): + """Load from priv key uri with wrong uri scheme.""" + key = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"}) + with self.assertRaises(ValueError): + GPGSigner.from_priv_key_uri("wrong:", key) + + def test_gpg_signer_load_with_bad_key(self): + """Load from priv key uri with wrong pubkey type.""" + key = SSlibKey("aa", "rsa", "rsassa-pss-sha256", {"public": "val"}) + with self.assertRaises(ValueError): + GPGSigner.from_priv_key_uri("gnupg:", key) + + def test_gpg_signature_legacy_data_structure(self): """Test custom fields and legacy data structure in gpg signatures.""" # pylint: disable=protected-access - signer = GPGSigner(homedir=self.gnupg_home) + _, public_key = GPGSigner.import_( + self.signing_subkey_keyid, self.gnupg_home + ) + signer = GPGSigner(public_key, homedir=self.gnupg_home) sig = signer.sign(self.test_data) self.assertIn("other_headers", sig.unrecognized_fields) - sig_dict = GPGSigner._to_gpg_sig(sig) + sig_dict = GPGSigner._sig_to_legacy_dict(sig) self.assertIn("signature", sig_dict) self.assertNotIn("sig", sig_dict) - sig2 = GPGSigner._from_gpg_sig(sig_dict) + sig2 = GPGSigner._sig_from_legacy_dict(sig_dict) self.assertEqual(sig, sig2) + def test_gpg_key_legacy_data_structure(self): + """Test legacy data structure conversion in gpg keys.""" + # pylint: disable=protected-access + _, public_key = GPGSigner.import_( + self.signing_subkey_keyid, self.gnupg_home + ) + legacy_fields = {"keyid", "type", "method"} + fields = {"keytype", "scheme"} + + legacy_dict = GPGSigner._key_to_legacy_dict(public_key) + for field in legacy_fields: + self.assertIn(field, legacy_dict) + + for field in fields: + self.assertNotIn(field, legacy_dict) + + self.assertEqual( + public_key, GPGSigner._key_from_legacy_dict(legacy_dict) + ) + + def test_gpg_key__eq__(self): + """Test GPGKey.__eq__() .""" + key1 = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"}) + key2 = copy.deepcopy(key1) + self.assertEqual(key1, key2) + + key2.keyid = "bb" + self.assertNotEqual(key1, key2) + + other_key = SSlibKey( + "aa", "rsa", "rsassa-pss-sha256", {"public": "val"} + ) + self.assertNotEqual(key1, other_key) + # Run the unit tests. if __name__ == "__main__":