diff --git a/plugins/module_utils/acme/backend_cryptography.py b/plugins/module_utils/acme/backend_cryptography.py index 6d9509954..77366ca21 100644 --- a/plugins/module_utils/acme/backend_cryptography.py +++ b/plugins/module_utils/acme/backend_cryptography.py @@ -14,18 +14,30 @@ import os import sys -from ansible.module_utils._text import to_bytes +from ansible.module_utils._text import to_bytes, to_native from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( CryptoBackend, ) +from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import ( + ChainMatcher, +) + from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException from ansible_collections.community.crypto.plugins.module_utils.acme.io import read_file from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + parse_name_field, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + cryptography_name_to_oid, +) + try: import cryptography import cryptography.hazmat.backends @@ -82,6 +94,87 @@ def _pad_hex(n, digits): return h +class CryptographyChainMatcher(ChainMatcher): + @staticmethod + def _parse_key_identifier(key_identifier, name, criterium_idx, module): + if key_identifier: + try: + return binascii.unhexlify(key_identifier.replace(':', '')) + except Exception: + if criterium_idx is None: + module.warn('Criterium has invalid {0} value. Ignoring criterium.'.format(name)) + else: + module.warn('Criterium {0} in select_chain has invalid {1} value. ' + 'Ignoring criterium.'.format(criterium_idx, name)) + return None + + def __init__(self, criterium, module): + self.criterium = criterium + self.test_certificates = criterium.test_certificates + self.subject = [] + self.issuer = [] + if criterium.subject: + self.subject = [ + (cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject) + ] + if criterium.issuer: + self.issuer = [ + (cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer) + ] + self.subject_key_identifier = CryptographyChainMatcher._parse_key_identifier( + criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, module) + self.authority_key_identifier = CryptographyChainMatcher._parse_key_identifier( + criterium.authority_key_identifier, 'authority_key_identifier', criterium.index, module) + + def _match_subject(self, x509_subject, match_subject): + for oid, value in match_subject: + found = False + for attribute in x509_subject: + if attribute.oid == oid and value == to_native(attribute.value): + found = True + break + if not found: + return False + return True + + def match(self, certificate): + ''' + Check whether an alternate chain matches the specified criterium. + ''' + chain = certificate.chain + if self.test_certificates == 'last': + chain = chain[-1:] + elif self.test_certificates == 'first': + chain = chain[:1] + for cert in chain: + try: + x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography.hazmat.backends.default_backend()) + matches = True + if not self._match_subject(x509.subject, self.subject): + matches = False + if not self._match_subject(x509.issuer, self.issuer): + matches = False + if self.subject_key_identifier: + try: + ext = x509.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier) + if self.subject_key_identifier != ext.value.digest: + matches = False + except cryptography.x509.ExtensionNotFound: + matches = False + if self.authority_key_identifier: + try: + ext = x509.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier) + if self.authority_key_identifier != ext.value.key_identifier: + matches = False + except cryptography.x509.ExtensionNotFound: + matches = False + if matches: + return True + except Exception as e: + self.module.warn('Error while loading certificate {0}: {1}'.format(cert, e)) + return False + + class CryptographyBackend(CryptoBackend): def __init__(self, module): super(CryptographyBackend, self).__init__(module) @@ -268,3 +361,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): if now is None: now = datetime.datetime.now() return (cert.not_valid_after - now).days + + def create_chain_matcher(self, criterium): + ''' + Given a Criterium object, creates a ChainMatcher object. + ''' + return CryptographyChainMatcher(criterium, self.module) diff --git a/plugins/module_utils/acme/backend_openssl_cli.py b/plugins/module_utils/acme/backend_openssl_cli.py index 724664bb2..fdbe6cb5b 100644 --- a/plugins/module_utils/acme/backend_openssl_cli.py +++ b/plugins/module_utils/acme/backend_openssl_cli.py @@ -22,7 +22,9 @@ CryptoBackend, ) -from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException +from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( + BackendException, +) from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 @@ -285,3 +287,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): if now is None: now = datetime.datetime.now() return (not_after - now).days + + def create_chain_matcher(self, criterium): + ''' + Given a Criterium object, creates a ChainMatcher object. + ''' + raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.') diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 5e959ea09..afd364c35 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -50,3 +50,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): If now is not specified, datetime.datetime.now() is used. ''' + + @abc.abstractmethod + def create_chain_matcher(self, criterium): + ''' + Given a Criterium object, creates a ChainMatcher object. + ''' diff --git a/plugins/module_utils/acme/certificates.py b/plugins/module_utils/acme/certificates.py index ead448947..bafe87226 100644 --- a/plugins/module_utils/acme/certificates.py +++ b/plugins/module_utils/acme/certificates.py @@ -8,9 +8,9 @@ __metaclass__ = type -import binascii +import abc -from ansible.module_utils._text import to_bytes, to_native +from ansible.module_utils import six from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( ModuleFailException, @@ -22,25 +22,10 @@ process_links, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - parse_name_field, -) - -from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( - cryptography_name_to_oid, -) - from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( split_pem_list, ) -try: - import cryptography - import cryptography.hazmat.backends - import cryptography.x509 -except ImportError: - pass - class CertificateChain(object): ''' @@ -108,85 +93,13 @@ def __init__(self, criterium, index=None): self.authority_key_identifier = criterium['authority_key_identifier'] +@six.add_metaclass(abc.ABCMeta) class ChainMatcher(object): - @staticmethod - def _parse_key_identifier(key_identifier, name, criterium_idx, client): - if key_identifier: - try: - return binascii.unhexlify(key_identifier.replace(':', '')) - except Exception: - if criterium_idx is None: - client.module.warn('Criterium has invalid {0} value. Ignoring criterium.'.format(name)) - else: - client.module.warn('Criterium {0} in select_chain has invalid {1} value. ' - 'Ignoring criterium.'.format(criterium_idx, name)) - return None - - def __init__(self, criterium, client): - self.criterium = criterium - self.test_certificates = criterium.test_certificates - self.subject = [] - self.issuer = [] - if criterium.subject: - self.subject = [ - (cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject) - ] - if criterium.issuer: - self.issuer = [ - (cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer) - ] - self.subject_key_identifier = ChainMatcher._parse_key_identifier( - criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, client) - self.authority_key_identifier = ChainMatcher._parse_key_identifier( - criterium.authority_key_identifier, 'authority_key_identifier', criterium.index, client) - - def _match_subject(self, x509_subject, match_subject): - for oid, value in match_subject: - found = False - for attribute in x509_subject: - if attribute.oid == oid and value == to_native(attribute.value): - found = True - break - if not found: - return False - return True - + @abc.abstractmethod def match(self, certificate): ''' - Check whether an alternate chain matches the specified criterium. + Check whether a certificate chain (CertificateChain instance) matches. ''' - chain = certificate.chain - if self.test_certificates == 'last': - chain = chain[-1:] - elif self.test_certificates == 'first': - chain = chain[:1] - for cert in chain: - try: - x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography.hazmat.backends.default_backend()) - matches = True - if not self._match_subject(x509.subject, self.subject): - matches = False - if not self._match_subject(x509.issuer, self.issuer): - matches = False - if self.subject_key_identifier: - try: - ext = x509.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier) - if self.subject_key_identifier != ext.value.digest: - matches = False - except cryptography.x509.ExtensionNotFound: - matches = False - if self.authority_key_identifier: - try: - ext = x509.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier) - if self.authority_key_identifier != ext.value.key_identifier: - matches = False - except cryptography.x509.ExtensionNotFound: - matches = False - if matches: - return True - except Exception as e: - self.module.warn('Error while loading certificate {0}: {1}'.format(cert, e)) - return False def retrieve_acme_v1_certificate(client, csr_der): diff --git a/plugins/modules/acme_certificate.py b/plugins/modules/acme_certificate.py index 8eb9e7c73..0eb16efbb 100644 --- a/plugins/modules/acme_certificate.py +++ b/plugins/modules/acme_certificate.py @@ -522,8 +522,6 @@ ACMEAccount, ) -from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import CryptographyBackend - from ansible_collections.community.crypto.plugins.module_utils.acme.challenges import ( combine_identifier, split_identifier, @@ -533,7 +531,6 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import ( retrieve_acme_v1_certificate, CertificateChain, - ChainMatcher, Criterium, ) @@ -579,6 +576,13 @@ def __init__(self, module, backend): self.order = None self.order_uri = self.data.get('order_uri') if self.data else None self.all_chains = None + self.select_chain_matcher = [] + + if self.module.params['select_chain']: + for criterium_idx, criterium in enumerate(self.module.params['select_chain']): + self.select_chain_matcher.append( + self.client.backend.create_chain_matcher( + Criterium(criterium, index=criterium_idx))) # Make sure account exists modify_account = module.params['modify_account'] @@ -716,8 +720,7 @@ def download_alternate_chains(self, cert): return alternate_chains def find_matching_chain(self, chains): - for criterium_idx, criterium in enumerate(self.module.params['select_chain']): - matcher = ChainMatcher(Criterium(criterium, index=criterium_idx), self.client) + for criterium_idx, matcher in enumerate(self.select_chain_matcher): for chain in chains: if matcher.match(chain): self.module.debug('Found matching chain for criterium {0}'.format(criterium_idx)) @@ -743,7 +746,7 @@ def get_certificate(self): else: self.order.finalize(self.client, pem_to_der(self.csr, self.csr_content)) cert = CertificateChain.download(self.client, self.order.certificate_uri) - if self.module.params['retrieve_all_alternates'] or self.module.params['select_chain']: + if self.module.params['retrieve_all_alternates'] or self.select_chain_matcher: # Retrieve alternate chains alternate_chains = self.download_alternate_chains(cert) @@ -754,7 +757,7 @@ def get_certificate(self): self.all_chains.append(alt_chain.to_json()) # Try to select alternate chain depending on criteria - if self.module.params['select_chain']: + if self.select_chain_matcher: matching_chain = self.find_matching_chain([cert] + alternate_chains) if matching_chain: cert = matching_chain @@ -832,8 +835,6 @@ def main(): supports_check_mode=True, ) backend = create_backend(module, False) - if module.params['select_chain'] and not isinstance(backend, CryptographyBackend): - module.fail_json(msg="The 'select_chain' can only be used with the 'cryptography' backend.") try: if module.params.get('dest'):