From 3ccdecc3b200dd1ecb043c7735ff2f24dc9b70ed Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 3 Mar 2023 14:22:46 +0100 Subject: [PATCH] gpg signer: remove subkey and expiration support (wip) - import_: require exact match between passed keyid, and one of the keys in the bundle returned by gpg, and return a GPGKey only for that key, w/o subkeys - sign: require exact match between keyid on attached public key and keyid on the signature returned by gpg - remove obsolete creation_time, validity_period and subkeys from GPGKey -> this also means, we no longer check key expiration at verification time, which might be out of sync anyway - remove static hashes field from GPGKey - adopt changes in tests - change a lower-level gpg warning log message to debug TODO: - proof-read - structure commits 1. changes in import_ and sign + relevant test changes 2. removal of unneeded fields + relevant test changes 3. new tests - comment on log level change, is this okay? - ticketize: check if key is valid (expired, revoked, incapable) on import Signed-off-by: Lukas Puehringer --- securesystemslib/gpg/common.py | 2 +- securesystemslib/signer/_gpg_signer.py | 202 +++++++------------------ tests/check_public_interfaces_gpg.py | 1 - tests/test_signer.py | 94 +++++------- 4 files changed, 94 insertions(+), 205 deletions(-) diff --git a/securesystemslib/gpg/common.py b/securesystemslib/gpg/common.py index ffd6d67b1..e1236f9d9 100644 --- a/securesystemslib/gpg/common.py +++ b/securesystemslib/gpg/common.py @@ -635,7 +635,7 @@ def get_pubkey_bundle(data, keyid): ): if public_key and public_key["keyid"].endswith(keyid.lower()): if idx > 1: - log.warning( + log.debug( "Exporting master key '{}' including subkeys '{}' for" # pylint: disable=logging-format-interpolation,consider-using-f-string " passed keyid '{}'.".format( master_public_key["keyid"], diff --git a/securesystemslib/signer/_gpg_signer.py b/securesystemslib/signer/_gpg_signer.py index 46d5c0042..615c24829 100644 --- a/securesystemslib/signer/_gpg_signer.py +++ b/securesystemslib/signer/_gpg_signer.py @@ -1,12 +1,12 @@ """Signer implementation for OpenPGP """ import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Optional, Tuple from urllib import parse -from securesystemslib import exceptions -from securesystemslib.gpg import exceptions as gpg_exceptions +from securesystemslib import exceptions, formats from securesystemslib.gpg import functions as gpg +from securesystemslib.gpg.exceptions import KeyNotFoundError from securesystemslib.signer._key import Key from securesystemslib.signer._signer import SecretsHandler, Signature, Signer @@ -25,159 +25,46 @@ class GPGKey(Key): ketytype: Key type, e.g. "rsa", "dsa" or "eddsa". scheme: Signing schemes, e.g. "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2", "pgp+eddsa-ed25519". - hashes: Hash algorithm to hash the data to be signed, e.g. "pgp+SHA2". keyval: Opaque key content. - creation_time: Unix timestamp when key was created. - validity_period: Validity of key in days. - subkeys: A dictionary of keyids as keys and GPGKeys as values. unrecognized_fields: Dictionary of all attributes that are not managed by Securesystemslib """ - def __init__( - self, - keyid: str, - keytype: str, - scheme: str, - hashes: List[str], - keyval: Dict[str, Any], - creation_time: Optional[int] = None, - validity_period: Optional[int] = None, - subkeys: Optional[Dict[str, "GPGKey"]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, - ): - - super().__init__(keyid, keytype, scheme, keyval, unrecognized_fields) - - self.hashes = hashes - self.creation_time = creation_time - self.validity_period = validity_period - self.subkeys = subkeys - - def __eq__(self, other: Any) -> bool: - if not isinstance(other, GPGKey): - return False - - return ( - super().__eq__(other) - and self.hashes == other.hashes - and self.creation_time == other.creation_time - and self.validity_period == other.validity_period - and self.subkeys == other.subkeys - ) - - @classmethod - def __from_dict( - cls, - keyid: str, - keytype: str, - scheme: str, - subkeys: Optional[Dict[str, "GPGKey"]], - key_dict: Dict[str, Any], - ) -> "GPGKey": - """Helper for common from_*dict operations.""" - - hashes = key_dict.pop("hashes") - keyval = key_dict.pop("keyval") - creation_time = key_dict.pop("creation_time", None) - validity_period = key_dict.pop("validity_period", None) - - return cls( - keyid, - keytype, - scheme, - hashes, - keyval, - creation_time, - validity_period, - subkeys, - key_dict, - ) - - @classmethod - def _from_legacy_dict(cls, key_dict: Dict[str, Any]) -> "GPGKey": - """Create GPGKey from legacy dictionary representation.""" - - keyid = key_dict.pop("keyid") - keytype = key_dict.pop("type") - scheme = key_dict.pop("method") - subkeys = key_dict.pop("subkeys", None) - - if subkeys is not None: - subkeys = { - keyid: cls._from_legacy_dict( - key - ) # pylint: disable=protected-access - for (keyid, key) in subkeys.items() - } - - return cls.__from_dict(keyid, keytype, scheme, subkeys, key_dict) - @classmethod def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey": keytype = key_dict.pop("keytype") scheme = key_dict.pop("scheme") - subkeys = key_dict.pop("subkeys", None) - - if subkeys: - subkeys = { - keyid: cls.from_dict(keyid, key) - for (keyid, key) in subkeys.items() - } - - return cls.__from_dict(keyid, keytype, scheme, subkeys, key_dict) + keyval = key_dict.pop("keyval") - def __to_dict(self) -> Dict[str, Any]: - """Helper for common to_*dict operations.""" + return cls(keyid, keytype, scheme, keyval, key_dict) - key_dict: Dict[str, Any] = { - "hashes": self.hashes, + def to_dict(self) -> Dict: + return { + "keytype": self.keytype, + "scheme": self.scheme, "keyval": self.keyval, + **self.unrecognized_fields, } - if self.creation_time is not None: - key_dict["creation_time"] = self.creation_time - if self.validity_period is not None: - key_dict["validity_period"] = self.validity_period + @classmethod + def _from_legacy_dict(cls, key_dict: Dict[str, Any]) -> "GPGKey": + """Create GPGKey from legacy dictionary representation.""" + keyid = key_dict.pop("keyid") + keytype = key_dict.pop("type") + scheme = key_dict.pop("method") + keyval = key_dict.pop("keyval") - return key_dict + return cls(keyid, keytype, scheme, keyval) def _to_legacy_dict(self) -> Dict[str, Any]: """Returns legacy dictionary representation of self.""" - - key_dict = self.__to_dict() - key_dict.update( - { - "keyid": self.keyid, - "type": self.keytype, - "method": self.scheme, - } - ) - - if self.subkeys: - key_dict["subkeys"] = { - keyid: key._to_legacy_dict() # pylint: disable=protected-access - for (keyid, key) in self.subkeys.items() - } - - return key_dict - - def to_dict(self) -> Dict[str, Any]: - key_dict = self.__to_dict() - key_dict.update( - { - "keytype": self.keytype, - "scheme": self.scheme, - **self.unrecognized_fields, - } - ) - - if self.subkeys: - key_dict["subkeys"] = { - keyid: key.to_dict() for (keyid, key) in self.subkeys.items() - } - - return key_dict + return { + "keyid": self.keyid, + "type": self.keytype, + "method": self.scheme, + "hashes": [formats.GPG_HASH_ALGORITHM_STRING], + "keyval": self.keyval, + } def verify_signature(self, signature: Signature, data: bytes) -> None: try: @@ -194,7 +81,6 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: except ( exceptions.FormatError, exceptions.UnsupportedLibraryError, - gpg_exceptions.KeyExpirationError, ) as e: logger.info("Key %s failed to verify sig: %s", self.keyid, str(e)) raise exceptions.VerificationError( @@ -278,17 +164,35 @@ def import_( homedir: GnuPG home directory path. If not passed, the default homedir is used. + Raises: + UnsupportedLibraryError: The gpg command or pyca/cryptography are + not available. + KeyNotFoundError: No key was found for the passed keyid. + Returns: Tuple of private key uri and the public key. """ uri = f"{cls.SCHEME}:{homedir or ''}" - public_key = ( - GPGKey._from_legacy_dict( # pylint: disable=protected-access - gpg.export_pubkey(keyid, homedir) + raw_key = gpg.export_pubkey(keyid, homedir) + raw_keys = [raw_key] + list(raw_key.pop("subkeys", {}).values()) + keyids = [] + + for key in raw_keys: + if key["keyid"] == keyid: + # TODO: Raise here if key is invalid (expired, revoked, incapable) + public_key = GPGKey._from_legacy_dict( # pylint: disable=protected-access + key + ) + break + keyids.append(key["keyid"]) + + else: + raise KeyNotFoundError( + f"No exact match found for passed keyid" + f" {keyid}, found: {keyids}." ) - ) return (uri, public_key) @@ -310,7 +214,15 @@ def sign(self, payload: bytes) -> Signature: Returns: Signature. + """ - return self._sig_from_legacy_dict( - gpg.create_signature(payload, self.public_key.keyid, self.homedir) + raw_sig = gpg.create_signature( + payload, self.public_key.keyid, self.homedir ) + if raw_sig["keyid"] != self.public_key.keyid: + raise ValueError( + f"The signing key {raw_sig['keyid']} does not" + f" match the attached public key {self.public_key.keyid}." + ) + + return self._sig_from_legacy_dict(raw_sig) diff --git a/tests/check_public_interfaces_gpg.py b/tests/check_public_interfaces_gpg.py index 4ce2c9f57..fcf2368d0 100644 --- a/tests/check_public_interfaces_gpg.py +++ b/tests/check_public_interfaces_gpg.py @@ -34,7 +34,6 @@ export_pubkeys, verify_signature, ) - from securesystemslib.signer import GPGKey, GPGSigner, Signer diff --git a/tests/test_signer.py b/tests/test_signer.py index f121f9a83..472bcc1b3 100644 --- a/tests/test_signer.py +++ b/tests/test_signer.py @@ -18,7 +18,7 @@ VerificationError, ) from securesystemslib.gpg.constants import have_gpg -from securesystemslib.gpg.exceptions import KeyExpirationError +from securesystemslib.gpg.exceptions import CommandError, KeyNotFoundError from securesystemslib.signer import ( KEY_FOR_TYPE_AND_SCHEME, SIGNER_FOR_URI_SCHEME, @@ -374,8 +374,8 @@ class TestGPGRSA(unittest.TestCase): @classmethod def setUpClass(cls): - cls.default_keyid = "8465A1E2E0FB2B40ADB2478E18FB3F537E0C8A17" - cls.signing_subkey_keyid = "C5A0ABE6EC19D0D65F85E2C39BE9DF5131D924E9" + cls.default_keyid = "8465a1e2e0fb2b40adb2478e18fb3f537e0c8a17" + cls.signing_subkey_keyid = "c5a0abe6ec19d0d65f85e2c39be9df5131d924e9" # Create directory to run the tests without having everything blow up. cls.working_dir = os.getcwd() @@ -401,16 +401,13 @@ def tearDownClass(cls): os.chdir(cls.working_dir) shutil.rmtree(cls.test_dir) - def test_gpg_sign_and_verify_object_with_default_key(self): - """Create a signature using the default key on the keyring.""" + def test_gpg_sign_and_verify_object(self): + """Create a signature using a specific key on the keyring.""" - # Public key import requires a keyid, signer loading does not. Ignore the URI w/ - # keyid returned here, and construct one w/o keyid below, to test default key. - _, public_key = GPGSigner.import_( + uri, public_key = GPGSigner.import_( self.signing_subkey_keyid, self.gnupg_home ) - uri = f"gnupg:{self.gnupg_home}" signer = Signer.from_priv_key_uri(uri, public_key) sig = signer.sign(self.test_data) @@ -419,33 +416,39 @@ def test_gpg_sign_and_verify_object_with_default_key(self): with self.assertRaises(UnverifiedSignatureError): public_key.verify_signature(sig, self.wrong_data) - def test_gpg_sign_and_verify_object(self): - """Create a signature using a specific key on the keyring.""" - - uri, public_key = GPGSigner.import_( - self.signing_subkey_keyid, self.gnupg_home - ) + sig.keyid = 123456 + with self.assertRaises(VerificationError): + public_key.verify_signature(sig, self.test_data) + def test_gpg_fail_sign_keyid_match(self): + """Fail signing because signature keyid does not match public key.""" + uri, public_key = GPGSigner.import_(self.default_keyid, self.gnupg_home) signer = Signer.from_priv_key_uri(uri, public_key) - sig = signer.sign(self.test_data) - public_key.verify_signature(sig, self.test_data) + # Fail because we imported main key, but gpg favors signing subkey + with self.assertRaises(ValueError): + signer.sign(self.test_data) - with self.assertRaises(UnverifiedSignatureError): - public_key.verify_signature(sig, self.wrong_data) + def test_gpg_fail_import_keyid_match(self): + """Fail key import because passed keyid does not match returned key.""" - public_key.subkeys[sig.keyid].creation_time = 1 - public_key.subkeys[sig.keyid].validity_period = 1 - with self.assertRaises(VerificationError) as ctx: - public_key.verify_signature(sig, self.test_data) + # gpg exports the right key, but we require an exact keyid match + non_matching_keyid = self.default_keyid.upper() + with self.assertRaises(KeyNotFoundError): + GPGSigner.import_(non_matching_keyid, self.gnupg_home) - self.assertIsInstance(ctx.exception.__cause__, KeyExpirationError) + def test_gpg_fail_sign_expired_key(self): + """Signing fails with non-zero exit code if key is expired.""" + expired_key = "e8ac80c924116dabb51d4b987cb07d6d2c199c7c" + + uri, public_key = GPGSigner.import_(expired_key, self.gnupg_home) + signer = Signer.from_priv_key_uri(uri, public_key) + with self.assertRaises(CommandError): + signer.sign(self.test_data) def test_gpg_signer_load_with_bad_scheme(self): """Load from priv key uri with wrong uri scheme.""" - key = GPGKey( - "aa", "rsa", "pgp+rsa-pkcsv1.5", ["pgp+SHA2"], {"public": "val"} - ) + key = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"}) with self.assertRaises(ValueError): GPGSigner.from_priv_key_uri("wrong:", key) @@ -481,42 +484,17 @@ def test_gpg_key_legacy_data_structure(self): fields = {"keytype", "scheme"} legacy_dict = public_key._to_legacy_dict() - for key in [legacy_dict] + list(legacy_dict["subkeys"].values()): - for field in legacy_fields: - self.assertIn(field, key) - for field in fields: - self.assertNotIn(field, key) - - self.assertEqual(public_key, GPGKey._from_legacy_dict(legacy_dict)) + for field in legacy_fields: + self.assertIn(field, legacy_dict) - def test_gpg_key_optional_fields(self): - """Test gpg public key from/to_dict with optional fields.""" + for field in fields: + self.assertNotIn(field, legacy_dict) - keydict_base = { - "keytype": "rsa", - "scheme": "pgp+rsa-pkcsv1.5", - "hashes": ["pgp+SHA2"], - "keyval": { - "public": "pubkeyval", - }, - } - - for name, val in ( - ("creation_time", 1), - ("validity_period", 1), - ("subkeys", {"bb": copy.deepcopy(keydict_base)}), - ): - keydict = copy.deepcopy(keydict_base) - keydict[name] = val - key = Key.from_dict("aa", copy.deepcopy(keydict)) - self.assertIsNotNone(getattr(key, name)) - self.assertDictEqual(keydict, key.to_dict(), name) + self.assertEqual(public_key, GPGKey._from_legacy_dict(legacy_dict)) def test_gpg_key__eq__(self): """Test GPGKey.__eq__() .""" - key1 = GPGKey( - "aa", "rsa", "pgp+rsa-pkcsv1.5", ["pgp+SHA2"], {"public": "val"} - ) + key1 = GPGKey("aa", "rsa", "pgp+rsa-pkcsv1.5", {"public": "val"}) key2 = copy.deepcopy(key1) self.assertEqual(key1, key2)