From aa1481eae7f315e24d9f96dfd2ee0aca90f27d4f Mon Sep 17 00:00:00 2001 From: Aarni Koskela Date: Tue, 20 Jul 2021 15:08:55 +0300 Subject: [PATCH] Replace double-underscored names with single underscores This will avoid Python's class-name-prefix mangling of attributes and methods while still retaining the signal that they're meant for private use of the library. This makes it easier to e.g. subclass classes should one need to, and to access data from outside (while being mindful of the fact that you're accessing notionally private data). Only one change to a test was required (and that test was indeed accessing a private field from outside...). --- src/onelogin/saml2/auth.py | 280 +++++++++--------- src/onelogin/saml2/authn_request.py | 22 +- src/onelogin/saml2/logout_request.py | 40 +-- src/onelogin/saml2/logout_response.py | 42 +-- src/onelogin/saml2/metadata.py | 6 +- src/onelogin/saml2/response.py | 92 +++--- src/onelogin/saml2/settings.py | 246 +++++++-------- .../saml2_tests/authn_request_test.py | 2 +- 8 files changed, 365 insertions(+), 365 deletions(-) diff --git a/src/onelogin/saml2/auth.py b/src/onelogin/saml2/auth.py index cfaee5fd..c284fe46 100644 --- a/src/onelogin/saml2/auth.py +++ b/src/onelogin/saml2/auth.py @@ -52,29 +52,29 @@ def __init__(self, request_data, old_settings=None, custom_base_path=None): :param custom_base_path: Optional. Path where are stored the settings file and the cert folder :type custom_base_path: string """ - self.__request_data = request_data + self._request_data = request_data if isinstance(old_settings, OneLogin_Saml2_Settings): - self.__settings = old_settings + self._settings = old_settings else: - self.__settings = OneLogin_Saml2_Settings(old_settings, custom_base_path) - self.__attributes = dict() - self.__friendlyname_attributes = dict() - self.__nameid = None - self.__nameid_format = None - self.__nameid_nq = None - self.__nameid_spnq = None - self.__session_index = None - self.__session_expiration = None - self.__authenticated = False - self.__errors = [] - self.__error_reason = None - self.__last_request_id = None - self.__last_message_id = None - self.__last_assertion_id = None - self.__last_authn_contexts = [] - self.__last_request = None - self.__last_response = None - self.__last_assertion_not_on_or_after = None + self._settings = OneLogin_Saml2_Settings(old_settings, custom_base_path) + self._attributes = dict() + self._friendlyname_attributes = dict() + self._nameid = None + self._nameid_format = None + self._nameid_nq = None + self._nameid_spnq = None + self._session_index = None + self._session_expiration = None + self._authenticated = False + self._errors = [] + self._error_reason = None + self._last_request_id = None + self._last_message_id = None + self._last_assertion_id = None + self._last_authn_contexts = [] + self._last_request = None + self._last_response = None + self._last_assertion_not_on_or_after = None def get_settings(self): """ @@ -82,7 +82,7 @@ def get_settings(self): :return: Setting info :rtype: OneLogin_Saml2_Setting object """ - return self.__settings + return self._settings def set_strict(self, value): """ @@ -92,22 +92,22 @@ def set_strict(self, value): :type value: bool """ assert isinstance(value, bool) - self.__settings.set_strict(value) + self._settings.set_strict(value) def store_valid_response(self, response): - self.__attributes = response.get_attributes() - self.__friendlyname_attributes = response.get_friendlyname_attributes() - self.__nameid = response.get_nameid() - self.__nameid_format = response.get_nameid_format() - self.__nameid_nq = response.get_nameid_nq() - self.__nameid_spnq = response.get_nameid_spnq() - self.__session_index = response.get_session_index() - self.__session_expiration = response.get_session_not_on_or_after() - self.__last_message_id = response.get_id() - self.__last_assertion_id = response.get_assertion_id() - self.__last_authn_contexts = response.get_authn_contexts() - self.__authenticated = True - self.__last_assertion_not_on_or_after = response.get_assertion_not_on_or_after() + self._attributes = response.get_attributes() + self._friendlyname_attributes = response.get_friendlyname_attributes() + self._nameid = response.get_nameid() + self._nameid_format = response.get_nameid_format() + self._nameid_nq = response.get_nameid_nq() + self._nameid_spnq = response.get_nameid_spnq() + self._session_index = response.get_session_index() + self._session_expiration = response.get_session_not_on_or_after() + self._last_message_id = response.get_id() + self._last_assertion_id = response.get_assertion_id() + self._last_authn_contexts = response.get_authn_contexts() + self._authenticated = True + self._last_assertion_not_on_or_after = response.get_assertion_not_on_or_after() def process_response(self, request_id=None): """ @@ -118,22 +118,22 @@ def process_response(self, request_id=None): :raises: OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND, when a POST with a SAMLResponse is not found """ - self.__errors = [] - self.__error_reason = None + self._errors = [] + self._error_reason = None - if 'post_data' in self.__request_data and 'SAMLResponse' in self.__request_data['post_data']: + if 'post_data' in self._request_data and 'SAMLResponse' in self._request_data['post_data']: # AuthnResponse -- HTTP_POST Binding - response = self.response_class(self.__settings, self.__request_data['post_data']['SAMLResponse']) - self.__last_response = response.get_xml_document() + response = self.response_class(self._settings, self._request_data['post_data']['SAMLResponse']) + self._last_response = response.get_xml_document() - if response.is_valid(self.__request_data, request_id): + if response.is_valid(self._request_data, request_id): self.store_valid_response(response) else: - self.__errors.append('invalid_response') - self.__error_reason = response.get_error() + self._errors.append('invalid_response') + self._error_reason = response.get_error() else: - self.__errors.append('invalid_binding') + self._errors.append('invalid_binding') raise OneLogin_Saml2_Error( 'SAML Response not found, Only supported HTTP_POST Binding', OneLogin_Saml2_Error.SAML_RESPONSE_NOT_FOUND @@ -151,57 +151,57 @@ def process_slo(self, keep_local_session=False, request_id=None, delete_session_ :returns: Redirection url """ - self.__errors = [] - self.__error_reason = None + self._errors = [] + self._error_reason = None - get_data = 'get_data' in self.__request_data and self.__request_data['get_data'] + get_data = 'get_data' in self._request_data and self._request_data['get_data'] if get_data and 'SAMLResponse' in get_data: - logout_response = self.logout_response_class(self.__settings, get_data['SAMLResponse']) - self.__last_response = logout_response.get_xml() + logout_response = self.logout_response_class(self._settings, get_data['SAMLResponse']) + self._last_response = logout_response.get_xml() if not self.validate_response_signature(get_data): - self.__errors.append('invalid_logout_response_signature') - self.__errors.append('Signature validation failed. Logout Response rejected') - elif not logout_response.is_valid(self.__request_data, request_id): - self.__errors.append('invalid_logout_response') - self.__error_reason = logout_response.get_error() + self._errors.append('invalid_logout_response_signature') + self._errors.append('Signature validation failed. Logout Response rejected') + elif not logout_response.is_valid(self._request_data, request_id): + self._errors.append('invalid_logout_response') + self._error_reason = logout_response.get_error() elif logout_response.get_status() != OneLogin_Saml2_Constants.STATUS_SUCCESS: - self.__errors.append('logout_not_success') + self._errors.append('logout_not_success') else: - self.__last_message_id = logout_response.id + self._last_message_id = logout_response.id if not keep_local_session: OneLogin_Saml2_Utils.delete_local_session(delete_session_cb) elif get_data and 'SAMLRequest' in get_data: - logout_request = self.logout_request_class(self.__settings, get_data['SAMLRequest']) - self.__last_request = logout_request.get_xml() + logout_request = self.logout_request_class(self._settings, get_data['SAMLRequest']) + self._last_request = logout_request.get_xml() if not self.validate_request_signature(get_data): - self.__errors.append("invalid_logout_request_signature") - self.__errors.append('Signature validation failed. Logout Request rejected') - elif not logout_request.is_valid(self.__request_data): - self.__errors.append('invalid_logout_request') - self.__error_reason = logout_request.get_error() + self._errors.append("invalid_logout_request_signature") + self._errors.append('Signature validation failed. Logout Request rejected') + elif not logout_request.is_valid(self._request_data): + self._errors.append('invalid_logout_request') + self._error_reason = logout_request.get_error() else: if not keep_local_session: OneLogin_Saml2_Utils.delete_local_session(delete_session_cb) in_response_to = logout_request.id - self.__last_message_id = logout_request.id - response_builder = self.logout_response_class(self.__settings) + self._last_message_id = logout_request.id + response_builder = self.logout_response_class(self._settings) response_builder.build(in_response_to) - self.__last_response = response_builder.get_xml() + self._last_response = response_builder.get_xml() logout_response = response_builder.get_response() parameters = {'SAMLResponse': logout_response} - if 'RelayState' in self.__request_data['get_data']: - parameters['RelayState'] = self.__request_data['get_data']['RelayState'] + if 'RelayState' in self._request_data['get_data']: + parameters['RelayState'] = self._request_data['get_data']['RelayState'] - security = self.__settings.get_security_data() + security = self._settings.get_security_data() if security['logoutResponseSigned']: self.add_response_signature(parameters, security['signatureAlgorithm']) return self.redirect_to(self.get_slo_response_url(), parameters) else: - self.__errors.append('invalid_binding') + self._errors.append('invalid_binding') raise OneLogin_Saml2_Error( 'SAML LogoutRequest/LogoutResponse not found. Only supported HTTP_REDIRECT Binding', OneLogin_Saml2_Error.SAML_LOGOUTMESSAGE_NOT_FOUND @@ -218,9 +218,9 @@ def redirect_to(self, url=None, parameters={}): :returns: Redirection URL """ - if url is None and 'RelayState' in self.__request_data['get_data']: - url = self.__request_data['get_data']['RelayState'] - return OneLogin_Saml2_Utils.redirect(url, parameters, request_data=self.__request_data) + if url is None and 'RelayState' in self._request_data['get_data']: + url = self._request_data['get_data']['RelayState'] + return OneLogin_Saml2_Utils.redirect(url, parameters, request_data=self._request_data) def is_authenticated(self): """ @@ -229,7 +229,7 @@ def is_authenticated(self): :returns: True if is authenticated, False if not :rtype: bool """ - return self.__authenticated + return self._authenticated def get_attributes(self): """ @@ -238,7 +238,7 @@ def get_attributes(self): :returns: SAML attributes :rtype: dict """ - return self.__attributes + return self._attributes def get_friendlyname_attributes(self): """ @@ -247,7 +247,7 @@ def get_friendlyname_attributes(self): :returns: SAML attributes :rtype: dict """ - return self.__friendlyname_attributes + return self._friendlyname_attributes def get_nameid(self): """ @@ -256,7 +256,7 @@ def get_nameid(self): :returns: NameID :rtype: string|None """ - return self.__nameid + return self._nameid def get_nameid_format(self): """ @@ -265,7 +265,7 @@ def get_nameid_format(self): :returns: NameID Format :rtype: string|None """ - return self.__nameid_format + return self._nameid_format def get_nameid_nq(self): """ @@ -274,7 +274,7 @@ def get_nameid_nq(self): :returns: NameID NameQualifier :rtype: string|None """ - return self.__nameid_nq + return self._nameid_nq def get_nameid_spnq(self): """ @@ -283,7 +283,7 @@ def get_nameid_spnq(self): :returns: NameID SP NameQualifier :rtype: string|None """ - return self.__nameid_spnq + return self._nameid_spnq def get_session_index(self): """ @@ -291,7 +291,7 @@ def get_session_index(self): :returns: The SessionIndex of the assertion :rtype: string """ - return self.__session_index + return self._session_index def get_session_expiration(self): """ @@ -299,14 +299,14 @@ def get_session_expiration(self): :returns: The SessionNotOnOrAfter of the assertion :rtype: unix/posix timestamp|None """ - return self.__session_expiration + return self._session_expiration def get_last_assertion_not_on_or_after(self): """ The NotOnOrAfter value of the valid SubjectConfirmationData node (if any) of the last assertion processed """ - return self.__last_assertion_not_on_or_after + return self._last_assertion_not_on_or_after def get_errors(self): """ @@ -315,7 +315,7 @@ def get_errors(self): :returns: List of errors :rtype: list """ - return self.__errors + return self._errors def get_last_error_reason(self): """ @@ -324,7 +324,7 @@ def get_last_error_reason(self): :returns: Reason of the last error :rtype: None | string """ - return self.__error_reason + return self._error_reason def get_attribute(self, name): """ @@ -337,7 +337,7 @@ def get_attribute(self, name): :rtype: list """ assert isinstance(name, compat.str_type) - return self.__attributes.get(name) + return self._attributes.get(name) def get_friendlyname_attribute(self, friendlyname): """ @@ -350,35 +350,35 @@ def get_friendlyname_attribute(self, friendlyname): :rtype: list """ assert isinstance(friendlyname, compat.str_type) - return self.__friendlyname_attributes.get(friendlyname) + return self._friendlyname_attributes.get(friendlyname) def get_last_request_id(self): """ :returns: The ID of the last Request SAML message generated. :rtype: string """ - return self.__last_request_id + return self._last_request_id def get_last_message_id(self): """ :returns: The ID of the last Response SAML message processed. :rtype: string """ - return self.__last_message_id + return self._last_message_id def get_last_assertion_id(self): """ :returns: The ID of the last assertion processed. :rtype: string """ - return self.__last_assertion_id + return self._last_assertion_id def get_last_authn_contexts(self): """ :returns: The list of authentication contexts sent in the last SAML Response. :rtype: list """ - return self.__last_authn_contexts + return self._last_authn_contexts def login(self, return_to=None, force_authn=False, is_passive=False, set_nameid_policy=True, name_id_value_req=None): """ @@ -402,9 +402,9 @@ def login(self, return_to=None, force_authn=False, is_passive=False, set_nameid_ :returns: Redirection URL :rtype: string """ - authn_request = self.authn_request_class(self.__settings, force_authn, is_passive, set_nameid_policy, name_id_value_req) - self.__last_request = authn_request.get_xml() - self.__last_request_id = authn_request.get_id() + authn_request = self.authn_request_class(self._settings, force_authn, is_passive, set_nameid_policy, name_id_value_req) + self._last_request = authn_request.get_xml() + self._last_request_id = authn_request.get_id() saml_request = authn_request.get_request() parameters = {'SAMLRequest': saml_request} @@ -412,9 +412,9 @@ def login(self, return_to=None, force_authn=False, is_passive=False, set_nameid_ if return_to is not None: parameters['RelayState'] = return_to else: - parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data) + parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self._request_data) - security = self.__settings.get_security_data() + security = self._settings.get_security_data() if security.get('authnRequestsSigned', False): self.add_request_signature(parameters, security['signatureAlgorithm']) return self.redirect_to(self.get_sso_url(), parameters) @@ -450,30 +450,30 @@ def logout(self, return_to=None, name_id=None, session_index=None, nq=None, name OneLogin_Saml2_Error.SAML_SINGLE_LOGOUT_NOT_SUPPORTED ) - if name_id is None and self.__nameid is not None: - name_id = self.__nameid + if name_id is None and self._nameid is not None: + name_id = self._nameid - if name_id_format is None and self.__nameid_format is not None: - name_id_format = self.__nameid_format + if name_id_format is None and self._nameid_format is not None: + name_id_format = self._nameid_format logout_request = self.logout_request_class( - self.__settings, + self._settings, name_id=name_id, session_index=session_index, nq=nq, name_id_format=name_id_format, spnq=spnq ) - self.__last_request = logout_request.get_xml() - self.__last_request_id = logout_request.id + self._last_request = logout_request.get_xml() + self._last_request_id = logout_request.id parameters = {'SAMLRequest': logout_request.get_request()} if return_to is not None: parameters['RelayState'] = return_to else: - parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self.__request_data) + parameters['RelayState'] = OneLogin_Saml2_Utils.get_self_url_no_query(self._request_data) - security = self.__settings.get_security_data() + security = self._settings.get_security_data() if security.get('logoutRequestSigned', False): self.add_request_signature(parameters, security['signatureAlgorithm']) return self.redirect_to(slo_url, parameters) @@ -485,7 +485,7 @@ def get_sso_url(self): :returns: An URL, the SSO endpoint of the IdP :rtype: string """ - return self.__settings.get_idp_sso_url() + return self._settings.get_idp_sso_url() def get_slo_url(self): """ @@ -494,7 +494,7 @@ def get_slo_url(self): :returns: An URL, the SLO endpoint of the IdP :rtype: string """ - return self.__settings.get_idp_slo_url() + return self._settings.get_idp_slo_url() def get_slo_response_url(self): """ @@ -503,7 +503,7 @@ def get_slo_response_url(self): :returns: an URL, the SLO return endpoint of the IdP :rtype: string """ - return self.__settings.get_idp_slo_response_url() + return self._settings.get_idp_slo_response_url() def add_request_signature(self, request_data, sign_algorithm=OneLogin_Saml2_Constants.RSA_SHA1): """ @@ -515,7 +515,7 @@ def add_request_signature(self, request_data, sign_algorithm=OneLogin_Saml2_Cons :param sign_algorithm: Signature algorithm method :type sign_algorithm: string """ - return self.__build_signature(request_data, 'SAMLRequest', sign_algorithm) + return self._build_signature(request_data, 'SAMLRequest', sign_algorithm) def add_response_signature(self, response_data, sign_algorithm=OneLogin_Saml2_Constants.RSA_SHA1): """ @@ -526,10 +526,10 @@ def add_response_signature(self, response_data, sign_algorithm=OneLogin_Saml2_Co :param sign_algorithm: Signature algorithm method :type sign_algorithm: string """ - return self.__build_signature(response_data, 'SAMLResponse', sign_algorithm) + return self._build_signature(response_data, 'SAMLResponse', sign_algorithm) @staticmethod - def __build_sign_query_from_qs(query_string, saml_type): + def _build_sign_query_from_qs(query_string, saml_type): """ Build sign query from query string @@ -545,7 +545,7 @@ def __build_sign_query_from_qs(query_string, saml_type): return '&'.join(part for arg in args for part in parts if part.startswith(arg)) @staticmethod - def __build_sign_query(saml_data, relay_state, algorithm, saml_type, lowercase_urlencoding=False): + def _build_sign_query(saml_data, relay_state, algorithm, saml_type, lowercase_urlencoding=False): """ Build sign query @@ -570,7 +570,7 @@ def __build_sign_query(saml_data, relay_state, algorithm, saml_type, lowercase_u sign_data.append('SigAlg=%s' % OneLogin_Saml2_Utils.escape_url(algorithm, lowercase_urlencoding)) return '&'.join(sign_data) - def __build_signature(self, data, saml_type, sign_algorithm=OneLogin_Saml2_Constants.RSA_SHA1): + def _build_signature(self, data, saml_type, sign_algorithm=OneLogin_Saml2_Constants.RSA_SHA1): """ Builds the Signature :param data: The Request data @@ -591,10 +591,10 @@ def __build_signature(self, data, saml_type, sign_algorithm=OneLogin_Saml2_Const OneLogin_Saml2_Error.PRIVATE_KEY_NOT_FOUND ) - msg = self.__build_sign_query(data[saml_type], - data.get('RelayState', None), - sign_algorithm, - saml_type) + msg = self._build_sign_query(data[saml_type], + data.get('RelayState', None), + sign_algorithm, + saml_type) sign_algorithm_transform_map = { OneLogin_Saml2_Constants.DSA_SHA1: xmlsec.Transform.DSA_SHA1, @@ -605,7 +605,7 @@ def __build_signature(self, data, saml_type, sign_algorithm=OneLogin_Saml2_Const } sign_algorithm_transform = sign_algorithm_transform_map.get(sign_algorithm, xmlsec.Transform.RSA_SHA1) - signature = OneLogin_Saml2_Utils.sign_binary(msg, key, sign_algorithm_transform, self.__settings.is_debug_active()) + signature = OneLogin_Saml2_Utils.sign_binary(msg, key, sign_algorithm_transform, self._settings.is_debug_active()) data['Signature'] = OneLogin_Saml2_Utils.b64encode(signature) data['SigAlg'] = sign_algorithm @@ -618,7 +618,7 @@ def validate_request_signature(self, request_data): """ - return self.__validate_signature(request_data, 'SAMLRequest') + return self._validate_signature(request_data, 'SAMLRequest') def validate_response_signature(self, request_data): """ @@ -629,9 +629,9 @@ def validate_response_signature(self, request_data): """ - return self.__validate_signature(request_data, 'SAMLResponse') + return self._validate_signature(request_data, 'SAMLResponse') - def __validate_signature(self, data, saml_type, raise_exceptions=False): + def _validate_signature(self, data, saml_type, raise_exceptions=False): """ Validate Signature @@ -650,7 +650,7 @@ def __validate_signature(self, data, saml_type, raise_exceptions=False): try: signature = data.get('Signature', None) if signature is None: - if self.__settings.is_strict() and self.__settings.get_security_data().get('wantMessagesSigned', False): + if self._settings.is_strict() and self._settings.get_security_data().get('wantMessagesSigned', False): raise OneLogin_Saml2_ValidationError( 'The %s is not signed. Rejected.' % saml_type, OneLogin_Saml2_ValidationError.NO_SIGNED_MESSAGE @@ -666,7 +666,7 @@ def __validate_signature(self, data, saml_type, raise_exceptions=False): if not (exists_x509cert or exists_multix509sign): error_msg = 'In order to validate the sign on the %s, the x509cert of the IdP is required' % saml_type - self.__errors.append(error_msg) + self._errors.append(error_msg) raise OneLogin_Saml2_Error( error_msg, OneLogin_Saml2_Error.CERT_NOT_FOUND @@ -676,16 +676,16 @@ def __validate_signature(self, data, saml_type, raise_exceptions=False): if isinstance(sign_alg, bytes): sign_alg = sign_alg.decode('utf8') - query_string = self.__request_data.get('query_string') - if query_string and self.__request_data.get('validate_signature_from_qs'): - signed_query = self.__build_sign_query_from_qs(query_string, saml_type) + query_string = self._request_data.get('query_string') + if query_string and self._request_data.get('validate_signature_from_qs'): + signed_query = self._build_sign_query_from_qs(query_string, saml_type) else: - lowercase_urlencoding = self.__request_data.get('lowercase_urlencoding', False) - signed_query = self.__build_sign_query(data[saml_type], - data.get('RelayState'), - sign_alg, - saml_type, - lowercase_urlencoding) + lowercase_urlencoding = self._request_data.get('lowercase_urlencoding', False) + signed_query = self._build_sign_query(data[saml_type], + data.get('RelayState'), + sign_alg, + saml_type, + lowercase_urlencoding) if exists_multix509sign: for cert in idp_data['x509certMulti']['signing']: @@ -705,14 +705,14 @@ def __validate_signature(self, data, saml_type, raise_exceptions=False): OneLogin_Saml2_Utils.b64decode(signature), cert, sign_alg, - self.__settings.is_debug_active()): + self._settings.is_debug_active()): raise OneLogin_Saml2_ValidationError( 'Signature validation failed. %s rejected' % saml_type, OneLogin_Saml2_ValidationError.INVALID_SIGNATURE ) return True except Exception as e: - self.__error_reason = str(e) + self._error_reason = str(e) if raise_exceptions: raise e return False @@ -725,11 +725,11 @@ def get_last_response_xml(self, pretty_print_if_possible=False): :rtype: string|None """ response = None - if self.__last_response is not None: - if isinstance(self.__last_response, compat.str_type): - response = self.__last_response + if self._last_response is not None: + if isinstance(self._last_response, compat.str_type): + response = self._last_response else: - response = tostring(self.__last_response, encoding='unicode', pretty_print=pretty_print_if_possible) + response = tostring(self._last_response, encoding='unicode', pretty_print=pretty_print_if_possible) return response def get_last_request_xml(self): @@ -738,4 +738,4 @@ def get_last_request_xml(self): :returns: SAML request XML :rtype: string|None """ - return self.__last_request or None + return self._last_request or None diff --git a/src/onelogin/saml2/authn_request.py b/src/onelogin/saml2/authn_request.py index cdbf2c44..0aa1623d 100644 --- a/src/onelogin/saml2/authn_request.py +++ b/src/onelogin/saml2/authn_request.py @@ -41,13 +41,13 @@ def __init__(self, settings, force_authn=False, is_passive=False, set_nameid_pol :param name_id_value_req: Optional argument. Indicates to the IdP the subject that should be authenticated :type name_id_value_req: string """ - self.__settings = settings + self._settings = settings - sp_data = self.__settings.get_sp_data() - idp_data = self.__settings.get_idp_data() - security = self.__settings.get_security_data() + sp_data = self._settings.get_sp_data() + idp_data = self._settings.get_idp_data() + security = self._settings.get_security_data() - self.__id = self._generate_request_id() + self._id = self._generate_request_id() issue_instant = OneLogin_Saml2_Utils.parse_time_to_SAML(OneLogin_Saml2_Utils.now()) destination = idp_data['singleSignOnService']['url'] @@ -112,7 +112,7 @@ def __init__(self, settings, force_authn=False, is_passive=False, set_nameid_pol request = OneLogin_Saml2_Templates.AUTHN_REQUEST % \ { - 'id': self.__id, + 'id': self._id, 'provider_name': provider_name_str, 'force_authn_str': force_authn_str, 'is_passive_str': is_passive_str, @@ -127,7 +127,7 @@ def __init__(self, settings, force_authn=False, is_passive=False, set_nameid_pol 'acs_binding': sp_data['assertionConsumerService'].get('binding', 'urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST') } - self.__authn_request = request + self._authn_request = request def _generate_request_id(self): """ @@ -144,9 +144,9 @@ def get_request(self, deflate=True): :rtype: str object """ if deflate: - request = OneLogin_Saml2_Utils.deflate_and_base64_encode(self.__authn_request) + request = OneLogin_Saml2_Utils.deflate_and_base64_encode(self._authn_request) else: - request = OneLogin_Saml2_Utils.b64encode(self.__authn_request) + request = OneLogin_Saml2_Utils.b64encode(self._authn_request) return request def get_id(self): @@ -155,7 +155,7 @@ def get_id(self): :return: AuthNRequest ID :rtype: string """ - return self.__id + return self._id def get_xml(self): """ @@ -163,4 +163,4 @@ def get_xml(self): :return: XML request body :rtype: string """ - return self.__authn_request + return self._authn_request diff --git a/src/onelogin/saml2/logout_request.py b/src/onelogin/saml2/logout_request.py index 3ed3fb15..84cdd86b 100644 --- a/src/onelogin/saml2/logout_request.py +++ b/src/onelogin/saml2/logout_request.py @@ -50,14 +50,14 @@ def __init__(self, settings, request=None, name_id=None, session_index=None, nq= :param spnq: SP Name Qualifier :type: string """ - self.__settings = settings - self.__error = None + self._settings = settings + self._error = None self.id = None if request is None: - sp_data = self.__settings.get_sp_data() - idp_data = self.__settings.get_idp_data() - security = self.__settings.get_security_data() + sp_data = self._settings.get_sp_data() + idp_data = self._settings.get_idp_data() + security = self._settings.get_security_data() self.id = self._generate_request_id() @@ -71,7 +71,7 @@ def __init__(self, settings, request=None, name_id=None, session_index=None, nq= if exists_multix509enc: cert = idp_data['x509certMulti']['encryption'][0] else: - cert = self.__settings.get_idp_cert() + cert = self._settings.get_idp_cert() if name_id is not None: if not name_id_format and sp_data['NameIDFormat'] != OneLogin_Saml2_Constants.NAMEID_UNSPECIFIED: @@ -109,7 +109,7 @@ def __init__(self, settings, request=None, name_id=None, session_index=None, nq= { 'id': self.id, 'issue_instant': issue_instant, - 'single_logout_url': self.__settings.get_idp_slo_url(), + 'single_logout_url': self._settings.get_idp_slo_url(), 'entity_id': sp_data['entityId'], 'name_id': name_id_obj, 'session_index': session_index_str, @@ -118,7 +118,7 @@ def __init__(self, settings, request=None, name_id=None, session_index=None, nq= logout_request = OneLogin_Saml2_Utils.decode_base64_and_inflate(request, ignore_zip=True) self.id = self.get_id(logout_request) - self.__logout_request = compat.to_string(logout_request) + self._logout_request = compat.to_string(logout_request) def get_request(self, deflate=True): """ @@ -129,9 +129,9 @@ def get_request(self, deflate=True): :rtype: str object """ if deflate: - request = OneLogin_Saml2_Utils.deflate_and_base64_encode(self.__logout_request) + request = OneLogin_Saml2_Utils.deflate_and_base64_encode(self._logout_request) else: - request = OneLogin_Saml2_Utils.b64encode(self.__logout_request) + request = OneLogin_Saml2_Utils.b64encode(self._logout_request) return request def get_xml(self): @@ -141,7 +141,7 @@ def get_xml(self): :return: XML request body :rtype: string """ - return self.__logout_request + return self._logout_request @classmethod def get_id(cls, request): @@ -279,24 +279,24 @@ def is_valid(self, request_data, raise_exceptions=False): :return: If the Logout Request is or not valid :rtype: boolean """ - self.__error = None + self._error = None try: - root = OneLogin_Saml2_XML.to_etree(self.__logout_request) + root = OneLogin_Saml2_XML.to_etree(self._logout_request) - idp_data = self.__settings.get_idp_data() + idp_data = self._settings.get_idp_data() idp_entity_id = idp_data['entityId'] get_data = ('get_data' in request_data and request_data['get_data']) or dict() - if self.__settings.is_strict(): - res = OneLogin_Saml2_XML.validate_xml(root, 'saml-schema-protocol-2.0.xsd', self.__settings.is_debug_active()) + if self._settings.is_strict(): + res = OneLogin_Saml2_XML.validate_xml(root, 'saml-schema-protocol-2.0.xsd', self._settings.is_debug_active()) if isinstance(res, str): raise OneLogin_Saml2_ValidationError( 'Invalid SAML Logout Request. Not match the saml-schema-protocol-2.0.xsd', OneLogin_Saml2_ValidationError.INVALID_XML_FORMAT ) - security = self.__settings.get_security_data() + security = self._settings.get_security_data() current_url = OneLogin_Saml2_Utils.get_self_url_no_query(request_data) @@ -345,8 +345,8 @@ def is_valid(self, request_data, raise_exceptions=False): return True except Exception as err: # pylint: disable=R0801 - self.__error = str(err) - debug = self.__settings.is_debug_active() + self._error = str(err) + debug = self._settings.is_debug_active() if debug: print(err) if raise_exceptions: @@ -357,7 +357,7 @@ def get_error(self): """ After executing a validation process, if it fails this method returns the cause """ - return self.__error + return self._error def _generate_request_id(self): """ diff --git a/src/onelogin/saml2/logout_response.py b/src/onelogin/saml2/logout_response.py index bd942200..e9368e8a 100644 --- a/src/onelogin/saml2/logout_response.py +++ b/src/onelogin/saml2/logout_response.py @@ -33,13 +33,13 @@ def __init__(self, settings, response=None): * (string) response. An UUEncoded SAML Logout response from the IdP. """ - self.__settings = settings - self.__error = None + self._settings = settings + self._error = None self.id = None if response is not None: - self.__logout_response = compat.to_string(OneLogin_Saml2_Utils.decode_base64_and_inflate(response, ignore_zip=True)) - self.document = OneLogin_Saml2_XML.to_etree(self.__logout_response) + self._logout_response = compat.to_string(OneLogin_Saml2_Utils.decode_base64_and_inflate(response, ignore_zip=True)) + self.document = OneLogin_Saml2_XML.to_etree(self._logout_response) self.id = self.document.get('ID', None) def get_issuer(self): @@ -49,7 +49,7 @@ def get_issuer(self): :rtype: string """ issuer = None - issuer_nodes = self.__query('/samlp:LogoutResponse/saml:Issuer') + issuer_nodes = self._query('/samlp:LogoutResponse/saml:Issuer') if len(issuer_nodes) == 1: issuer = OneLogin_Saml2_XML.element_text(issuer_nodes[0]) return issuer @@ -60,7 +60,7 @@ def get_status(self): :return: The Status :rtype: string """ - entries = self.__query('/samlp:LogoutResponse/samlp:Status/samlp:StatusCode') + entries = self._query('/samlp:LogoutResponse/samlp:Status/samlp:StatusCode') if len(entries) == 0: return None status = entries[0].attrib['Value'] @@ -78,21 +78,21 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): :return: Returns if the SAML LogoutResponse is or not valid :rtype: boolean """ - self.__error = None + self._error = None try: - idp_data = self.__settings.get_idp_data() + idp_data = self._settings.get_idp_data() idp_entity_id = idp_data['entityId'] get_data = request_data['get_data'] - if self.__settings.is_strict(): - res = OneLogin_Saml2_XML.validate_xml(self.document, 'saml-schema-protocol-2.0.xsd', self.__settings.is_debug_active()) + if self._settings.is_strict(): + res = OneLogin_Saml2_XML.validate_xml(self.document, 'saml-schema-protocol-2.0.xsd', self._settings.is_debug_active()) if isinstance(res, str): raise OneLogin_Saml2_ValidationError( 'Invalid SAML Logout Request. Not match the saml-schema-protocol-2.0.xsd', OneLogin_Saml2_ValidationError.INVALID_XML_FORMAT ) - security = self.__settings.get_security_data() + security = self._settings.get_security_data() # Check if the InResponseTo of the Logout Response matches the ID of the Logout Request (requestId) if provided in_response_to = self.get_in_response_to() @@ -134,15 +134,15 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): return True # pylint: disable=R0801 except Exception as err: - self.__error = str(err) - debug = self.__settings.is_debug_active() + self._error = str(err) + debug = self._settings.is_debug_active() if debug: print(err) if raise_exceptions: raise return False - def __query(self, query): + def _query(self, query): """ Extracts a node from the Etree (Logout Response Message) :param query: Xpath Expression @@ -158,7 +158,7 @@ def build(self, in_response_to): :param in_response_to: InResponseTo value for the Logout Response. :type in_response_to: string """ - sp_data = self.__settings.get_sp_data() + sp_data = self._settings.get_sp_data() self.id = self._generate_request_id() @@ -168,13 +168,13 @@ def build(self, in_response_to): { 'id': self.id, 'issue_instant': issue_instant, - 'destination': self.__settings.get_idp_slo_response_url(), + 'destination': self._settings.get_idp_slo_response_url(), 'in_response_to': in_response_to, 'entity_id': sp_data['entityId'], 'status': "urn:oasis:names:tc:SAML:2.0:status:Success" } - self.__logout_response = logout_response + self._logout_response = logout_response def get_in_response_to(self): """ @@ -193,16 +193,16 @@ def get_response(self, deflate=True): :rtype: string """ if deflate: - response = OneLogin_Saml2_Utils.deflate_and_base64_encode(self.__logout_response) + response = OneLogin_Saml2_Utils.deflate_and_base64_encode(self._logout_response) else: - response = OneLogin_Saml2_Utils.b64encode(self.__logout_response) + response = OneLogin_Saml2_Utils.b64encode(self._logout_response) return response def get_error(self): """ After executing a validation process, if it fails this method returns the cause """ - return self.__error + return self._error def get_xml(self): """ @@ -211,7 +211,7 @@ def get_xml(self): :return: XML response body :rtype: string """ - return self.__logout_response + return self._logout_response def _generate_request_id(self): """ diff --git a/src/onelogin/saml2/metadata.py b/src/onelogin/saml2/metadata.py index bd839c58..34c5f2c1 100644 --- a/src/onelogin/saml2/metadata.py +++ b/src/onelogin/saml2/metadata.py @@ -218,7 +218,7 @@ def sign_metadata(metadata, key, cert, sign_algorithm=OneLogin_Saml2_Constants.R return OneLogin_Saml2_Utils.add_sign(metadata, key, cert, False, sign_algorithm, digest_algorithm) @staticmethod - def __add_x509_key_descriptors(root, cert, signing): + def _add_x509_key_descriptors(root, cert, signing): key_descriptor = OneLogin_Saml2_XML.make_child(root, '{%s}KeyDescriptor' % OneLogin_Saml2_Constants.NS_MD) root.remove(key_descriptor) root.insert(0, key_descriptor) @@ -261,6 +261,6 @@ def add_x509_key_descriptors(cls, metadata, cert=None, add_encryption=True): raise Exception('Malformed metadata.') if add_encryption: - cls.__add_x509_key_descriptors(sp_sso_descriptor, cert, False) - cls.__add_x509_key_descriptors(sp_sso_descriptor, cert, True) + cls._add_x509_key_descriptors(sp_sso_descriptor, cert, False) + cls._add_x509_key_descriptors(sp_sso_descriptor, cert, True) return OneLogin_Saml2_XML.to_string(root) diff --git a/src/onelogin/saml2/response.py b/src/onelogin/saml2/response.py index d69242d3..7d372ab6 100644 --- a/src/onelogin/saml2/response.py +++ b/src/onelogin/saml2/response.py @@ -33,8 +33,8 @@ def __init__(self, settings, response): :param response: The base64 encoded, XML string containing the samlp:Response :type response: string """ - self.__settings = settings - self.__error = None + self._settings = settings + self._error = None self.response = OneLogin_Saml2_Utils.b64decode(response) self.document = OneLogin_Saml2_XML.to_etree(self.response) self.decrypted_document = None @@ -42,11 +42,11 @@ def __init__(self, settings, response): self.valid_scd_not_on_or_after = None # Quick check for the presence of EncryptedAssertion - encrypted_assertion_nodes = self.__query('/samlp:Response/saml:EncryptedAssertion') + encrypted_assertion_nodes = self._query('/samlp:Response/saml:EncryptedAssertion') if encrypted_assertion_nodes: decrypted_document = deepcopy(self.document) self.encrypted = True - self.decrypted_document = self.__decrypt_assertion(decrypted_document) + self.decrypted_document = self._decrypt_assertion(decrypted_document) def is_valid(self, request_data, request_id=None, raise_exceptions=False): """ @@ -64,7 +64,7 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): :returns: True if the SAML Response is valid, False if not :rtype: bool """ - self.__error = None + self._error = None try: # Checks SAML version if self.document.get('Version', None) != '2.0': @@ -90,9 +90,9 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): OneLogin_Saml2_ValidationError.WRONG_NUMBER_OF_ASSERTIONS ) - idp_data = self.__settings.get_idp_data() + idp_data = self._settings.get_idp_data() idp_entity_id = idp_data['entityId'] - sp_data = self.__settings.get_sp_data() + sp_data = self._settings.get_sp_data() sp_entity_id = sp_data['entityId'] signed_elements = self.process_signed_elements() @@ -100,9 +100,9 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): has_signed_response = '{%s}Response' % OneLogin_Saml2_Constants.NS_SAMLP in signed_elements has_signed_assertion = '{%s}Assertion' % OneLogin_Saml2_Constants.NS_SAML in signed_elements - if self.__settings.is_strict(): + if self._settings.is_strict(): no_valid_xml_msg = 'Invalid SAML Response. Not match the saml-schema-protocol-2.0.xsd' - res = OneLogin_Saml2_XML.validate_xml(self.document, 'saml-schema-protocol-2.0.xsd', self.__settings.is_debug_active()) + res = OneLogin_Saml2_XML.validate_xml(self.document, 'saml-schema-protocol-2.0.xsd', self._settings.is_debug_active()) if isinstance(res, str): raise OneLogin_Saml2_ValidationError( no_valid_xml_msg, @@ -111,14 +111,14 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): # If encrypted, check also the decrypted document if self.encrypted: - res = OneLogin_Saml2_XML.validate_xml(self.decrypted_document, 'saml-schema-protocol-2.0.xsd', self.__settings.is_debug_active()) + res = OneLogin_Saml2_XML.validate_xml(self.decrypted_document, 'saml-schema-protocol-2.0.xsd', self._settings.is_debug_active()) if isinstance(res, str): raise OneLogin_Saml2_ValidationError( no_valid_xml_msg, OneLogin_Saml2_ValidationError.INVALID_XML_FORMAT ) - security = self.__settings.get_security_data() + security = self._settings.get_security_data() current_url = OneLogin_Saml2_Utils.get_self_url_no_query(request_data) # Check if the InResponseTo of the Response matchs the ID of the AuthNRequest (requestId) if provided @@ -137,7 +137,7 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): ) if security['wantNameIdEncrypted']: - encrypted_nameid_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData') + encrypted_nameid_nodes = self._query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData') if len(encrypted_nameid_nodes) != 1: raise OneLogin_Saml2_ValidationError( 'The NameID of the Response is not encrypted and the SP require it', @@ -174,14 +174,14 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): ) # Checks that there is at least one AttributeStatement if required - attribute_statement_nodes = self.__query_assertion('/saml:AttributeStatement') + attribute_statement_nodes = self._query_assertion('/saml:AttributeStatement') if security.get('wantAttributeStatement', True) and not attribute_statement_nodes: raise OneLogin_Saml2_ValidationError( 'There is no AttributeStatement on the Response', OneLogin_Saml2_ValidationError.NO_ATTRIBUTESTATEMENT ) - encrypted_attributes_nodes = self.__query_assertion('/saml:AttributeStatement/saml:EncryptedAttribute') + encrypted_attributes_nodes = self._query_assertion('/saml:AttributeStatement/saml:EncryptedAttribute') if encrypted_attributes_nodes: raise OneLogin_Saml2_ValidationError( 'There is an EncryptedAttribute in the Response and this SP not support them', @@ -236,7 +236,7 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): # Checks the SubjectConfirmation, at least one SubjectConfirmation must be valid any_subject_confirmation = False - subject_confirmation_nodes = self.__query_assertion('/saml:Subject/saml:SubjectConfirmation') + subject_confirmation_nodes = self._query_assertion('/saml:Subject/saml:SubjectConfirmation') for scn in subject_confirmation_nodes: method = scn.get('Method', None) @@ -293,7 +293,7 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): OneLogin_Saml2_ValidationError.NO_SIGNATURE_FOUND ) else: - cert = self.__settings.get_idp_cert() + cert = self._settings.get_idp_cert() fingerprint = idp_data.get('certFingerprint', None) if fingerprint: fingerprint = OneLogin_Saml2_Utils.format_finger_print(fingerprint) @@ -319,8 +319,8 @@ def is_valid(self, request_data, request_id=None, raise_exceptions=False): return True except Exception as err: - self.__error = str(err) - debug = self.__settings.is_debug_active() + self._error = str(err) + debug = self._settings.is_debug_active() if debug: print(err) if raise_exceptions: @@ -351,7 +351,7 @@ def check_one_condition(self): """ Checks that the samlp:Response/saml:Assertion/saml:Conditions element exists and is unique. """ - condition_nodes = self.__query_assertion('/saml:Conditions') + condition_nodes = self._query_assertion('/saml:Conditions') if len(condition_nodes) == 1: return True else: @@ -361,7 +361,7 @@ def check_one_authnstatement(self): """ Checks that the samlp:Response/saml:Assertion/saml:AuthnStatement element exists and is unique. """ - authnstatement_nodes = self.__query_assertion('/saml:AuthnStatement') + authnstatement_nodes = self._query_assertion('/saml:AuthnStatement') if len(authnstatement_nodes) == 1: return True else: @@ -374,7 +374,7 @@ def get_audiences(self): :returns: The valid audiences for the SAML Response :rtype: list """ - audience_nodes = self.__query_assertion('/saml:Conditions/saml:AudienceRestriction/saml:Audience') + audience_nodes = self._query_assertion('/saml:Conditions/saml:AudienceRestriction/saml:Audience') return [OneLogin_Saml2_XML.element_text(node) for node in audience_nodes if OneLogin_Saml2_XML.element_text(node) is not None] def get_authn_contexts(self): @@ -384,7 +384,7 @@ def get_authn_contexts(self): :returns: The authentication classes for the SAML Response :rtype: list """ - authn_context_nodes = self.__query_assertion('/saml:AuthnStatement/saml:AuthnContext/saml:AuthnContextClassRef') + authn_context_nodes = self._query_assertion('/saml:AuthnStatement/saml:AuthnContext/saml:AuthnContextClassRef') return [OneLogin_Saml2_XML.element_text(node) for node in authn_context_nodes] def get_in_response_to(self): @@ -416,7 +416,7 @@ def get_issuers(self): OneLogin_Saml2_ValidationError.ISSUER_MULTIPLE_IN_RESPONSE ) - assertion_issuer_nodes = self.__query_assertion('/saml:Issuer') + assertion_issuer_nodes = self._query_assertion('/saml:Issuer') if len(assertion_issuer_nodes) == 1: issuer_value = OneLogin_Saml2_XML.element_text(assertion_issuer_nodes[0]) if issuer_value: @@ -439,18 +439,18 @@ def get_nameid_data(self): nameid = None nameid_data = {} - encrypted_id_data_nodes = self.__query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData') + encrypted_id_data_nodes = self._query_assertion('/saml:Subject/saml:EncryptedID/xenc:EncryptedData') if encrypted_id_data_nodes: encrypted_data = encrypted_id_data_nodes[0] - key = self.__settings.get_sp_key() + key = self._settings.get_sp_key() nameid = OneLogin_Saml2_Utils.decrypt_element(encrypted_data, key) else: - nameid_nodes = self.__query_assertion('/saml:Subject/saml:NameID') + nameid_nodes = self._query_assertion('/saml:Subject/saml:NameID') if nameid_nodes: nameid = nameid_nodes[0] - is_strict = self.__settings.is_strict() - want_nameid = self.__settings.get_security_data().get('wantNameId', True) + is_strict = self._settings.is_strict() + want_nameid = self._settings.get_security_data().get('wantNameId', True) if nameid is None: if is_strict and want_nameid: raise OneLogin_Saml2_ValidationError( @@ -469,7 +469,7 @@ def get_nameid_data(self): value = nameid.get(attr, None) if value: if is_strict and attr == 'SPNameQualifier': - sp_data = self.__settings.get_sp_data() + sp_data = self._settings.get_sp_data() sp_entity_id = sp_data.get('entityId', '') if sp_entity_id != value: raise OneLogin_Saml2_ValidationError( @@ -541,7 +541,7 @@ def get_session_not_on_or_after(self): :rtype: time|None """ not_on_or_after = None - authn_statement_nodes = self.__query_assertion('/saml:AuthnStatement[@SessionNotOnOrAfter]') + authn_statement_nodes = self._query_assertion('/saml:AuthnStatement[@SessionNotOnOrAfter]') if authn_statement_nodes: not_on_or_after = OneLogin_Saml2_Utils.parse_SAML_to_time(authn_statement_nodes[0].get('SessionNotOnOrAfter')) return not_on_or_after @@ -563,7 +563,7 @@ def get_session_index(self): :rtype: string|None """ session_index = None - authn_statement_nodes = self.__query_assertion('/saml:AuthnStatement[@SessionIndex]') + authn_statement_nodes = self._query_assertion('/saml:AuthnStatement[@SessionIndex]') if authn_statement_nodes: session_index = authn_statement_nodes[0].get('SessionIndex') return session_index @@ -583,9 +583,9 @@ def get_friendlyname_attributes(self): return self._get_attributes('FriendlyName') def _get_attributes(self, attr_name): - allow_duplicates = self.__settings.get_security_data().get('allowRepeatAttributeName', False) + allow_duplicates = self._settings.get_security_data().get('allowRepeatAttributeName', False) attributes = {} - attribute_nodes = self.__query_assertion('/saml:AttributeStatement/saml:Attribute') + attribute_nodes = self._query_assertion('/saml:AttributeStatement/saml:Attribute') for attribute_node in attribute_nodes: attr_key = attribute_node.get(attr_name) if attr_key: @@ -645,7 +645,7 @@ def process_signed_elements(self): :returns: The signed elements tag names :rtype: list """ - sign_nodes = self.__query('//ds:Signature') + sign_nodes = self._query('//ds:Signature') signed_elements = [] verified_seis = [] @@ -737,7 +737,7 @@ def validate_signed_elements(self, signed_elements): ) if assertion_tag in signed_elements: - expected_signature_nodes = self.__query(OneLogin_Saml2_Utils.ASSERTION_SIGNATURE_XPATH) + expected_signature_nodes = self._query(OneLogin_Saml2_Utils.ASSERTION_SIGNATURE_XPATH) if len(expected_signature_nodes) != 1: raise OneLogin_Saml2_ValidationError( 'Unexpected number of Assertion signatures found. SAML Response rejected.', @@ -754,7 +754,7 @@ def validate_timestamps(self): :returns: True if the condition is valid, False otherwise :rtype: bool """ - conditions_nodes = self.__query_assertion('/saml:Conditions') + conditions_nodes = self._query_assertion('/saml:Conditions') for conditions_node in conditions_nodes: nb_attr = conditions_node.get('NotBefore') @@ -771,7 +771,7 @@ def validate_timestamps(self): ) return True - def __query_assertion(self, xpath_expr): + def _query_assertion(self, xpath_expr): """ Extracts nodes that match the query from the Assertion @@ -785,13 +785,13 @@ def __query_assertion(self, xpath_expr): assertion_expr = '/saml:Assertion' signature_expr = '/ds:Signature/ds:SignedInfo/ds:Reference' signed_assertion_query = '/samlp:Response' + assertion_expr + signature_expr - assertion_reference_nodes = self.__query(signed_assertion_query) + assertion_reference_nodes = self._query(signed_assertion_query) tagid = None if not assertion_reference_nodes: # Check if the message is signed signed_message_query = '/samlp:Response' + signature_expr - message_reference_nodes = self.__query(signed_message_query) + message_reference_nodes = self._query(signed_message_query) if message_reference_nodes: message_id = message_reference_nodes[0].get('URI') final_query = "/samlp:Response[@ID=$tagid]/" @@ -804,9 +804,9 @@ def __query_assertion(self, xpath_expr): final_query = '/samlp:Response' + assertion_expr + "[@ID=$tagid]" tagid = assertion_id[1:] final_query += xpath_expr - return self.__query(final_query, tagid) + return self._query(final_query, tagid) - def __query(self, query, tagid=None): + def _query(self, query, tagid=None): """ Extracts nodes that match the query from the Response @@ -825,7 +825,7 @@ def __query(self, query, tagid=None): document = self.document return OneLogin_Saml2_XML.query(document, query, None, tagid) - def __decrypt_assertion(self, xml): + def _decrypt_assertion(self, xml): """ Decrypts the Assertion @@ -835,8 +835,8 @@ def __decrypt_assertion(self, xml): :returns: Decrypted Assertion :rtype: Element """ - key = self.__settings.get_sp_key() - debug = self.__settings.is_debug_active() + key = self._settings.get_sp_key() + debug = self._settings.is_debug_active() if not key: raise OneLogin_Saml2_Error( @@ -885,7 +885,7 @@ def get_error(self): """ After executing a validation process, if it fails this method returns the cause """ - return self.__error + return self._error def get_xml_document(self): """ @@ -916,4 +916,4 @@ def get_assertion_id(self): 'SAML Response must contain 1 assertion', OneLogin_Saml2_ValidationError.WRONG_NUMBER_OF_ASSERTIONS ) - return self.__query_assertion('')[0].get('ID', None) + return self._query_assertion('')[0].get('ID', None) diff --git a/src/onelogin/saml2/settings.py b/src/onelogin/saml2/settings.py index 38e79041..3163995e 100644 --- a/src/onelogin/saml2/settings.py +++ b/src/onelogin/saml2/settings.py @@ -98,37 +98,37 @@ def __init__(self, settings=None, custom_base_path=None, sp_validation_only=Fals :param sp_validation_only: Avoid the IdP validation :type sp_validation_only: boolean """ - self.__sp_validation_only = sp_validation_only - self.__paths = {} - self.__strict = True - self.__debug = False - self.__sp = {} - self.__idp = {} - self.__security = {} - self.__contacts = {} - self.__organization = {} - self.__errors = [] - - self.__load_paths(base_path=custom_base_path) - self.__update_paths(settings) + self._sp_validation_only = sp_validation_only + self._paths = {} + self._strict = True + self._debug = False + self._sp = {} + self._idp = {} + self._security = {} + self._contacts = {} + self._organization = {} + self._errors = [] + + self._load_paths(base_path=custom_base_path) + self._update_paths(settings) if settings is None: try: - valid = self.__load_settings_from_file() + valid = self._load_settings_from_file() except Exception as e: raise e if not valid: raise OneLogin_Saml2_Error( 'Invalid dict settings at the file: %s', OneLogin_Saml2_Error.SETTINGS_INVALID, - ','.join(self.__errors) + ','.join(self._errors) ) elif isinstance(settings, dict): - if not self.__load_settings_from_dict(settings): + if not self._load_settings_from_dict(settings): raise OneLogin_Saml2_Error( 'Invalid dict settings: %s', OneLogin_Saml2_Error.SETTINGS_INVALID, - ','.join(self.__errors) + ','.join(self._errors) ) else: raise OneLogin_Saml2_Error( @@ -137,14 +137,14 @@ def __init__(self, settings=None, custom_base_path=None, sp_validation_only=Fals ) self.format_idp_cert() - if 'x509certMulti' in self.__idp: + if 'x509certMulti' in self._idp: self.format_idp_cert_multi() self.format_sp_cert() - if 'x509certNew' in self.__sp: + if 'x509certNew' in self._sp: self.format_sp_cert_new() self.format_sp_key() - def __load_paths(self, base_path=None): + def _load_paths(self, base_path=None): """ Set the paths of the different folders """ @@ -152,13 +152,13 @@ def __load_paths(self, base_path=None): base_path = dirname(dirname(dirname(__file__))) if not base_path.endswith(sep): base_path += sep - self.__paths = { + self._paths = { 'base': base_path, 'cert': base_path + 'certs' + sep, 'lib': dirname(__file__) + sep } - def __update_paths(self, settings): + def _update_paths(self, settings): """ Set custom paths if necessary """ @@ -168,7 +168,7 @@ def __update_paths(self, settings): if 'custom_base_path' in settings: base_path = settings['custom_base_path'] base_path = join(dirname(__file__), base_path) - self.__load_paths(base_path) + self._load_paths(base_path) def get_base_path(self): """ @@ -177,7 +177,7 @@ def get_base_path(self): :return: The base toolkit folder path :rtype: string """ - return self.__paths['base'] + return self._paths['base'] def get_cert_path(self): """ @@ -186,13 +186,13 @@ def get_cert_path(self): :return: The cert folder path :rtype: string """ - return self.__paths['cert'] + return self._paths['cert'] def set_cert_path(self, path): """ Set a new cert path """ - self.__paths['cert'] = path + self._paths['cert'] = path def get_lib_path(self): """ @@ -201,7 +201,7 @@ def get_lib_path(self): :return: The library folder path :rtype: string """ - return self.__paths['lib'] + return self._paths['lib'] def get_schemas_path(self): """ @@ -210,9 +210,9 @@ def get_schemas_path(self): :return: The schema folder path :rtype: string """ - return self.__paths['lib'] + 'schemas/' + return self._paths['lib'] + 'schemas/' - def __load_settings_from_dict(self, settings): + def _load_settings_from_dict(self, settings): """ Loads settings info from a settings Dict @@ -224,22 +224,22 @@ def __load_settings_from_dict(self, settings): """ errors = self.check_settings(settings) if len(errors) == 0: - self.__errors = [] - self.__sp = settings['sp'] - self.__idp = settings.get('idp', {}) - self.__strict = settings.get('strict', True) - self.__debug = settings.get('debug', False) - self.__security = settings.get('security', {}) - self.__contacts = settings.get('contactPerson', {}) - self.__organization = settings.get('organization', {}) - - self.__add_default_values() + self._errors = [] + self._sp = settings['sp'] + self._idp = settings.get('idp', {}) + self._strict = settings.get('strict', True) + self._debug = settings.get('debug', False) + self._security = settings.get('security', {}) + self._contacts = settings.get('contactPerson', {}) + self._organization = settings.get('organization', {}) + + self._add_default_values() return True - self.__errors = errors + self._errors = errors return False - def __load_settings_from_file(self): + def _load_settings_from_file(self): """ Loads settings info from the settings json file @@ -265,69 +265,69 @@ def __load_settings_from_file(self): with open(advanced_filename, 'r') as json_data: settings.update(json.loads(json_data.read())) # Merge settings - return self.__load_settings_from_dict(settings) + return self._load_settings_from_dict(settings) - def __add_default_values(self): + def _add_default_values(self): """ Add default values if the settings info is not complete """ - self.__sp.setdefault('assertionConsumerService', {}) - self.__sp['assertionConsumerService'].setdefault('binding', OneLogin_Saml2_Constants.BINDING_HTTP_POST) + self._sp.setdefault('assertionConsumerService', {}) + self._sp['assertionConsumerService'].setdefault('binding', OneLogin_Saml2_Constants.BINDING_HTTP_POST) - self.__sp.setdefault('attributeConsumingService', {}) + self._sp.setdefault('attributeConsumingService', {}) - self.__sp.setdefault('singleLogoutService', {}) - self.__sp['singleLogoutService'].setdefault('binding', OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT) + self._sp.setdefault('singleLogoutService', {}) + self._sp['singleLogoutService'].setdefault('binding', OneLogin_Saml2_Constants.BINDING_HTTP_REDIRECT) - self.__idp.setdefault('singleLogoutService', {}) + self._idp.setdefault('singleLogoutService', {}) # Related to nameID - self.__sp.setdefault('NameIDFormat', OneLogin_Saml2_Constants.NAMEID_UNSPECIFIED) - self.__security.setdefault('nameIdEncrypted', False) + self._sp.setdefault('NameIDFormat', OneLogin_Saml2_Constants.NAMEID_UNSPECIFIED) + self._security.setdefault('nameIdEncrypted', False) # Metadata format - self.__security.setdefault('metadataValidUntil', None) # None means use default - self.__security.setdefault('metadataCacheDuration', None) # None means use default + self._security.setdefault('metadataValidUntil', None) # None means use default + self._security.setdefault('metadataCacheDuration', None) # None means use default # Sign provided - self.__security.setdefault('authnRequestsSigned', False) - self.__security.setdefault('logoutRequestSigned', False) - self.__security.setdefault('logoutResponseSigned', False) - self.__security.setdefault('signMetadata', False) + self._security.setdefault('authnRequestsSigned', False) + self._security.setdefault('logoutRequestSigned', False) + self._security.setdefault('logoutResponseSigned', False) + self._security.setdefault('signMetadata', False) # Sign expected - self.__security.setdefault('wantMessagesSigned', False) - self.__security.setdefault('wantAssertionsSigned', False) + self._security.setdefault('wantMessagesSigned', False) + self._security.setdefault('wantAssertionsSigned', False) # NameID element expected - self.__security.setdefault('wantNameId', True) + self._security.setdefault('wantNameId', True) # Encrypt expected - self.__security.setdefault('wantAssertionsEncrypted', False) - self.__security.setdefault('wantNameIdEncrypted', False) + self._security.setdefault('wantAssertionsEncrypted', False) + self._security.setdefault('wantNameIdEncrypted', False) # Signature Algorithm - self.__security.setdefault('signatureAlgorithm', OneLogin_Saml2_Constants.RSA_SHA1) + self._security.setdefault('signatureAlgorithm', OneLogin_Saml2_Constants.RSA_SHA1) # Digest Algorithm - self.__security.setdefault('digestAlgorithm', OneLogin_Saml2_Constants.SHA1) + self._security.setdefault('digestAlgorithm', OneLogin_Saml2_Constants.SHA1) # AttributeStatement required by default - self.__security.setdefault('wantAttributeStatement', True) + self._security.setdefault('wantAttributeStatement', True) # Disallow duplicate attribute names by default - self.__security.setdefault('allowRepeatAttributeName', False) + self._security.setdefault('allowRepeatAttributeName', False) - self.__idp.setdefault('x509cert', '') - self.__idp.setdefault('certFingerprint', '') - self.__idp.setdefault('certFingerprintAlgorithm', 'sha1') + self._idp.setdefault('x509cert', '') + self._idp.setdefault('certFingerprint', '') + self._idp.setdefault('certFingerprintAlgorithm', 'sha1') - self.__sp.setdefault('x509cert', '') - self.__sp.setdefault('privateKey', '') + self._sp.setdefault('x509cert', '') + self._sp.setdefault('privateKey', '') - self.__security.setdefault('requestedAuthnContext', True) - self.__security.setdefault('requestedAuthnContextComparison', 'exact') - self.__security.setdefault('failOnAuthnContextMismatch', False) + self._security.setdefault('requestedAuthnContext', True) + self._security.setdefault('requestedAuthnContextComparison', 'exact') + self._security.setdefault('failOnAuthnContextMismatch', False) def check_settings(self, settings): """ @@ -345,7 +345,7 @@ def check_settings(self, settings): if not isinstance(settings, dict) or len(settings) == 0: errors.append('invalid_syntax') else: - if not self.__sp_validation_only: + if not self._sp_validation_only: errors += self.check_idp_settings(settings) sp_errors = self.check_sp_settings(settings) errors += sp_errors @@ -425,9 +425,9 @@ def check_sp_settings(self, settings): errors.append('sp_not_found') else: allow_single_domain_urls = self._get_allow_single_label_domain(settings) - # check_sp_certs uses self.__sp so I add it - old_sp = self.__sp - self.__sp = settings['sp'] + # check_sp_certs uses self._sp so I add it + old_sp = self._sp + self._sp = settings['sp'] sp = settings['sp'] security = settings.get('security', {}) @@ -508,9 +508,9 @@ def check_sp_settings(self, settings): ('url' not in organization or len(organization['url']) == 0): errors.append('organization_not_enought_data') break - # Restores the value that had the self.__sp + # Restores the value that had the self._sp if 'old_sp' in locals(): - self.__sp = old_sp + self._sp = old_sp return errors @@ -562,8 +562,8 @@ def get_sp_key(self): :returns: SP private key :rtype: string or None """ - key = self.__sp.get('privateKey') - key_file_name = self.__paths['cert'] + 'sp.key' + key = self._sp.get('privateKey') + key_file_name = self._paths['cert'] + 'sp.key' if not key and exists(key_file_name): with open(key_file_name) as f: @@ -577,8 +577,8 @@ def get_sp_cert(self): :returns: SP public cert :rtype: string or None """ - cert = self.__sp.get('x509cert') - cert_file_name = self.__paths['cert'] + 'sp.crt' + cert = self._sp.get('x509cert') + cert_file_name = self._paths['cert'] + 'sp.crt' if not cert and exists(cert_file_name): with open(cert_file_name) as f: @@ -593,8 +593,8 @@ def get_sp_cert_new(self): :returns: SP public cert new :rtype: string or None """ - cert = self.__sp.get('x509certNew') - cert_file_name = self.__paths['cert'] + 'sp_new.crt' + cert = self._sp.get('x509certNew') + cert_file_name = self._paths['cert'] + 'sp_new.crt' if not cert and exists(cert_file_name): with open(cert_file_name) as f: @@ -608,7 +608,7 @@ def get_idp_cert(self): :returns: IdP public cert :rtype: string """ - cert = self.__idp.get('x509cert') + cert = self._idp.get('x509cert') cert_file_name = self.get_cert_path() + 'idp.crt' if not cert and exists(cert_file_name): with open(cert_file_name) as f: @@ -622,7 +622,7 @@ def get_idp_data(self): :returns: IdP info :rtype: dict """ - return self.__idp + return self._idp def get_sp_data(self): """ @@ -631,7 +631,7 @@ def get_sp_data(self): :returns: SP info :rtype: dict """ - return self.__sp + return self._sp def get_security_data(self): """ @@ -640,7 +640,7 @@ def get_security_data(self): :returns: Security info :rtype: dict """ - return self.__security + return self._security def get_contacts(self): """ @@ -649,7 +649,7 @@ def get_contacts(self): :returns: Contacts info :rtype: dict """ - return self.__contacts + return self._contacts def get_organization(self): """ @@ -658,7 +658,7 @@ def get_organization(self): :returns: Organization info :rtype: dict """ - return self.__organization + return self._organization def get_sp_metadata(self): """ @@ -667,14 +667,14 @@ def get_sp_metadata(self): :rtype: string """ metadata = self.metadata_class.builder( - self.__sp, self.__security['authnRequestsSigned'], - self.__security['wantAssertionsSigned'], - self.__security['metadataValidUntil'], - self.__security['metadataCacheDuration'], + self._sp, self._security['authnRequestsSigned'], + self._security['wantAssertionsSigned'], + self._security['metadataValidUntil'], + self._security['metadataCacheDuration'], self.get_contacts(), self.get_organization() ) - add_encryption = self.__security['wantNameIdEncrypted'] or self.__security['wantAssertionsEncrypted'] + add_encryption = self._security['wantNameIdEncrypted'] or self._security['wantAssertionsEncrypted'] cert_new = self.get_sp_cert_new() metadata = self.metadata_class.add_x509_key_descriptors(metadata, cert_new, add_encryption) @@ -683,8 +683,8 @@ def get_sp_metadata(self): metadata = self.metadata_class.add_x509_key_descriptors(metadata, cert, add_encryption) # Sign metadata - if 'signMetadata' in self.__security and self.__security['signMetadata'] is not False: - if self.__security['signMetadata'] is True: + if 'signMetadata' in self._security and self._security['signMetadata'] is not False: + if self._security['signMetadata'] is True: # Use the SP's normal key to sign the metadata: if not cert: raise OneLogin_Saml2_Error( @@ -700,16 +700,16 @@ def get_sp_metadata(self): ) else: # Use a custom key to sign the metadata: - if ('keyFileName' not in self.__security['signMetadata'] or - 'certFileName' not in self.__security['signMetadata']): + if ('keyFileName' not in self._security['signMetadata'] or + 'certFileName' not in self._security['signMetadata']): raise OneLogin_Saml2_Error( 'Invalid Setting: signMetadata value of the sp is not valid', OneLogin_Saml2_Error.SETTINGS_INVALID_SYNTAX ) - key_file_name = self.__security['signMetadata']['keyFileName'] - cert_file_name = self.__security['signMetadata']['certFileName'] - key_metadata_file = self.__paths['cert'] + key_file_name - cert_metadata_file = self.__paths['cert'] + cert_file_name + key_file_name = self._security['signMetadata']['keyFileName'] + cert_file_name = self._security['signMetadata']['certFileName'] + key_metadata_file = self._paths['cert'] + key_file_name + cert_metadata_file = self._paths['cert'] + cert_file_name try: with open(key_metadata_file, 'r') as f_metadata_key: @@ -731,8 +731,8 @@ def get_sp_metadata(self): cert_metadata_file ) - signature_algorithm = self.__security['signatureAlgorithm'] - digest_algorithm = self.__security['digestAlgorithm'] + signature_algorithm = self._security['signatureAlgorithm'] + digest_algorithm = self._security['digestAlgorithm'] metadata = self.metadata_class.sign_metadata(metadata, key_metadata, cert_metadata, signature_algorithm, digest_algorithm) @@ -755,7 +755,7 @@ def validate_metadata(self, xml): raise Exception('Empty string supplied as input') errors = [] - root = OneLogin_Saml2_XML.validate_xml(xml, 'saml-schema-metadata-2.0.xsd', self.__debug) + root = OneLogin_Saml2_XML.validate_xml(xml, 'saml-schema-metadata-2.0.xsd', self._debug) if isinstance(root, str): errors.append(root) else: @@ -781,38 +781,38 @@ def format_idp_cert(self): """ Formats the IdP cert. """ - self.__idp['x509cert'] = OneLogin_Saml2_Utils.format_cert(self.__idp['x509cert']) + self._idp['x509cert'] = OneLogin_Saml2_Utils.format_cert(self._idp['x509cert']) def format_idp_cert_multi(self): """ Formats the Multple IdP certs. """ - if 'x509certMulti' in self.__idp: - if 'signing' in self.__idp['x509certMulti']: - for idx in range(len(self.__idp['x509certMulti']['signing'])): - self.__idp['x509certMulti']['signing'][idx] = OneLogin_Saml2_Utils.format_cert(self.__idp['x509certMulti']['signing'][idx]) + if 'x509certMulti' in self._idp: + if 'signing' in self._idp['x509certMulti']: + for idx in range(len(self._idp['x509certMulti']['signing'])): + self._idp['x509certMulti']['signing'][idx] = OneLogin_Saml2_Utils.format_cert(self._idp['x509certMulti']['signing'][idx]) - if 'encryption' in self.__idp['x509certMulti']: - for idx in range(len(self.__idp['x509certMulti']['encryption'])): - self.__idp['x509certMulti']['encryption'][idx] = OneLogin_Saml2_Utils.format_cert(self.__idp['x509certMulti']['encryption'][idx]) + if 'encryption' in self._idp['x509certMulti']: + for idx in range(len(self._idp['x509certMulti']['encryption'])): + self._idp['x509certMulti']['encryption'][idx] = OneLogin_Saml2_Utils.format_cert(self._idp['x509certMulti']['encryption'][idx]) def format_sp_cert(self): """ Formats the SP cert. """ - self.__sp['x509cert'] = OneLogin_Saml2_Utils.format_cert(self.__sp['x509cert']) + self._sp['x509cert'] = OneLogin_Saml2_Utils.format_cert(self._sp['x509cert']) def format_sp_cert_new(self): """ Formats the SP cert. """ - self.__sp['x509certNew'] = OneLogin_Saml2_Utils.format_cert(self.__sp['x509certNew']) + self._sp['x509certNew'] = OneLogin_Saml2_Utils.format_cert(self._sp['x509certNew']) def format_sp_key(self): """ Formats the private key. """ - self.__sp['privateKey'] = OneLogin_Saml2_Utils.format_private_key(self.__sp['privateKey']) + self._sp['privateKey'] = OneLogin_Saml2_Utils.format_private_key(self._sp['privateKey']) def get_errors(self): """ @@ -821,7 +821,7 @@ def get_errors(self): :returns: Errors :rtype: list """ - return self.__errors + return self._errors def set_strict(self, value): """ @@ -832,7 +832,7 @@ def set_strict(self, value): """ assert isinstance(value, bool) - self.__strict = value + self._strict = value def is_strict(self): """ @@ -841,7 +841,7 @@ def is_strict(self): :returns: Strict parameter :rtype: boolean """ - return self.__strict + return self._strict def is_debug_active(self): """ @@ -850,7 +850,7 @@ def is_debug_active(self): :returns: Debug parameter :rtype: boolean """ - return self.__debug + return self._debug def _get_allow_single_label_domain(self, settings): security = settings.get('security', {}) diff --git a/tests/src/OneLogin/saml2_tests/authn_request_test.py b/tests/src/OneLogin/saml2_tests/authn_request_test.py index b23c633a..cbf225a1 100644 --- a/tests/src/OneLogin/saml2_tests/authn_request_test.py +++ b/tests/src/OneLogin/saml2_tests/authn_request_test.py @@ -52,7 +52,7 @@ def testCreateRequest(self): saml_settings = self.loadSettingsJSON() settings = OneLogin_Saml2_Settings(saml_settings) - settings._OneLogin_Saml2_Settings__organization = { + settings._organization = { u'en-US': { u'url': u'http://sp.example.com', u'name': u'sp_test'