Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openssh_keypair: Adding passphrase parameter #225

Merged
2 changes: 2 additions & 0 deletions changelogs/fragments/225-openssh-keypair-passphrase.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- openssh_keypair - Added ``passphrase`` paramter to openssh_keypair for encrypting/decrtypting OpenSSH private keys (https://github.com/ansible-collections/community.crypto/pull/225).
91 changes: 61 additions & 30 deletions plugins/module_utils/openssh/cryptography_openssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,23 @@
__metaclass__ = type

from base64 import b64encode, b64decode
from distutils.version import LooseVersion
from getpass import getuser
from socket import gethostname

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
HAS_CRYPTOGRAPHY,
CRYPTOGRAPHY_HAS_ED25519,
)

try:
import cryptography as c
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, padding
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
except ImportError:
pass

if HAS_CRYPTOGRAPHY and CRYPTOGRAPHY_HAS_ED25519:
if LooseVersion(c.__version__) >= LooseVersion("3.0"):
HAS_OPENSSH_PRIVATE_FORMAT = True
else:
HAS_OPENSSH_PRIVATE_FORMAT = False

HAS_OPENSSH_SUPPORT = True

_ALGORITHM_PARAMETERS = {
Expand Down Expand Up @@ -76,7 +75,8 @@
}
}
}
else:
except ImportError:
HAS_OPENSSH_PRIVATE_FORMAT = False
HAS_OPENSSH_SUPPORT = False
_ALGORITHM_PARAMETERS = {}

Expand Down Expand Up @@ -192,21 +192,26 @@ def generate(cls, keytype='rsa', size=None, passphrase=None):
)

@classmethod
def load(cls, path, passphrase=None, key_format='PEM'):
def load(cls, path, passphrase=None, private_key_format='PEM', public_key_format='PEM', no_public_key=False):
"""Returns an Asymmetric_Keypair object loaded from the supplied file path

:path: A path to an existing private key to be loaded
:passphrase: Secret of type bytes used to decrypt the private key being loaded
:key_format: Format of key files to be loaded
:private_key_format: Format of private key to be loaded
:public_key_format: Format of public key to be loaded
:no_public_key: Set 'True' to only load a private key and automatically populate the matching public key
"""

if passphrase:
encryption_algorithm = get_encryption_algorithm(passphrase)
else:
encryption_algorithm = serialization.NoEncryption()

privatekey = load_privatekey(path, passphrase, key_format)
publickey = load_publickey(path + '.pub', key_format)
privatekey = load_privatekey(path, passphrase, private_key_format)
if no_public_key:
publickey = privatekey.public_key()
else:
publickey = load_publickey(path + '.pub', public_key_format)

# Ed25519 keys are always of size 256 and do not have a key_size attribute
if isinstance(privatekey, Ed25519PrivateKey):
Expand Down Expand Up @@ -352,11 +357,11 @@ def generate(cls, keytype='rsa', size=None, passphrase=None, comment=None):
:comment: Comment for a newly generated OpenSSH public key
"""

if not comment:
if comment is None:
comment = "%s@%s" % (getuser(), gethostname())

asym_keypair = Asymmetric_Keypair.generate(keytype, size, passphrase)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey)

Expand All @@ -365,20 +370,21 @@ def generate(cls, keytype='rsa', size=None, passphrase=None, comment=None):
openssh_privatekey=openssh_privatekey,
openssh_publickey=openssh_publickey,
fingerprint=fingerprint,
comment=comment
comment=comment,
)

@classmethod
def load(cls, path, passphrase=None):
def load(cls, path, passphrase=None, no_public_key=False):
"""Returns an Openssh_Keypair object loaded from the supplied file path

:path: A path to an existing private key to be loaded
:passphrase: Secret used to decrypt the private key being loaded
:no_public_key: Set 'True' to only load a private key and automatically populate the matching public key
"""

comment = extract_comment(path + '.pub')
asym_keypair = Asymmetric_Keypair.load(path, passphrase, 'SSH')
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair)
asym_keypair = Asymmetric_Keypair.load(path, passphrase, 'SSH', 'SSH', no_public_key)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey)

Expand All @@ -387,21 +393,31 @@ def load(cls, path, passphrase=None):
openssh_privatekey=openssh_privatekey,
openssh_publickey=openssh_publickey,
fingerprint=fingerprint,
comment=comment
comment=comment,
)

@staticmethod
def encode_openssh_privatekey(asym_keypair):
def encode_openssh_privatekey(asym_keypair, key_format):
"""Returns an OpenSSH encoded private key for a given keypair

:asym_keypair: Asymmetric_Keypair from the private key is extracted
:key_format: Format of the encoded private key.
"""

# OpenSSH formatted private keys are not available in Cryptography <3.0
try:
privatekey_format = serialization.PrivateFormat.OpenSSH
except AttributeError:
if key_format == 'SSH':
# Default to PEM format if SSH not available
if not HAS_OPENSSH_PRIVATE_FORMAT:
privatekey_format = serialization.PrivateFormat.PKCS8
else:
privatekey_format = serialization.PrivateFormat.OpenSSH
elif key_format == 'PKCS8':
privatekey_format = serialization.PrivateFormat.PKCS8
elif key_format == 'PKCS1':
if asym_keypair.key_type == 'ed25519':
raise InvalidKeyFormatError("ed25519 keys cannot be represented in PKCS1 format")
privatekey_format = serialization.PrivateFormat.TraditionalOpenSSL
else:
raise InvalidKeyFormatError("The accepted private key formats are SSH, PKCS8, and PKCS1")

encoded_privatekey = asym_keypair.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
Expand All @@ -425,7 +441,7 @@ def encode_openssh_publickey(asym_keypair, comment):

validate_comment(comment)

encoded_publickey += (" %s" % comment).encode(encoding=_TEXT_ENCODING)
encoded_publickey += (" %s" % comment).encode(encoding=_TEXT_ENCODING) if comment else b''

return encoded_publickey

Expand Down Expand Up @@ -502,7 +518,7 @@ def comment(self, comment):
validate_comment(comment)

self.__comment = comment
encoded_comment = (" %s" % self.__comment).encode(encoding=_TEXT_ENCODING)
encoded_comment = (" %s" % self.__comment).encode(encoding=_TEXT_ENCODING) if self.__comment else b''
self.__openssh_publickey = b' '.join(self.__openssh_publickey.split(b' ', 2)[:2]) + encoded_comment
return self.__openssh_publickey

Expand All @@ -513,7 +529,7 @@ def update_passphrase(self, passphrase):
"""

self.__asym_keypair.update_passphrase(passphrase)
self.__openssh_privatekey = OpenSSH_Keypair.encode_openssh_privatekey(self.__asym_keypair)
self.__openssh_privatekey = OpenSSH_Keypair.encode_openssh_privatekey(self.__asym_keypair, 'SSH')


def load_privatekey(path, passphrase, key_format):
Expand Down Expand Up @@ -549,7 +565,22 @@ def load_privatekey(path, passphrase, key_format):
)

except ValueError as e:
raise InvalidPrivateKeyFileError(e)
# Revert to PEM if key could not be loaded in SSH format
if key_format == 'SSH':
try:
privatekey = privatekey_loaders['PEM'](
data=content,
password=passphrase,
backend=backend,
)
except ValueError as e:
raise InvalidPrivateKeyFileError(e)
except TypeError as e:
raise InvalidPassphraseError(e)
except UnsupportedAlgorithm as e:
raise InvalidAlgorithmError(e)
else:
raise InvalidPrivateKeyFileError(e)
except TypeError as e:
raise InvalidPassphraseError(e)
except UnsupportedAlgorithm as e:
Expand Down Expand Up @@ -645,4 +676,4 @@ def calculate_fingerprint(openssh_publickey):
decoded_pubkey = b64decode(openssh_publickey.split(b' ')[1])
digest.update(decoded_pubkey)

return b64encode(digest.finalize()).decode(encoding=_TEXT_ENCODING).rstrip('=')
return 'SHA256:%s' % b64encode(digest.finalize()).decode(encoding=_TEXT_ENCODING).rstrip('=')
Loading