Skip to content

Commit

Permalink
Don't reuse X509_STORE_CTXs
Browse files Browse the repository at this point in the history
There's a lot of history with X509_STORE_CTX's somewhat messy transition
from a stack-allocated type to a heap-allocated type. (This is why a
double X509_STORE_CTX_init used to leak memory.) We can avoid all this
mess by just making a new X509_STORE_CTX each time.
  • Loading branch information
davidben committed Nov 30, 2023
1 parent 6a1b0a8 commit 1454cd6
Showing 1 changed file with 47 additions and 73 deletions.
120 changes: 47 additions & 73 deletions src/OpenSSL/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -1895,15 +1895,9 @@ def __init__(
certificate: X509,
chain: Optional[Sequence[X509]] = None,
) -> None:
store_ctx = _lib.X509_STORE_CTX_new()
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
self._cert = certificate
self._chain = self._build_certificate_stack(chain)
# Make the store context available for use after instantiating this
# class by initializing it now. Per testing, subsequent calls to
# :meth:`_init` have no adverse affect.
self._init()

@staticmethod
def _build_certificate_stack(
Expand Down Expand Up @@ -1935,28 +1929,8 @@ def cleanup(s: Any) -> None:

return stack

def _init(self) -> None:
"""
Set up the store context for a subsequent verification operation.
Calling this method more than once without first calling
:meth:`_cleanup` will leak memory.
"""
ret = _lib.X509_STORE_CTX_init(
self._store_ctx, self._store._store, self._cert._x509, self._chain
)
if ret <= 0:
_raise_current_error()

def _cleanup(self) -> None:
"""
Internally cleans up the store context.
The store context can then be reused with a new call to :meth:`_init`.
"""
_lib.X509_STORE_CTX_cleanup(self._store_ctx)

def _exception_from_context(self) -> X509StoreContextError:
@staticmethod
def _exception_from_context(store_ctx: Any) -> X509StoreContextError:
"""
Convert an OpenSSL native context error failure into a Python
exception.
Expand All @@ -1966,21 +1940,45 @@ def _exception_from_context(self) -> X509StoreContextError:
"""
message = _ffi.string(
_lib.X509_verify_cert_error_string(
_lib.X509_STORE_CTX_get_error(self._store_ctx)
_lib.X509_STORE_CTX_get_error(store_ctx)
)
).decode("utf-8")
errors = [
_lib.X509_STORE_CTX_get_error(self._store_ctx),
_lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
_lib.X509_STORE_CTX_get_error(store_ctx),
_lib.X509_STORE_CTX_get_error_depth(store_ctx),
message,
]
# A context error should always be associated with a certificate, so we
# expect this call to never return :class:`None`.
_x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
_x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
_cert = _lib.X509_dup(_x509)
pycert = X509._from_raw_x509_ptr(_cert)
return X509StoreContextError(message, errors, pycert)

def _verify_certificate(self) -> Any:
"""
Verifies the certificate and runs an X509_STORE_CTX containing the
results.
:raises X509StoreContextError: If an error occurred when validating a
certificate in the context. Sets ``certificate`` attribute to
indicate which certificate caused the error.
"""
store_ctx = _lib.X509_STORE_CTX_new()
_openssl_assert(store_ctx != _ffi.NULL)
store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)

ret = _lib.X509_STORE_CTX_init(
store_ctx, self._store._store, self._cert._x509, self._chain
)
_openssl_assert(ret == 1)

ret = _lib.X509_verify_cert(store_ctx)
if ret <= 0:
raise self._exception_from_context(store_ctx)

return store_ctx

def set_store(self, store: X509Store) -> None:
"""
Set the context's X.509 store.
Expand All @@ -2002,19 +2000,7 @@ def verify_certificate(self) -> None:
certificate in the context. Sets ``certificate`` attribute to
indicate which certificate caused the error.
"""
# Always re-initialize the store context in case
# :meth:`verify_certificate` is called multiple times.
#
# :meth:`_init` is called in :meth:`__init__` so _cleanup is called
# before _init to ensure memory is not leaked.
self._cleanup()
self._init()
try:
ret = _lib.X509_verify_cert(self._store_ctx)
if ret <= 0:
raise self._exception_from_context()
finally:
self._cleanup()
self._verify_certificate()

def get_verified_chain(self) -> List[X509]:
"""
Expand All @@ -2027,34 +2013,22 @@ def get_verified_chain(self) -> List[X509]:
.. versionadded:: 20.0
"""
# Always re-initialize the store context in case
# :meth:`verify_certificate` is called multiple times.
#
# :meth:`_init` is called in :meth:`__init__` so _cleanup is called
# before _init to ensure memory is not leaked.
self._cleanup()
self._init()
try:
ret = _lib.X509_verify_cert(self._store_ctx)
if ret <= 0:
raise self._exception_from_context()

# Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx)
_openssl_assert(cert_stack != _ffi.NULL)

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)

# Free the stack but not the members which are freed by the X509 class.
_lib.sk_X509_free(cert_stack)
return result
finally:
self._cleanup()
store_ctx = self._verify_certificate()

# Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
cert_stack = _lib.X509_STORE_CTX_get1_chain(store_ctx)
_openssl_assert(cert_stack != _ffi.NULL)

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)

# Free the stack but not the members which are freed by the X509 class.
_lib.sk_X509_free(cert_stack)
return result


def load_certificate(type: int, buffer: bytes) -> X509:
Expand Down

0 comments on commit 1454cd6

Please sign in to comment.