Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openssl_csr: add support for name constraints extension #92

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/fragments/92-ip-networks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bugfixes:
- "openssl_*, x509_* modules - fix handling of general names which refer to IP networks and not IP addresses (https://github.com/ansible-collections/community.crypto/pull/92)."
3 changes: 3 additions & 0 deletions changelogs/fragments/92-openssl_csr-name-constraints.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
minor_changes:
- "openssl_csr - add support for name constraints extension (https://github.com/ansible-collections/community.crypto/issues/46)."
- "openssl_csr_info - add support for name constraints extension (https://github.com/ansible-collections/community.crypto/issues/46)."
7 changes: 6 additions & 1 deletion plugins/module_utils/crypto/cryptography_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ def cryptography_get_name(name):
if name.startswith('DNS:'):
return x509.DNSName(to_text(name[4:]))
if name.startswith('IP:'):
return x509.IPAddress(ipaddress.ip_address(to_text(name[3:])))
address = to_text(name[3:])
if '/' in address:
return x509.IPAddress(ipaddress.ip_network(address))
return x509.IPAddress(ipaddress.ip_address(address))
if name.startswith('email:'):
return x509.RFC822Name(to_text(name[6:]))
if name.startswith('URI:'):
Expand Down Expand Up @@ -261,6 +264,8 @@ def cryptography_decode_name(name):
if isinstance(name, x509.DNSName):
return 'DNS:{0}'.format(name.value)
if isinstance(name, x509.IPAddress):
if isinstance(name.value, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
return 'IP:{0}/{1}'.format(name.value.network_address.compressed, name.value.prefixlen)
return 'IP:{0}'.format(name.value.compressed)
if isinstance(name, x509.RFC822Name):
return 'email:{0}'.format(name.value)
Expand Down
39 changes: 36 additions & 3 deletions plugins/module_utils/crypto/pyopenssl_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@

from ._obj2txt import obj2txt

from .basic import (
OpenSSLObjectError,
)


def pyopenssl_normalize_name(name, short=False):
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(name))
Expand All @@ -56,9 +60,13 @@ def pyopenssl_normalize_name_attribute(san):
if san.startswith('IP Address:'):
san = 'IP:' + san[len('IP Address:'):]
if san.startswith('IP:'):
ip = compat_ipaddress.ip_address(san[3:])
san = 'IP:{0}'.format(ip.compressed)

address = san[3:]
if '/' in address:
ip = compat_ipaddress.ip_network(address)
san = 'IP:{0}/{1}'.format(ip.network_address.compressed, ip.prefixlen)
else:
ip = compat_ipaddress.ip_address(address)
san = 'IP:{0}'.format(ip.compressed)
if san.startswith('Registered ID:'):
san = 'RID:' + san[len('Registered ID:'):]
# Some versions of OpenSSL apparently forgot the colon. Happens in CI with Ubuntu 16.04 and FreeBSD 11.1
Expand Down Expand Up @@ -119,3 +127,28 @@ def pyopenssl_get_extensions_from_csr(csr):
# similarly to how cryptography does it.
result[oid] = entry
return result


def pyopenssl_parse_name_constraints(name_constraints_extension):
lines = to_text(name_constraints_extension, errors='surrogate_or_strict').splitlines()
exclude = None
excluded = []
permitted = []
for line in lines:
if line.startswith(' ') or line.startswith('\t'):
name = pyopenssl_normalize_name_attribute(line.strip())
if exclude is True:
excluded.append(name)
elif exclude is False:
permitted.append(name)
else:
raise OpenSSLObjectError('Unexpected nameConstraint line: "{0}"'.format(line))
else:
line_lc = line.lower()
if line_lc.startswith('exclud'):
exclude = True
elif line_lc.startswith('includ') or line_lc.startswith('permitt'):
exclude = False
else:
raise OpenSSLObjectError('Cannot parse nameConstraint line: "{0}"'.format(line))
return permitted, excluded
99 changes: 95 additions & 4 deletions plugins/modules/openssl_csr.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,36 @@
aliases: [ ocspMustStaple ]
ocsp_must_staple_critical:
description:
- Should the OCSP Must Staple extension be considered as critical
- Should the OCSP Must Staple extension be considered as critical.
- Note that according to the RFC, this extension should not be marked
as critical, as old clients not knowing about OCSP Must Staple
are required to reject such certificates
(see U(https://tools.ietf.org/html/rfc7633#section-4)).
type: bool
aliases: [ ocspMustStaple_critical ]
name_constraints_permitted:
description:
- For CA certificates, this specifies a list of identifiers which describe
subtrees of names that this CA is allowed to issue certificates for.
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
C(otherName) and the ones specific to your CA).
type: list
elements: str
version_added: 1.1.0
name_constraints_excluded:
description:
- For CA certificates, this specifies a list of identifiers which describe
subtrees of names that this CA is *not* allowed to issue certificates for.
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
C(otherName) and the ones specific to your CA).
type: list
elements: str
version_added: 1.1.0
name_constraints_critical:
description:
- Should the Name Constraints extension be considered as critical.
type: bool
version_added: 1.1.0
select_crypto_backend:
description:
- Determines which crypto backend to use.
Expand Down Expand Up @@ -412,6 +435,20 @@
returned: changed or success
type: bool
sample: false
name_constraints_permitted:
description: List of permitted subtrees to sign certificates for.
returned: changed or success
type: list
elements: str
sample: ['email:.somedomain.com']
version_added: 1.1.0
name_constraints_excluded:
description: List of excluded subtrees the CA cannot sign certificates for.
returned: changed or success
type: list
elements: str
sample: ['email:.com']
version_added: 1.1.0
backup_file:
description: Name of backup file created.
returned: changed and if I(backup) is C(yes)
Expand Down Expand Up @@ -461,6 +498,7 @@

from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
pyopenssl_normalize_name_attribute,
pyopenssl_parse_name_constraints,
)

MINIMAL_PYOPENSSL_VERSION = '0.15'
Expand Down Expand Up @@ -534,6 +572,9 @@ def __init__(self, module):
self.basicConstraints_critical = module.params['basic_constraints_critical']
self.ocspMustStaple = module.params['ocsp_must_staple']
self.ocspMustStaple_critical = module.params['ocsp_must_staple_critical']
self.name_constraints_permitted = module.params['name_constraints_permitted'] or []
self.name_constraints_excluded = module.params['name_constraints_excluded'] or []
self.name_constraints_critical = module.params['name_constraints_critical']
self.create_subject_key_identifier = module.params['create_subject_key_identifier']
self.subject_key_identifier = module.params['subject_key_identifier']
self.authority_key_identifier = module.params['authority_key_identifier']
Expand Down Expand Up @@ -637,7 +678,9 @@ def dump(self):
'extendedKeyUsage': self.extendedKeyUsage,
'basicConstraints': self.basicConstraints,
'ocspMustStaple': self.ocspMustStaple,
'changed': self.changed
'changed': self.changed,
'name_constraints_permitted': self.name_constraints_permitted,
'name_constraints_excluded': self.name_constraints_excluded,
}
if self.backup_file:
result['backup_file'] = self.backup_file
Expand Down Expand Up @@ -697,6 +740,13 @@ def _generate_csr(self):
usages = ', '.join(self.basicConstraints)
extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii')))

if self.name_constraints_permitted or self.name_constraints_excluded:
usages = ', '.join(
['permitted;{0}'.format(name) for name in self.name_constraints_permitted] +
['excluded;{0}'.format(name) for name in self.name_constraints_excluded]
)
extensions.append(crypto.X509Extension(b"nameConstraints", self.name_constraints_critical, usages.encode('ascii')))

if self.ocspMustStaple:
extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE))

Expand Down Expand Up @@ -773,6 +823,22 @@ def _check_extenededKeyUsage(extensions):
def _check_basicConstraints(extensions):
return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical)

def _check_nameConstraints(extensions):
nc_ext = next((ext for ext in extensions if ext.get_short_name() == b'nameConstraints'), '')
permitted, excluded = pyopenssl_parse_name_constraints(nc_ext)
if self.name_constraints_permitted or self.name_constraints_excluded:
if set(permitted) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_permitted]):
return False
if set(excluded) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_excluded]):
return False
if nc_ext.get_critical() != self.name_constraints_critical:
return False
else:
if permitted or excluded:
return False

return True

def _check_ocspMustStaple(extensions):
oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE]
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
Expand All @@ -787,7 +853,7 @@ def _check_extensions(csr):
extensions = csr.get_extensions()
return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
_check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
_check_ocspMustStaple(extensions))
_check_ocspMustStaple(extensions) and _check_nameConstraints(extensions))

def _check_signature(csr):
try:
Expand Down Expand Up @@ -849,6 +915,15 @@ def _generate_csr(self):
critical=self.ocspMustStaple_critical
)

if self.name_constraints_permitted or self.name_constraints_excluded:
try:
csr = csr.add_extension(cryptography.x509.NameConstraints(
[cryptography_get_name(name) for name in self.name_constraints_permitted],
[cryptography_get_name(name) for name in self.name_constraints_excluded],
), critical=self.name_constraints_critical)
except TypeError as e:
raise OpenSSLObjectError('Error while parsing name constraint: {0}'.format(e))

if self.create_subject_key_identifier:
csr = csr.add_extension(
cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()),
Expand Down Expand Up @@ -991,6 +1066,19 @@ def _check_ocspMustStaple(extensions):
else:
return tlsfeature_ext is None

def _check_nameConstraints(extensions):
current_nc_ext = _find_extension(extensions, cryptography.x509.NameConstraints)
current_nc_perm = [str(altname) for altname in current_nc_ext.value.permitted_subtrees] if current_nc_ext else []
current_nc_excl = [str(altname) for altname in current_nc_ext.value.excluded_subtrees] if current_nc_ext else []
nc_perm = [str(cryptography_get_name(altname)) for altname in self.name_constraints_permitted]
nc_excl = [str(cryptography_get_name(altname)) for altname in self.name_constraints_excluded]
if set(nc_perm) != set(current_nc_perm) or set(nc_excl) != set(current_nc_excl):
return False
if nc_perm or nc_excl:
if current_nc_ext.critical != self.name_constraints_critical:
return False
return True

def _check_subject_key_identifier(extensions):
ext = _find_extension(extensions, cryptography.x509.SubjectKeyIdentifier)
if self.create_subject_key_identifier or self.subject_key_identifier is not None:
Expand Down Expand Up @@ -1026,7 +1114,7 @@ def _check_extensions(csr):
return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
_check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
_check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and
_check_authority_key_identifier(extensions))
_check_authority_key_identifier(extensions) and _check_nameConstraints(extensions))

def _check_signature(csr):
if not csr.is_signature_valid:
Expand Down Expand Up @@ -1081,6 +1169,9 @@ def main():
basic_constraints_critical=dict(type='bool', default=False, aliases=['basicConstraints_critical']),
ocsp_must_staple=dict(type='bool', default=False, aliases=['ocspMustStaple']),
ocsp_must_staple_critical=dict(type='bool', default=False, aliases=['ocspMustStaple_critical']),
name_constraints_permitted=dict(type='list', elements='str'),
name_constraints_excluded=dict(type='list', elements='str'),
name_constraints_critical=dict(type='bool', default=False),
backup=dict(type='bool', default=False),
create_subject_key_identifier=dict(type='bool', default=False),
subject_key_identifier=dict(type='str'),
Expand Down
49 changes: 49 additions & 0 deletions plugins/modules/openssl_csr_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,29 @@
description: Whether the C(ocsp_must_staple) extension is critical.
returned: success
type: bool
name_constraints_permitted:
description: List of permitted subtrees to sign certificates for.
returned: success
type: list
elements: str
sample: ['email:.somedomain.com']
version_added: 1.1.0
name_constraints_excluded:
description:
- List of excluded subtrees the CA cannot sign certificates for.
- Is C(none) if extension is not present.
returned: success
type: list
elements: str
sample: ['email:.com']
version_added: 1.1.0
name_constraints_critical:
description:
- Whether the C(name_constraints) extension is critical.
- Is C(none) if extension is not present.
returned: success
type: bool
version_added: 1.1.0
subject:
description:
- The CSR's subject as a dictionary.
Expand Down Expand Up @@ -231,6 +254,7 @@
pyopenssl_get_extensions_from_csr,
pyopenssl_normalize_name,
pyopenssl_normalize_name_attribute,
pyopenssl_parse_name_constraints,
)

MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
Expand Down Expand Up @@ -317,6 +341,10 @@ def _get_ocsp_must_staple(self):
def _get_subject_alt_name(self):
pass

@abc.abstractmethod
def _get_name_constraints(self):
pass

@abc.abstractmethod
def _get_public_key(self, binary):
pass
Expand Down Expand Up @@ -351,6 +379,11 @@ def get_info(self):
result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
(
result['name_constraints_permitted'],
result['name_constraints_excluded'],
result['name_constraints_critical'],
) = self._get_name_constraints()

result['public_key'] = self._get_public_key(binary=False)
pk = self._get_public_key(binary=True)
Expand Down Expand Up @@ -474,6 +507,15 @@ def _get_subject_alt_name(self):
except cryptography.x509.ExtensionNotFound:
return None, False

def _get_name_constraints(self):
try:
nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints)
permitted = [cryptography_decode_name(san) for san in nc_ext.value.permitted_subtrees or []]
excluded = [cryptography_decode_name(san) for san in nc_ext.value.excluded_subtrees or []]
return permitted, excluded, nc_ext.critical
except cryptography.x509.ExtensionNotFound:
return None, None, False

def _get_public_key(self, binary):
return self.csr.public_key().public_bytes(
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
Expand Down Expand Up @@ -559,6 +601,13 @@ def _get_subject_alt_name(self):
return result, bool(extension.get_critical())
return None, False

def _get_name_constraints(self):
for extension in self.csr.get_extensions():
if extension.get_short_name() == b'nameConstraints':
permitted, excluded = pyopenssl_parse_name_constraints(extension)
return permitted, excluded, bool(extension.get_critical())
return None, None, False

def _get_public_key(self, binary):
try:
return crypto.dump_publickey(
Expand Down
Loading