Skip to content

Commit

Permalink
Make chain matcher creation part of backend.
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein committed Mar 14, 2021
1 parent 0cc5738 commit ad5dbbb
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 103 deletions.
101 changes: 100 additions & 1 deletion plugins/module_utils/acme/backend_cryptography.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,30 @@
import os
import sys

from ansible.module_utils._text import to_bytes
from ansible.module_utils._text import to_bytes, to_native

from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CryptoBackend,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
ChainMatcher,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException

from ansible_collections.community.crypto.plugins.module_utils.acme.io import read_file

from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_name_to_oid,
)

try:
import cryptography
import cryptography.hazmat.backends
Expand Down Expand Up @@ -82,6 +94,87 @@ def _pad_hex(n, digits):
return h


class CryptographyChainMatcher(ChainMatcher):
@staticmethod
def _parse_key_identifier(key_identifier, name, criterium_idx, module):
if key_identifier:
try:
return binascii.unhexlify(key_identifier.replace(':', ''))
except Exception:
if criterium_idx is None:
module.warn('Criterium has invalid {0} value. Ignoring criterium.'.format(name))
else:
module.warn('Criterium {0} in select_chain has invalid {1} value. '
'Ignoring criterium.'.format(criterium_idx, name))
return None

def __init__(self, criterium, module):
self.criterium = criterium
self.test_certificates = criterium.test_certificates
self.subject = []
self.issuer = []
if criterium.subject:
self.subject = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject)
]
if criterium.issuer:
self.issuer = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer)
]
self.subject_key_identifier = CryptographyChainMatcher._parse_key_identifier(
criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, module)
self.authority_key_identifier = CryptographyChainMatcher._parse_key_identifier(
criterium.authority_key_identifier, 'authority_key_identifier', criterium.index, module)

def _match_subject(self, x509_subject, match_subject):
for oid, value in match_subject:
found = False
for attribute in x509_subject:
if attribute.oid == oid and value == to_native(attribute.value):
found = True
break
if not found:
return False
return True

def match(self, certificate):
'''
Check whether an alternate chain matches the specified criterium.
'''
chain = certificate.chain
if self.test_certificates == 'last':
chain = chain[-1:]
elif self.test_certificates == 'first':
chain = chain[:1]
for cert in chain:
try:
x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography.hazmat.backends.default_backend())
matches = True
if not self._match_subject(x509.subject, self.subject):
matches = False
if not self._match_subject(x509.issuer, self.issuer):
matches = False
if self.subject_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier)
if self.subject_key_identifier != ext.value.digest:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if self.authority_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier)
if self.authority_key_identifier != ext.value.key_identifier:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if matches:
return True
except Exception as e:
self.module.warn('Error while loading certificate {0}: {1}'.format(cert, e))
return False


class CryptographyBackend(CryptoBackend):
def __init__(self, module):
super(CryptographyBackend, self).__init__(module)
Expand Down Expand Up @@ -268,3 +361,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
if now is None:
now = datetime.datetime.now()
return (cert.not_valid_after - now).days

def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
return CryptographyChainMatcher(criterium, self.module)
10 changes: 9 additions & 1 deletion plugins/module_utils/acme/backend_openssl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
CryptoBackend,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64

Expand Down Expand Up @@ -285,3 +287,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
if now is None:
now = datetime.datetime.now()
return (not_after - now).days

def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.')
6 changes: 6 additions & 0 deletions plugins/module_utils/acme/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
If now is not specified, datetime.datetime.now() is used.
'''

@abc.abstractmethod
def create_chain_matcher(self, criterium):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
97 changes: 5 additions & 92 deletions plugins/module_utils/acme/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
__metaclass__ = type


import binascii
import abc

from ansible.module_utils._text import to_bytes, to_native
from ansible.module_utils import six

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ModuleFailException,
Expand All @@ -22,25 +22,10 @@
process_links,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_name_to_oid,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
split_pem_list,
)

try:
import cryptography
import cryptography.hazmat.backends
import cryptography.x509
except ImportError:
pass


class CertificateChain(object):
'''
Expand Down Expand Up @@ -108,85 +93,13 @@ def __init__(self, criterium, index=None):
self.authority_key_identifier = criterium['authority_key_identifier']


@six.add_metaclass(abc.ABCMeta)
class ChainMatcher(object):
@staticmethod
def _parse_key_identifier(key_identifier, name, criterium_idx, client):
if key_identifier:
try:
return binascii.unhexlify(key_identifier.replace(':', ''))
except Exception:
if criterium_idx is None:
client.module.warn('Criterium has invalid {0} value. Ignoring criterium.'.format(name))
else:
client.module.warn('Criterium {0} in select_chain has invalid {1} value. '
'Ignoring criterium.'.format(criterium_idx, name))
return None

def __init__(self, criterium, client):
self.criterium = criterium
self.test_certificates = criterium.test_certificates
self.subject = []
self.issuer = []
if criterium.subject:
self.subject = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.subject)
]
if criterium.issuer:
self.issuer = [
(cryptography_name_to_oid(k), to_native(v)) for k, v in parse_name_field(criterium.issuer)
]
self.subject_key_identifier = ChainMatcher._parse_key_identifier(
criterium.subject_key_identifier, 'subject_key_identifier', criterium.index, client)
self.authority_key_identifier = ChainMatcher._parse_key_identifier(
criterium.authority_key_identifier, 'authority_key_identifier', criterium.index, client)

def _match_subject(self, x509_subject, match_subject):
for oid, value in match_subject:
found = False
for attribute in x509_subject:
if attribute.oid == oid and value == to_native(attribute.value):
found = True
break
if not found:
return False
return True

@abc.abstractmethod
def match(self, certificate):
'''
Check whether an alternate chain matches the specified criterium.
Check whether a certificate chain (CertificateChain instance) matches.
'''
chain = certificate.chain
if self.test_certificates == 'last':
chain = chain[-1:]
elif self.test_certificates == 'first':
chain = chain[:1]
for cert in chain:
try:
x509 = cryptography.x509.load_pem_x509_certificate(to_bytes(cert), cryptography.hazmat.backends.default_backend())
matches = True
if not self._match_subject(x509.subject, self.subject):
matches = False
if not self._match_subject(x509.issuer, self.issuer):
matches = False
if self.subject_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier)
if self.subject_key_identifier != ext.value.digest:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if self.authority_key_identifier:
try:
ext = x509.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier)
if self.authority_key_identifier != ext.value.key_identifier:
matches = False
except cryptography.x509.ExtensionNotFound:
matches = False
if matches:
return True
except Exception as e:
self.module.warn('Error while loading certificate {0}: {1}'.format(cert, e))
return False


def retrieve_acme_v1_certificate(client, csr_der):
Expand Down
19 changes: 10 additions & 9 deletions plugins/modules/acme_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,6 @@
ACMEAccount,
)

from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import CryptographyBackend

from ansible_collections.community.crypto.plugins.module_utils.acme.challenges import (
combine_identifier,
split_identifier,
Expand All @@ -533,7 +531,6 @@
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
retrieve_acme_v1_certificate,
CertificateChain,
ChainMatcher,
Criterium,
)

Expand Down Expand Up @@ -579,6 +576,13 @@ def __init__(self, module, backend):
self.order = None
self.order_uri = self.data.get('order_uri') if self.data else None
self.all_chains = None
self.select_chain_matcher = []

if self.module.params['select_chain']:
for criterium_idx, criterium in enumerate(self.module.params['select_chain']):
self.select_chain_matcher.append(
self.client.backend.create_chain_matcher(
Criterium(criterium, index=criterium_idx)))

# Make sure account exists
modify_account = module.params['modify_account']
Expand Down Expand Up @@ -716,8 +720,7 @@ def download_alternate_chains(self, cert):
return alternate_chains

def find_matching_chain(self, chains):
for criterium_idx, criterium in enumerate(self.module.params['select_chain']):
matcher = ChainMatcher(Criterium(criterium, index=criterium_idx), self.client)
for criterium_idx, matcher in enumerate(self.select_chain_matcher):
for chain in chains:
if matcher.match(chain):
self.module.debug('Found matching chain for criterium {0}'.format(criterium_idx))
Expand All @@ -743,7 +746,7 @@ def get_certificate(self):
else:
self.order.finalize(self.client, pem_to_der(self.csr, self.csr_content))
cert = CertificateChain.download(self.client, self.order.certificate_uri)
if self.module.params['retrieve_all_alternates'] or self.module.params['select_chain']:
if self.module.params['retrieve_all_alternates'] or self.select_chain_matcher:
# Retrieve alternate chains
alternate_chains = self.download_alternate_chains(cert)

Expand All @@ -754,7 +757,7 @@ def get_certificate(self):
self.all_chains.append(alt_chain.to_json())

# Try to select alternate chain depending on criteria
if self.module.params['select_chain']:
if self.select_chain_matcher:
matching_chain = self.find_matching_chain([cert] + alternate_chains)
if matching_chain:
cert = matching_chain
Expand Down Expand Up @@ -832,8 +835,6 @@ def main():
supports_check_mode=True,
)
backend = create_backend(module, False)
if module.params['select_chain'] and not isinstance(backend, CryptographyBackend):
module.fail_json(msg="The 'select_chain' can only be used with the 'cryptography' backend.")

try:
if module.params.get('dest'):
Expand Down

0 comments on commit ad5dbbb

Please sign in to comment.