Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] x509_certificate(_pipe): add regenerate option #310

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelogs/fragments/310-x509_certificate-regenerate.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
minor_changes:
- "x509_certificate, x509_certificate_pipe - add ``regenerate`` option to allow better control on when certificates should be regenerated (https://github.com/ansible-collections/community.crypto/issues/295, https://github.com/ansible-collections/community.crypto/pull/310)."
breaking_changes:
- "x509_certificate, x509_certificate_pipe - by default, broken certificates will now cause the module to fail instead of them being regenerated. This can be changed with the ``regenerate`` option (https://github.com/ansible-collections/community.crypto/pull/310)."
31 changes: 31 additions & 0 deletions plugins/doc_fragments/module_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,37 @@ class ModuleDocFragment(object):
type: bool
default: no

regenerate:
description:
- Allows to configure in which situations the module is allowed to regenerate certificates.
The module will always generate a new key if the destination file does not exist.
- By default, the certificate will be regenerated when it doesn't match the module's options,
except when the certificate cannot be read or the not before/not after timestamps do not
match.
- If set to C(never), the module will fail if the certificate cannot be read, and will never
regenerate an existing certificate.
- If set to C(fail), the module will fail if the certificate does not correspond to the
module's options.
- If set to C(partial_idempotence), the certificate will be regenerated if it does not conform
to most of the module's options. The certificate is B(not) regenerated if it cannot be read
(broken file), or if the not before/not after timestamps do not match.
- If set to C(full_idempotence), the certificate will be regenerated if it does not conform to
the module's options. This is also the case if the key cannot be read (broken file), the
not before/not after timestamps do not match.
- If set to C(always), the module will always regenerate the key. This is equivalent to
setting I(force) to C(yes).
- B(Note) that before community.crypto 2.0.0, broken certificates that cannot be read were
always regenerated. From 2.0.0 on, they will not be regenerated by default.
type: str
choices:
- never
- fail
- partial_idempotence
- full_idempotence
- always
default: partial_idempotence
version_added: 2.0.0

csr_path:
description:
- Path to the Certificate Signing Request (CSR) used to generate this certificate.
Expand Down
45 changes: 37 additions & 8 deletions plugins/module_utils/crypto/module_backends/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native

from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec

Expand Down Expand Up @@ -62,7 +63,9 @@ def __init__(self, module, backend):
self.module = module
self.backend = backend

self.force = module.params['force']
self.regenerate = module.params['regenerate']
if module.params['force']:
self.regenerate = 'always'
self.privatekey_path = module.params['privatekey_path']
self.privatekey_content = module.params['privatekey_content']
if self.privatekey_content is not None:
Expand Down Expand Up @@ -223,29 +226,50 @@ def _check_subject_key_identifier(self):
return False
return True

def needs_regeneration(self):
def needs_regeneration_true(self):
if self.regenerate == 'fail':
self.module.fail_json(msg='The certificate would be regenerated')
return True

def needs_regeneration(self, not_before=None, not_after=None):
"""Check whether a regeneration is necessary."""
if self.force or self.existing_certificate_bytes is None:
if self.regenerate == 'always' or self.existing_certificate_bytes is None:
return True
if self.regenerate == 'never':
return False

try:
self._ensure_existing_certificate_loaded()
except Exception as dummy:
return True
except Exception as exc:
if self.regenerate == 'full_idempotence':
return self.needs_regeneration_true()
self.module.fail_json(msg='The existing certificate can not be loaded: {0}. '.format(to_native(exc))
+ 'To force regeneration, call the module with `generate` set to `full_idempotence` or `always`,'
+ ' or with `force=yes`.')

# Check whether private key matches
self._ensure_private_key_loaded()
if self.privatekey is not None and not self._check_privatekey():
return True
return self.needs_regeneration_true()

# Check whether CSR matches
self._ensure_csr_loaded()
if self.csr is not None and not self._check_csr():
return True
return self.needs_regeneration_true()

# Check SubjectKeyIdentifier
if self.create_subject_key_identifier != 'never_create' and not self._check_subject_key_identifier():
return True
return self.needs_regeneration_true()

# Check not before
if not_before is not None and self.regenerate in ('full_idempotence', 'fail'):
if self.existing_certificate.not_valid_before != not_before:
return self.needs_regeneration_true()

# Check not after
if not_after is not None and self.regenerate in ('full_idempotence', 'fail'):
if self.existing_certificate.not_valid_after != not_after:
return self.needs_regeneration_true()

return False

Expand Down Expand Up @@ -328,6 +352,11 @@ def get_certificate_argument_spec():
force=dict(type='bool', default=False,),
csr_path=dict(type='path'),
csr_content=dict(type='str'),
regenerate=dict(
type='str',
default='partial_idempotence',
choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always']
),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography']),

# General properties of a certificate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def get_certificate_data(self):
return self.cert_bytes

def needs_regeneration(self):
parent_check = super(EntrustCertificateBackend, self).needs_regeneration()
parent_check = super(EntrustCertificateBackend, self).needs_regeneration(not_after=self.notAfter)

try:
cert_details = self._get_cert_details()
Expand All @@ -134,11 +134,11 @@ def needs_regeneration(self):
# Always issue a new certificate if the certificate is expired, suspended or revoked
status = cert_details.get('status', False)
if status == 'EXPIRED' or status == 'SUSPENDED' or status == 'REVOKED':
return True
return self.needs_regeneration_true()

# If the requested cert type was specified and it is for a different certificate type than the initial certificate, a new one is needed
if self.module.params['entrust_cert_type'] and cert_details.get('certType') and self.module.params['entrust_cert_type'] != cert_details.get('certType'):
return True
return self.needs_regeneration_true()

return parent_check

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def get_certificate_data(self):
return self.cert.public_bytes(Encoding.PEM)

def needs_regeneration(self):
if super(OwnCACertificateBackendCryptography, self).needs_regeneration():
if super(OwnCACertificateBackendCryptography, self).needs_regeneration(not_before=self.notBefore, not_after=self.notAfter):
return True

# Check AuthorityKeyIdentifier
Expand All @@ -188,9 +188,9 @@ def needs_regeneration(self):
try:
ext = self.existing_certificate.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
if ext.value != expected_ext:
return True
return self.needs_regeneration_true()
except cryptography.x509.ExtensionNotFound as dummy:
return True
return self.needs_regeneration_true()

return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def get_certificate_data(self):
"""Return bytes for self.cert."""
return self.cert.public_bytes(Encoding.PEM)

def needs_regeneration(self):
return super(SelfSignedCertificateBackendCryptography, self).needs_regeneration(
not_before=self.notBefore, not_after=self.notAfter)

def dump(self, include_certificate):
result = super(SelfSignedCertificateBackendCryptography, self).dump(include_certificate)

Expand Down