Skip to content

Commit

Permalink
gpg signer: remove subkey and expiration support (wip)
Browse files Browse the repository at this point in the history
- import_: require exact match between passed keyid, and one of
  the keys in the bundle returned by gpg, and return a GPGKey
  only for that key, w/o subkeys
- sign: require exact match between keyid on attached public key
  and keyid on the signature returned by gpg
- remove obsolete creation_time, validity_period and subkeys from
  GPGKey
  -> this also means, we no longer check key expiration at
  verification time, which might be out of sync anyway
- remove static hashes field from GPGKey
- adopt changes in tests

- change a lower-level gpg warning log message to debug

TODO:
- proof-read
- structure commits
  1. changes in import_ and sign + relevant test changes
  2. removal of unneeded fields + relevant test changes
  3. new tests
- comment on log level change, is this okay?
- ticketize: check if key is valid (expired, revoked, incapable) on
  import

Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
  • Loading branch information
lukpueh committed Mar 3, 2023
1 parent b89f6a1 commit 3ccdecc
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 205 deletions.
2 changes: 1 addition & 1 deletion securesystemslib/gpg/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ def get_pubkey_bundle(data, keyid):
):
if public_key and public_key["keyid"].endswith(keyid.lower()):
if idx > 1:
log.warning(
log.debug(
"Exporting master key '{}' including subkeys '{}' for" # pylint: disable=logging-format-interpolation,consider-using-f-string
" passed keyid '{}'.".format(
master_public_key["keyid"],
Expand Down
202 changes: 57 additions & 145 deletions securesystemslib/signer/_gpg_signer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Signer implementation for OpenPGP """

import logging
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Optional, Tuple
from urllib import parse

from securesystemslib import exceptions
from securesystemslib.gpg import exceptions as gpg_exceptions
from securesystemslib import exceptions, formats
from securesystemslib.gpg import functions as gpg
from securesystemslib.gpg.exceptions import KeyNotFoundError
from securesystemslib.signer._key import Key
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer

Expand All @@ -25,159 +25,46 @@ class GPGKey(Key):
ketytype: Key type, e.g. "rsa", "dsa" or "eddsa".
scheme: Signing schemes, e.g. "pgp+rsa-pkcsv1.5", "pgp+dsa-fips-180-2",
"pgp+eddsa-ed25519".
hashes: Hash algorithm to hash the data to be signed, e.g. "pgp+SHA2".
keyval: Opaque key content.
creation_time: Unix timestamp when key was created.
validity_period: Validity of key in days.
subkeys: A dictionary of keyids as keys and GPGKeys as values.
unrecognized_fields: Dictionary of all attributes that are not managed
by Securesystemslib
"""

def __init__(
self,
keyid: str,
keytype: str,
scheme: str,
hashes: List[str],
keyval: Dict[str, Any],
creation_time: Optional[int] = None,
validity_period: Optional[int] = None,
subkeys: Optional[Dict[str, "GPGKey"]] = None,
unrecognized_fields: Optional[Dict[str, Any]] = None,
):

super().__init__(keyid, keytype, scheme, keyval, unrecognized_fields)

self.hashes = hashes
self.creation_time = creation_time
self.validity_period = validity_period
self.subkeys = subkeys

def __eq__(self, other: Any) -> bool:
if not isinstance(other, GPGKey):
return False

return (
super().__eq__(other)
and self.hashes == other.hashes
and self.creation_time == other.creation_time
and self.validity_period == other.validity_period
and self.subkeys == other.subkeys
)

@classmethod
def __from_dict(
cls,
keyid: str,
keytype: str,
scheme: str,
subkeys: Optional[Dict[str, "GPGKey"]],
key_dict: Dict[str, Any],
) -> "GPGKey":
"""Helper for common from_*dict operations."""

hashes = key_dict.pop("hashes")
keyval = key_dict.pop("keyval")
creation_time = key_dict.pop("creation_time", None)
validity_period = key_dict.pop("validity_period", None)

return cls(
keyid,
keytype,
scheme,
hashes,
keyval,
creation_time,
validity_period,
subkeys,
key_dict,
)

@classmethod
def _from_legacy_dict(cls, key_dict: Dict[str, Any]) -> "GPGKey":
"""Create GPGKey from legacy dictionary representation."""

keyid = key_dict.pop("keyid")
keytype = key_dict.pop("type")
scheme = key_dict.pop("method")
subkeys = key_dict.pop("subkeys", None)

if subkeys is not None:
subkeys = {
keyid: cls._from_legacy_dict(
key
) # pylint: disable=protected-access
for (keyid, key) in subkeys.items()
}

return cls.__from_dict(keyid, keytype, scheme, subkeys, key_dict)

@classmethod
def from_dict(cls, keyid: str, key_dict: Dict[str, Any]) -> "GPGKey":
keytype = key_dict.pop("keytype")
scheme = key_dict.pop("scheme")
subkeys = key_dict.pop("subkeys", None)

if subkeys:
subkeys = {
keyid: cls.from_dict(keyid, key)
for (keyid, key) in subkeys.items()
}

return cls.__from_dict(keyid, keytype, scheme, subkeys, key_dict)
keyval = key_dict.pop("keyval")

def __to_dict(self) -> Dict[str, Any]:
"""Helper for common to_*dict operations."""
return cls(keyid, keytype, scheme, keyval, key_dict)

key_dict: Dict[str, Any] = {
"hashes": self.hashes,
def to_dict(self) -> Dict:
return {
"keytype": self.keytype,
"scheme": self.scheme,
"keyval": self.keyval,
**self.unrecognized_fields,
}
if self.creation_time is not None:
key_dict["creation_time"] = self.creation_time

if self.validity_period is not None:
key_dict["validity_period"] = self.validity_period
@classmethod
def _from_legacy_dict(cls, key_dict: Dict[str, Any]) -> "GPGKey":
"""Create GPGKey from legacy dictionary representation."""
keyid = key_dict.pop("keyid")
keytype = key_dict.pop("type")
scheme = key_dict.pop("method")
keyval = key_dict.pop("keyval")

return key_dict
return cls(keyid, keytype, scheme, keyval)

def _to_legacy_dict(self) -> Dict[str, Any]:
"""Returns legacy dictionary representation of self."""

key_dict = self.__to_dict()
key_dict.update(
{
"keyid": self.keyid,
"type": self.keytype,
"method": self.scheme,
}
)

if self.subkeys:
key_dict["subkeys"] = {
keyid: key._to_legacy_dict() # pylint: disable=protected-access
for (keyid, key) in self.subkeys.items()
}

return key_dict

def to_dict(self) -> Dict[str, Any]:
key_dict = self.__to_dict()
key_dict.update(
{
"keytype": self.keytype,
"scheme": self.scheme,
**self.unrecognized_fields,
}
)

if self.subkeys:
key_dict["subkeys"] = {
keyid: key.to_dict() for (keyid, key) in self.subkeys.items()
}

return key_dict
return {
"keyid": self.keyid,
"type": self.keytype,
"method": self.scheme,
"hashes": [formats.GPG_HASH_ALGORITHM_STRING],
"keyval": self.keyval,
}

def verify_signature(self, signature: Signature, data: bytes) -> None:
try:
Expand All @@ -194,7 +81,6 @@ def verify_signature(self, signature: Signature, data: bytes) -> None:
except (
exceptions.FormatError,
exceptions.UnsupportedLibraryError,
gpg_exceptions.KeyExpirationError,
) as e:
logger.info("Key %s failed to verify sig: %s", self.keyid, str(e))
raise exceptions.VerificationError(
Expand Down Expand Up @@ -278,17 +164,35 @@ def import_(
homedir: GnuPG home directory path. If not passed, the default homedir is
used.
Raises:
UnsupportedLibraryError: The gpg command or pyca/cryptography are
not available.
KeyNotFoundError: No key was found for the passed keyid.
Returns:
Tuple of private key uri and the public key.
"""
uri = f"{cls.SCHEME}:{homedir or ''}"

public_key = (
GPGKey._from_legacy_dict( # pylint: disable=protected-access
gpg.export_pubkey(keyid, homedir)
raw_key = gpg.export_pubkey(keyid, homedir)
raw_keys = [raw_key] + list(raw_key.pop("subkeys", {}).values())
keyids = []

for key in raw_keys:
if key["keyid"] == keyid:
# TODO: Raise here if key is invalid (expired, revoked, incapable)
public_key = GPGKey._from_legacy_dict( # pylint: disable=protected-access
key
)
break
keyids.append(key["keyid"])

else:
raise KeyNotFoundError(
f"No exact match found for passed keyid"
f" {keyid}, found: {keyids}."
)
)

return (uri, public_key)

Expand All @@ -310,7 +214,15 @@ def sign(self, payload: bytes) -> Signature:
Returns:
Signature.
"""
return self._sig_from_legacy_dict(
gpg.create_signature(payload, self.public_key.keyid, self.homedir)
raw_sig = gpg.create_signature(
payload, self.public_key.keyid, self.homedir
)
if raw_sig["keyid"] != self.public_key.keyid:
raise ValueError(
f"The signing key {raw_sig['keyid']} does not"
f" match the attached public key {self.public_key.keyid}."
)

return self._sig_from_legacy_dict(raw_sig)
1 change: 0 additions & 1 deletion tests/check_public_interfaces_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
export_pubkeys,
verify_signature,
)

from securesystemslib.signer import GPGKey, GPGSigner, Signer


Expand Down
Loading

0 comments on commit 3ccdecc

Please sign in to comment.