From cbb6492ebe7c1ce9778c2ffbd42cb9fe417f541f Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:02:18 +0200 Subject: [PATCH 1/6] pylint refactoring --- tests/email_validator.py | 140 ++++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/tests/email_validator.py b/tests/email_validator.py index 53a451c8..8d324db3 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -27,7 +27,7 @@ # We are doing this to support IPv6 class SmtpWebperf(smtplib.SMTP): # pylint: disable=too-many-instance-attributes - def __init__(self, host='', port=0, local_hostname=None, # pylint: disable=too-many-arguments + def __init__(self, host='', port=0, local_hostname=None, # pylint: disable=too-many-arguments, super-init-not-called timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): """Initialize a new instance. @@ -380,7 +380,12 @@ def validate_email_domain(hostname, result_dict, global_translation, local_trans # 1.6 - Check DNSSEC # 1.7 - Check DANE # 1.8 - Check MTA-STS policy - rating = validate_mta_sts_policy(global_translation, rating, local_translation, hostname) + rating = validate_mta_sts_policy( + global_translation, + rating, + result_dict, + local_translation, + hostname) # 1.9 - Check SPF policy rating = validate_spf_policies( global_translation, rating, result_dict, local_translation, hostname) @@ -392,15 +397,11 @@ def validate_email_domain(hostname, result_dict, global_translation, local_trans return rating, result_dict -def validate_mta_sts_policy(global_translation, rating, local_translation, hostname): - has_mta_sts_policy = False - # https://www.rfc-editor.org/rfc/rfc8461#section-3.1 - mta_sts_results = dns_lookup('_mta-sts.' + hostname, dns.rdatatype.TXT) - for result in mta_sts_results: - if 'v=STSv1;' in result: - has_mta_sts_policy = True - - rating += rate_mts_sts_records(global_translation, local_translation, has_mta_sts_policy) +def validate_mta_sts_policy(global_translation, rating, result_dict, local_translation, hostname): + rating += rate_mts_sts_records( + global_translation, + local_translation, + has_dns_mta_sts_policy(hostname)) # https://mta-sts.example.com/.well-known/mta-sts.txt content = get_http_content( @@ -415,12 +416,13 @@ def validate_mta_sts_policy(global_translation, rating, local_translation, hostn # mx: mail1.polisen.se # mx: mail2.polisen.se # max_age: 604800 - - is_valid = True - has_version = False - has_mode = False - has_mx = False - has_max_age = False + result_dict['mta-sts.txt'] = { + 'valid': True, + 'has_version': False, + 'has_mode': False, + 'has_mx': False, + 'has_max_age': False + } rows = content.split('\r\n') if len(rows) == 1: @@ -442,48 +444,22 @@ def validate_mta_sts_policy(global_translation, rating, local_translation, hostn key_value_pair = row.split(':') if len(key_value_pair) != 2: print('invalid pair:', key_value_pair) - is_valid = False + result_dict['mta-sts.txt']['valid'] = False continue - key = key_value_pair[0].strip(' ') - value = key_value_pair[1].strip(' ') - - if 'version' in key: - has_version = True - elif 'mode' in key: - if value == 'enforce': - _ = 1 - elif value in ('testing', 'none'): - mta_sts_records_not_enforced_rating = Rating( - global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) - mta_sts_records_not_enforced_rating.set_overall(3.0) - mta_sts_records_not_enforced_rating.set_integrity_and_security( - 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_NOT_ENFORCED')) - mta_sts_records_not_enforced_rating.set_standards( - 5.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_VALID_MODE')) - rating += mta_sts_records_not_enforced_rating - else: - mta_sts_records_invalid_mode_rating = Rating( - global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) - mta_sts_records_invalid_mode_rating.set_overall(1.0) - mta_sts_records_invalid_mode_rating.set_integrity_and_security( - 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_INVALID_MODE')) - mta_sts_records_invalid_mode_rating.set_standards( - 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_INVALID_MODE')) - rating += mta_sts_records_invalid_mode_rating - - has_mode = True - elif 'mx' in key: - has_mx = True - elif 'max_age' in key: - has_max_age = True - else: - print('invalid key:', key) - is_valid = False + rating += handle_mta_sts_txt_row( + key_value_pair, + result_dict, + global_translation, + local_translation) - is_valid = is_valid and has_version and has_mode and has_mx and has_max_age + result_dict['mta-sts.txt']['valid'] = result_dict['mta-sts.txt']['valid'] and\ + result_dict['mta-sts.txt']['has_version'] and\ + result_dict['mta-sts.txt']['has_mode'] and\ + result_dict['mta-sts.txt']['has_mx'] and\ + result_dict['mta-sts.txt']['has_max_age'] - if is_valid: + if result_dict['mta-sts.txt']['valid']: has_mta_sts_txt_rating.set_overall(5.0) has_mta_sts_txt_rating.set_integrity_and_security( 5.0, local_translation('TEXT_REVIEW_MTA_STS_TXT_SUPPORT')) @@ -505,6 +481,54 @@ def validate_mta_sts_policy(global_translation, rating, local_translation, hostn rating += has_mta_sts_txt_rating return rating +def handle_mta_sts_txt_row(key_value_pair, result_dict, global_translation, local_translation): + rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) + key = key_value_pair[0].strip(' ') + value = key_value_pair[1].strip(' ') + + if 'version' in key: + result_dict['mta-sts.txt']['has_version'] = True + elif 'mode' in key: + if value == 'enforce': + _ = 1 + elif value in ('testing', 'none'): + mta_sts_records_not_enforced_rating = Rating( + global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) + mta_sts_records_not_enforced_rating.set_overall(3.0) + mta_sts_records_not_enforced_rating.set_integrity_and_security( + 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_NOT_ENFORCED')) + mta_sts_records_not_enforced_rating.set_standards( + 5.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_VALID_MODE')) + rating += mta_sts_records_not_enforced_rating + else: + mta_sts_records_invalid_mode_rating = Rating( + global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) + mta_sts_records_invalid_mode_rating.set_overall(1.0) + mta_sts_records_invalid_mode_rating.set_integrity_and_security( + 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_INVALID_MODE')) + mta_sts_records_invalid_mode_rating.set_standards( + 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_INVALID_MODE')) + rating += mta_sts_records_invalid_mode_rating + + result_dict['mta-sts.txt']['has_mode'] = True + elif 'mx' in key: + result_dict['mta-sts.txt']['has_mx'] = True + elif 'max_age' in key: + result_dict['mta-sts.txt']['has_max_age'] = True + else: + print('invalid key:', key) + result_dict['mta-sts.txt']['valid'] = False + return rating + +def has_dns_mta_sts_policy(hostname): + has_mta_sts_policy = False + # https://www.rfc-editor.org/rfc/rfc8461#section-3.1 + mta_sts_results = dns_lookup('_mta-sts.' + hostname, dns.rdatatype.TXT) + for result in mta_sts_results: + if 'v=STSv1;' in result: + has_mta_sts_policy = True + return has_mta_sts_policy + def rate_mts_sts_records(global_translation, local_translation, has_mta_sts_policy): has_mta_sts_records_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if has_mta_sts_policy: @@ -519,7 +543,6 @@ def rate_mts_sts_records(global_translation, local_translation, has_mta_sts_poli 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_NO_SUPPORT')) has_mta_sts_records_rating.set_standards( 1.0, local_translation('TEXT_REVIEW_MTA_STS_DNS_RECORD_NO_SUPPORT')) - return has_mta_sts_records_rating @@ -684,7 +707,6 @@ def rate_dmarc_subpolicy(global_translation, result_dict, local_translation): 1.0, local_translation('TEXT_REVIEW_DMARC_SUBPOLICY_NONE')) dmarc_subpolicy_rating.set_standards( 5.0, local_translation('TEXT_REVIEW_DMARC_SUBPOLICY_NONE')) - return dmarc_subpolicy_rating def rate_dmarc_policy(global_translation, result_dict, local_translation): @@ -714,7 +736,7 @@ def rate_dmarc_policy(global_translation, result_dict, local_translation): 1.0, local_translation('TEXT_REVIEW_DMARC_NO_POLICY')) dmarc_policy_rating.set_standards( 1.0, local_translation('TEXT_REVIEW_DMARC_NO_POLICY')) - + return dmarc_policy_rating @@ -1002,7 +1024,7 @@ def handle_dmarc_sp(data, result_dict, local_translation): result_dict['dmarc-errors'].append( local_translation( 'TEXT_REVIEW_DMARC_SUBPOLICY_INVALID')) - + def handle_dmarc_adkim(data, result_dict, local_translation): if data == 'r': result_dict['dmarc-warnings'].append( From 91d61d64e01dfdab7cee48a11f39a7dcbd5d4f27 Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:09:20 +0200 Subject: [PATCH 2/6] pylint docstrings --- tests/email_validator.py | 100 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 2 deletions(-) diff --git a/tests/email_validator.py b/tests/email_validator.py index 8d324db3..9100639b 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -1160,6 +1160,12 @@ def validate_spf_policy(global_translation, local_translation, hostname, result_ def replace_network_with_first_and_last_ipaddress(spf_addresses): + """ + Replaces network addresses with their first and last IP addresses. + + Args: + spf_addresses (list): List of SPF addresses. + """ networs_to_remove = [] for ip_address in spf_addresses: # support for network mask @@ -1184,6 +1190,18 @@ def replace_network_with_first_and_last_ipaddress(spf_addresses): def validate_ip6_operation_status(global_translation, rating, local_translation, ipv6_servers): + """ + Validates IPv6 server operation status and rates it. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + rating (Rating): Initial Rating object. + ipv6_servers (list): List of IPv6 servers. + + Returns: + rating (Rating): Updated Rating object. + """ ipv6_servers_operational = [] # 1.3 - Check Start TLS ipv6_servers_operational_starttls = [] @@ -1230,12 +1248,23 @@ def validate_ip6_operation_status(global_translation, rating, local_translation, def validate_ip4_operation_status(global_translation, rating, local_translation, ipv4_servers): + """ + Validates IPv4 server operation status and rates it. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + rating (Rating): Initial Rating object. + ipv4_servers (list): List of IPv4 servers. + + Returns: + rating (Rating): Updated Rating object. + """ ipv4_servers_operational = [] # 1.3 - Check Start TLS ipv4_servers_operational_starttls = [] for ip_address in ipv4_servers: try: - # print('SMTP CONNECT:', ip_address) with SmtpWebperf( ip_address, port=25, @@ -1244,7 +1273,6 @@ def validate_ip4_operation_status(global_translation, rating, local_translation, ipv4_servers_operational.append(ip_address) smtp.starttls() ipv4_servers_operational_starttls.append(ip_address) - # print('SMTP SUCCESS') except smtplib.SMTPConnectError as smtp_error: print('SMTP ERROR: ', smtp_error) @@ -1273,6 +1301,15 @@ def validate_ip4_operation_status(global_translation, rating, local_translation, return rating def get_email_entries(hostname): + """ + Retrieves email entries for a given hostname. + + Args: + hostname (str): The hostname to retrieve email entries for. + + Returns: + email_entries (list): List of email entries for the hostname. + """ email_entries = [] email_results = dns_lookup(hostname, dns.rdatatype.MX) has_mx_records = len(email_results) > 0 @@ -1292,6 +1329,16 @@ def get_email_entries(hostname): return email_entries def get_addresses_for_dnstype(email_entries, rdatatype): + """ + Retrieves addresses for a specific DNS type from email entries. + + Args: + email_entries (list): List of email entries. + rdatatype (int): DNS record type. + + Returns: + ipv4_servers (list): List of servers of the specified DNS type. + """ ipv4_servers = [] for email_entry in email_entries: ipv_4 = dns_lookup(email_entry, rdatatype) @@ -1299,6 +1346,21 @@ def get_addresses_for_dnstype(email_entries, rdatatype): return ipv4_servers def validate_mx_records(global_translation, rating, result_dict, local_translation, hostname): + """ + Validates mail exchange records and rates them based on IPv4/IPv6 usage and GDPR compliance. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + rating (Rating): Initial Rating object. + result_dict (dict): Dictionary to store results. + hostname (str): The hostname to validate. + + Returns: + rating (Rating): Updated Rating object. + ipv4_servers (list): List of IPv4 servers. + ipv6_servers (list): List of IPv6 servers. + """ # 1.1 - Check IPv4 and IPv6 support email_entries = get_email_entries(hostname) @@ -1354,6 +1416,18 @@ def rate_mx_gdpr( local_translation, countries_others, countries_eu_or_exception_list): + """ + Rates the GDPR compliance of mail exchange servers. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + countries_others (dict): Non-GDPR compliant countries. + countries_eu_or_exception_list (dict): GDPR compliant countries. + + Returns: + rating (Rating): Rating of GDPR compliance based on country lists. + """ rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) nof_gdpr_countries = len(countries_eu_or_exception_list) nof_none_gdpr_countries = len(countries_others) @@ -1374,6 +1448,17 @@ def rate_mx_gdpr( return rating def rate_mx_ip6_usage(global_translation, local_translation, ipv6_servers): + """ + Rates the usage of IPv6 servers. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + ipv6_servers (list): List of IPv6 servers. + + Returns: + nof_ipv6_rating (Rating): Rating of IPv6 usage based on server count. + """ nof_ipv6_servers = len(ipv6_servers) nof_ipv6_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if nof_ipv6_servers >= 2: @@ -1398,6 +1483,17 @@ def rate_mx_ip6_usage(global_translation, local_translation, ipv6_servers): return nof_ipv6_rating def rate_mx_ip4_usage(global_translation, local_translation, ipv4_servers): + """ + This function rates the usage of IPv4 servers based on their number. + + Args: + global_translation (function): A function to translate text globally. + local_translation (function): A function to translate text locally. + ipv4_servers (list): A list of IPv4 servers. + + Returns: + nof_ipv4_rating (Rating): A Rating object that represents the rating of IPv4 usage. + """ nof_ipv4_servers = len(ipv4_servers) nof_ipv4_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if nof_ipv4_servers >= 2: From 70b921bdd2d2441892cfb615be0edc730ab9c1c8 Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:10:23 +0200 Subject: [PATCH 3/6] pylint docstrings --- tests/email_validator.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/email_validator.py b/tests/email_validator.py index 9100639b..b8a45f0e 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -1108,6 +1108,15 @@ def handle_dmarc_ri(data, result_dict, local_translation): result_dict['dmarc-ri'] = None def handle_dmarc_section(key, data, result_dict, local_translation): + """ + Handles a DMARC section based on its key. + + Args: + key (str): The key of the DMARC section. + data (str): The data of the DMARC section. + result_dict (dict): Dictionary to store results. + local_translation (function): Local text translator. + """ dmarc_section_handlers = { "p": handle_dmarc_p, "sp": handle_dmarc_sp, @@ -1126,6 +1135,18 @@ def handle_dmarc_section(key, data, result_dict, local_translation): handler(data, result_dict, local_translation) def validate_spf_policy(global_translation, local_translation, hostname, result_dict): + """ + Validates the SPF policy of a hostname. + + Args: + global_translation (function): Global text translator. + local_translation (function): Local text translator. + hostname (str): The hostname to validate. + result_dict (dict): Dictionary to store results. + + Returns: + result_dict (dict): Updated dictionary with SPF validation results. + """ # https://proton.me/support/anti-spoofing-custom-domain if 'spf-dns-lookup-count' in result_dict and result_dict['spf-dns-lookup-count'] >= 10: From 0eb3f16056bdc35e2b2bfaf16c35c2654a864778 Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 16:38:15 +0200 Subject: [PATCH 4/6] pylint docstring and rename --- tests/email_validator.py | 185 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 177 insertions(+), 8 deletions(-) diff --git a/tests/email_validator.py b/tests/email_validator.py index b8a45f0e..f089c609 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -952,45 +952,133 @@ def handle_spf_ip6(section, result_dict, _, _2): result_dict['spf-ipv6'].append(data) def handle_spf_include(section, result_dict, global_translation, local_translation): + """ + Handles the 'include' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + section (str): The SPF section to be handled. + result_dict (dict): Stores the results. + global_translation (function): Translates text messages globally. + local_translation (function): Translates text messages locally. + + The function extracts the domain from the 'include' section, + validates the SPF policy of the domain, + and updates the result dictionary with the validation results. + """ spf_domain = section[8:] subresult_dict = validate_spf_policy( global_translation, local_translation, spf_domain, result_dict) result_dict.update(subresult_dict) -def handle_spf_neutral_all_question(_, result_dict, _2, _3): +def handle_spf_neutral_all(_, result_dict, _2, _3): # What do this do and should we rate on it? result_dict['spf-uses-neutralfail'] = True -def handle_spf_soft_all_question(_, result_dict, _2, _3): +def handle_spf_soft_all(_, result_dict, _2, _3): + """ + Handles the '~all' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + _ (_): Ignored parameter. + result_dict (dict): Stores the results. + _2, _3: Ignored parameters. + + The function marks the SPF record as using the '~all' mechanism, which indicates a SoftFail. + This means that any IP not authorized by the SPF record should + be treated as a potential spam source, but not definitively so. + """ # add support for SoftFail result_dict['spf-uses-softfail'] = True -def handle_spf_hard_all_question(_, result_dict, _2, _3): +def handle_spf_hard_all(_, result_dict, _2, _3): + """ + Handles the '-all' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + _ (_): Ignored parameter. + result_dict (dict): Stores the results. + _2, _3: Ignored parameters. + + The function marks the SPF record as using the '-all' mechanism, + which indicates a HardFail. + This means that any IP not authorized by the SPF record should + be treated as a definitive spam source. + """ # add support for HardFail result_dict['spf-uses-hardfail'] = True -def handle_spf_ignore_all_question(_, result_dict, _2, _3): +def handle_spf_ignore_all(_, result_dict, _2, _3): + """ + Handles the '+all' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + _ (str): Ignored parameter. + result_dict (dict): Stores the results. + _2, _3: Ignored parameters. + + The function marks the SPF record as using the '+all' mechanism, + which essentially whitelists all sending hosts. + This is generally considered a poor practice in SPF policy. + """ # basicly whitelist everything... Big fail result_dict['spf-uses-ignorefail'] = True def handle_spf_noop(_, _2, _3, _4): + """ + A no-operation (noop) function for handling certain SPF (Sender Policy Framework) mechanisms. + + Parameters: + _ (_): Ignored parameter. + _2 (_): Ignored parameter. + _3 (_): Ignored parameter. + _4 (_): Ignored parameter. + + This function does nothing and + is used as a placeholder for SPF mechanisms that require no specific handling. + """ return def handle_spf_ptr(_, result_dict, _2, _3): + """ + Handles the 'ptr' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + _ (_): Ignored parameter. + result_dict (dict): Stores the results. + _2, _3: Ignored parameters. + + The function marks the SPF record as using the 'ptr' mechanism. + The 'ptr' mechanism is generally not recommended due to potential performance and + security issues. + """ # What do this do and should we rate on it? result_dict['spf-uses-ptr'] = True def handle_spf_section(section, result_dict, global_translation, local_translation): + """ + Handles SPF (Sender Policy Framework) sections. + + Parameters: + section (str): The SPF section to be handled. + result_dict (dict): Stores the results. + global_translation (function): Translates text messages globally. + local_translation (function): Translates text messages locally. + + The function maps SPF sections to their respective handlers. + If a section starts with a specific option, the corresponding handler is invoked. + If the section doesn't start with any of the specified options, + it's marked as non-standard in the result dictionary. + """ spf_section_handlers = { "ip4:": handle_spf_ip4, "ip6:": handle_spf_ip6, "include:": handle_spf_include, "+include:": handle_spf_include, - "?all:": handle_spf_neutral_all_question, - "~all:": handle_spf_soft_all_question, - "-all:": handle_spf_hard_all_question, - "+all:": handle_spf_ignore_all_question, + "?all:": handle_spf_neutral_all, + "~all:": handle_spf_soft_all, + "-all:": handle_spf_hard_all, + "+all:": handle_spf_ignore_all, "v=spf1": handle_spf_noop, "mx": handle_spf_noop, "+mx": handle_spf_noop, @@ -1010,6 +1098,14 @@ def handle_spf_section(section, result_dict, global_translation, local_translati result_dict['spf-uses-none-standard'] = True def handle_dmarc_p(data, result_dict, local_translation): + """ + Handles the DMARC 'p' tag. + + Parameters: + data (str): The 'p' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ if data in ('none', 'quarantine', 'reject'): result_dict['dmarc-p'] = data else: @@ -1018,6 +1114,14 @@ def handle_dmarc_p(data, result_dict, local_translation): 'TEXT_REVIEW_DMARC_POLICY_INVALID')) def handle_dmarc_sp(data, result_dict, local_translation): + """ + Handles the DMARC 'sp' tag. + + Parameters: + data (str): The 'sp' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ if data in ('none', 'quarantine', 'reject'): result_dict['dmarc-sp'] = data else: @@ -1026,6 +1130,14 @@ def handle_dmarc_sp(data, result_dict, local_translation): 'TEXT_REVIEW_DMARC_SUBPOLICY_INVALID')) def handle_dmarc_adkim(data, result_dict, local_translation): + """ + Handles the DMARC 'adkim' tag. + + Parameters: + data (str): The 'adkim' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ if data == 'r': result_dict['dmarc-warnings'].append( local_translation( @@ -1038,6 +1150,14 @@ def handle_dmarc_adkim(data, result_dict, local_translation): 'TEXT_REVIEW_DMARC_ADKIM_INVALID')) def handle_dmarc_aspf(data, result_dict, local_translation): + """ + Handles the DMARC 'aspf' tag. + + Parameters: + data (str): The 'aspf' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ if data == 'r': result_dict['dmarc-warnings'].append( local_translation( @@ -1050,6 +1170,14 @@ def handle_dmarc_aspf(data, result_dict, local_translation): 'TEXT_REVIEW_DMARC_ASPF_INVALID')) def handle_dmarc_fo(data, result_dict, local_translation): + """ + Handles the DMARC 'fo' tag. + + Parameters: + data (str): The 'fo' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ result_dict['dmarc-fo'] = [] fields = data.split(',') for field in fields: @@ -1066,22 +1194,55 @@ def handle_dmarc_fo(data, result_dict, local_translation): 'TEXT_REVIEW_DMARC_FO_INVALID')) def handle_dmarc_rua(data, result_dict, _): + """ + Handles the DMARC 'rua' tag. + + Parameters: + data (str): The 'rua' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ fields = data.split(',') for field in fields: result_dict['dmarc-rua'].append(field) def handle_dmarc_ruf(data, result_dict, _): + """ + Handles the DMARC 'ruf' tag. + + Parameters: + data (str): The 'ruf' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ fields = data.split(',') for field in fields: result_dict['dmarc-ruf'].append(field) def handle_dmarc_rf(data, result_dict, local_translation): + """ + Handles the DMARC 'rf' tag. + + Parameters: + data (str): The 'rf' value. + result_dict (dict): Stores the results, including warnings. + local_translation (function): Translates text messages. + """ if data == 'afrf': result_dict['dmarc-warnings'].append(local_translation( 'TEXT_REVIEW_DMARC_RF_USES_DEFAULT')) result_dict['dmarc-rf'] = data def handle_dmarc_pct(data, result_dict, local_translation): + """ + This function handles the DMARC percentage (pct) tag in a DMARC DNS record. + + Parameters: + data (str): The DMARC pct value to be processed. + result_dict (dict): A dictionary to store the results of the DMARC pct processing. + This includes any warnings or errors encountered during processing. + local_translation (function): A function to translate text messages into the local language. + """ try: result_dict['dmarc-pct'] = int(data) if result_dict['dmarc-pct'] == 100: @@ -1097,6 +1258,14 @@ def handle_dmarc_pct(data, result_dict, local_translation): result_dict['dmarc-pct'] = None def handle_dmarc_ri(data, result_dict, local_translation): + """ + Handles the DMARC Reporting Interval (RI) by validating and processing the input data. + + Args: + data (str): The DMARC RI data to be processed. + result_dict (dict): The dictionary to store the results of the processing. + local_translation (function): The function to translate text messages. + """ try: result_dict['dmarc-ri'] = int(data) if result_dict['dmarc-ri'] == 86400: From 84db62bd3f84a09b9c1a96abce9caebf827dd52b Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 17:20:10 +0200 Subject: [PATCH 5/6] pylint docstrings --- tests/email_validator.py | 252 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 246 insertions(+), 6 deletions(-) diff --git a/tests/email_validator.py b/tests/email_validator.py index f089c609..31df4078 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -350,6 +350,20 @@ def get_default_info(url, text, method, precision, depth): def validate_email_domain(hostname, result_dict, global_translation, local_translation): + """ + Validates the email domain of a given hostname and + updates the rating based on the validation results. + + Parameters: + - hostname (str): The hostname to validate the email domain for. + - result_dict (dict): The results dictionary. + - global_translation (function): A function to translate text globally. + - local_translation (function): A function to translate text locally. + + Returns: + - rating (Rating): The updated Rating object. + - result_dict (dict): The updated results dictionary. + """ rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) result_dict = {} # We must take in consideration "www." subdomains... @@ -393,11 +407,24 @@ def validate_email_domain(hostname, result_dict, global_translation, local_trans rating = validate_dmarc_policies( global_translation, rating, result_dict, local_translation, hostname) - return rating, result_dict def validate_mta_sts_policy(global_translation, rating, result_dict, local_translation, hostname): + """ + Validates the MTA STS policy of a given hostname and + updates the rating based on the validation results. + + Parameters: + - global_translation (function): A function to translate text globally. + - rating (Rating): The initial Rating object. + - result_dict (dict): The results dictionary. + - local_translation (function): A function to translate text locally. + - hostname (str): The hostname to validate the MTA STS policy for. + + Returns: + - rating (Rating): The updated Rating object. + """ rating += rate_mts_sts_records( global_translation, local_translation, @@ -482,6 +509,18 @@ def validate_mta_sts_policy(global_translation, rating, result_dict, local_trans return rating def handle_mta_sts_txt_row(key_value_pair, result_dict, global_translation, local_translation): + """ + Handles a row of MTA STS TXT record and updates the rating and result dictionary. + + Parameters: + - key_value_pair (tuple): The key-value pair from the MTA STS TXT record. + - result_dict (dict): The results dictionary. + - global_translation (function): A function to translate text globally. + - local_translation (function): A function to translate text locally. + + Returns: + - rating (Rating): The updated Rating object. + """ rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) key = key_value_pair[0].strip(' ') value = key_value_pair[1].strip(' ') @@ -521,6 +560,15 @@ def handle_mta_sts_txt_row(key_value_pair, result_dict, global_translation, loca return rating def has_dns_mta_sts_policy(hostname): + """ + Checks if the given hostname has a DNS MTA STS policy. + + Parameters: + - hostname (str): The hostname to check. + + Returns: + - bool: True if a MTA STS policy exists, False otherwise. + """ has_mta_sts_policy = False # https://www.rfc-editor.org/rfc/rfc8461#section-3.1 mta_sts_results = dns_lookup('_mta-sts.' + hostname, dns.rdatatype.TXT) @@ -530,6 +578,19 @@ def has_dns_mta_sts_policy(hostname): return has_mta_sts_policy def rate_mts_sts_records(global_translation, local_translation, has_mta_sts_policy): + """ + This function rates the MTS STS records based on whether a MTA STS policy exists. + + Parameters: + - global_translation (function): A function to translate text globally. + - local_translation (function): A function to translate text locally. + - has_mta_sts_policy (bool): A boolean indicating whether a MTA STS policy exists. + + Returns: + - has_mta_sts_records_rating (Rating): A Rating object with the overall rating, + integrity and security, and + standards set based on the existence of MTA STS records. + """ has_mta_sts_records_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if has_mta_sts_policy: has_mta_sts_records_rating.set_overall(5.0) @@ -547,20 +608,41 @@ def rate_mts_sts_records(global_translation, local_translation, has_mta_sts_poli def validate_dmarc_policies(global_translation, rating, result_dict, local_translation, hostname): + """ + This function validates the DMARC policies of a given hostname and + updates the rating based on the validation results. + + Parameters: + - global_translation (function): A function to translate text globally. + - rating (Rating): The initial Rating object. + - result_dict (dict): A dictionary containing the results of the DMARC policy checks. + - local_translation (function): A function to translate text locally. + - hostname (str): The hostname to validate the DMARC policies for. + + Returns: + - rating (Rating): The updated Rating object with the rating results of the DMARC policies. + """ dmarc_result_dict = validate_dmarc_policy(local_translation, hostname, result_dict) result_dict.update(dmarc_result_dict) rating = rate_has_dmarc_policies(global_translation, rating, result_dict, local_translation) - # rating = Rate_Invalid_format_DMARC_Policies( - # global_translation, - # rating, - # result_dict, - # local_translation) return rating def validate_dmarc_policy(local_translation, hostname, result_dict): + """ + This function validates the DMARC policy of a given hostname. + + Parameters: + - local_translation (function): A function to translate text locally. + - hostname (str): The hostname to validate the DMARC policy for. + - result_dict (dict): A dictionary containing the results of the DMARC policy checks. + + Returns: + - result_dict (dict): The updated results dictionary with the + validation results of the DMARC policy. + """ # https://proton.me/support/anti-spoofing-custom-domain dmarc_results = dns_lookup(f"_dmarc.{hostname}", "TXT") @@ -607,6 +689,19 @@ def validate_dmarc_policy(local_translation, hostname, result_dict): def rate_has_dmarc_policies(global_translation, rating, result_dict, local_translation): + """ + This function rates the DMARC policies based on the provided results dictionary. + + Parameters: + - global_translation (function): A function to translate text globally. + - rating (Rating): The initial Rating object. + - result_dict (dict): A dictionary containing the results of the DMARC policy checks. + - local_translation (function): A function to translate text locally. + + Returns: + - rating (Rating): A Rating object with the overall rating, integrity and security, + and standards set based on the DMARC policies. + """ if 'dmarc-has-policy' in result_dict: no_dmarc_record_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) no_dmarc_record_rating.set_overall(5.0) @@ -665,6 +760,18 @@ def rate_has_dmarc_policies(global_translation, rating, result_dict, local_trans return rating def rate_dmarc_pct(global_translation, result_dict, local_translation): + """ + This function rates the DMARC percentage (pct) based on the provided results dictionary. + + Parameters: + - global_translation (function): A function to translate text globally. + - result_dict (dict): A dictionary containing the results of the DMARC policy checks. + - local_translation (function): A function to translate text locally. + + Returns: + - percentage_rating (Rating): A Rating object with the overall rating, integrity and security, + and standards set based on the DMARC percentage. + """ percentage_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if result_dict['dmarc-pct'] < 100: percentage_rating.set_overall(3.0) @@ -681,6 +788,21 @@ def rate_dmarc_pct(global_translation, result_dict, local_translation): return percentage_rating def rate_dmarc_subpolicy(global_translation, result_dict, local_translation): + """ + This function rates the DMARC subpolicy based on the provided results dictionary. + + Parameters: + - global_translation (function): A function to translate text globally. + - result_dict (dict): A dictionary containing the results of the DMARC policy checks. + - local_translation (function): A function to translate text locally. + + The function checks the 'dmarc-sp' and 'dmarc-p' fields in the results dictionary. + It sets the overall rating and standards based on the values of these fields. + + Returns: + - dmarc_subpolicy_rating (Rating): A Rating object with the overall rating and + standards set based on the DMARC subpolicy. + """ dmarc_subpolicy_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if 'dmarc-sp' in result_dict and\ 'dmarc-p' in result_dict and\ @@ -710,6 +832,16 @@ def rate_dmarc_subpolicy(global_translation, result_dict, local_translation): return dmarc_subpolicy_rating def rate_dmarc_policy(global_translation, result_dict, local_translation): + """ + Rates DMARC policy based on its configuration. + + Parameters: + global_translation, local_translation (function): Translation functions. + result_dict (dict): Stores DMARC results. + + Returns: + dmarc_policy_rating (Rating): Rating object after DMARC policy rating. + """ dmarc_policy_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if 'dmarc-p' in result_dict: if 'reject' == result_dict['dmarc-p']: @@ -741,6 +873,18 @@ def rate_dmarc_policy(global_translation, result_dict, local_translation): def validate_spf_policies(global_translation, rating, result_dict, local_translation, hostname): + """ + Validates SPF policies and rates them based on various criteria. + + Parameters: + global_translation, local_translation (function): Translation functions. + hostname (str): The hostname to validate SPF policies for. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after SPF policy validation and rating. + """ spf_result_dict = validate_spf_policy( global_translation, local_translation, @@ -771,6 +915,17 @@ def validate_spf_policies(global_translation, rating, result_dict, local_transla def rate_use_of_ptr_for_spf_policies(global_translation, rating, result_dict, local_translation): + """ + Rates SPF policies based on their use of PTR records. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after SPF PTR records usage check. + """ if 'spf-uses-ptr' in result_dict: has_spf_record_ptr_being_used_rating = Rating( global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) @@ -787,6 +942,17 @@ def rate_fail_configuration_for_spf_policies( rating, result_dict, local_translation): + """ + Rates SPF policies based on their fail configuration. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after SPF fail configuration check. + """ if 'spf-uses-ignorefail' in result_dict: has_spf_ignore_records_rating = Rating( global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) @@ -831,6 +997,17 @@ def rate_fail_configuration_for_spf_policies( def rate_invalid_format_spf_policies(global_translation, rating, result_dict, local_translation): + """ + Rates SPF policies based on their format validity. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after SPF format validity check. + """ if 'spf-uses-none-standard' in result_dict: has_spf_unknown_section_rating = Rating( global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) @@ -851,6 +1028,17 @@ def rate_invalid_format_spf_policies(global_translation, rating, result_dict, lo def rate_has_spf_policies(global_translation, rating, result_dict, local_translation): + """ + Rates the presence of SPF policies in DNS records. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after SPF policy presence check. + """ has_spf_records_rating = Rating(global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) if 'spf-has-policy' in result_dict: txt = local_translation('TEXT_REVIEW_SPF_DNS_RECORD_SUPPORT') @@ -875,6 +1063,17 @@ def rate_too_many_dns_lookup_for_spf_policies( rating, result_dict, local_translation): + """ + Rates SPF policies based on DNS lookups count. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after DNS lookups count check. + """ if 'spf-error-to-many-dns-lookups' in result_dict: to_many_spf_dns_lookups_rating = Rating( global_translation, REVIEW_SHOW_IMPROVEMENTS_ONLY) @@ -888,6 +1087,17 @@ def rate_too_many_dns_lookup_for_spf_policies( def rate_gdpr_for_spf_policies(global_translation, rating, result_dict, local_translation): + """ + Rates GDPR compliance for SPF policies based on IP addresses. + + Parameters: + global_translation, local_translation (function): Translation functions. + rating (Rating): The current rating object. + result_dict (dict): Stores SPF results. + + Returns: + rating (Rating): Updated rating object after GDPR compliance check. + """ spf_addresses = [] if 'spf-ipv4' not in result_dict: result_dict['spf-ipv4'] = [] @@ -940,12 +1150,32 @@ def rate_gdpr_for_spf_policies(global_translation, rating, result_dict, local_tr return rating def handle_spf_ip4(section, result_dict, _, _2): + """ + Updates 'result_dict' with SPF IPv4 data from 'section'. + + Parameters: + section (list): Contains SPF data. + result_dict (dict): Stores SPF results. + + Returns: + None. Updates 'result_dict' in place. + """ data = section[4:] if 'spf-ipv4' not in result_dict: result_dict['spf-ipv4'] = [] result_dict['spf-ipv4'].append(data) def handle_spf_ip6(section, result_dict, _, _2): + """ + Updates 'result_dict' with SPF IPv6 data from 'section'. + + Parameters: + section (list): Contains SPF data. + result_dict (dict): Stores SPF results. + + Returns: + None. Updates 'result_dict' in place. + """ data = section[4:] if 'spf-ipv6' not in result_dict: result_dict['spf-ipv6'] = [] @@ -971,6 +1201,16 @@ def handle_spf_include(section, result_dict, global_translation, local_translati result_dict.update(subresult_dict) def handle_spf_neutral_all(_, result_dict, _2, _3): + """ + Handles the '?all' mechanism in an SPF (Sender Policy Framework) record. + + Parameters: + _ (_): Ignored parameter. + result_dict (dict): Stores the results. + _2, _3: Ignored parameters. + + The function marks the SPF record as using the '?all' mechanism, which indicates a NeutralFail. + """ # What do this do and should we rate on it? result_dict['spf-uses-neutralfail'] = True From 181936725faa04236dfecd87b1f40924a5d4f2f6 Mon Sep 17 00:00:00 2001 From: cockroacher <163405488+cockroacher@users.noreply.github.com> Date: Fri, 26 Apr 2024 17:27:40 +0200 Subject: [PATCH 6/6] pylint docstrings --- tests/email_validator.py | 46 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/tests/email_validator.py b/tests/email_validator.py index 31df4078..5e2f3121 100644 --- a/tests/email_validator.py +++ b/tests/email_validator.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# pylint: disable=too-many-lines import re import smtplib from datetime import datetime @@ -26,7 +27,7 @@ checked_urls = {} # We are doing this to support IPv6 -class SmtpWebperf(smtplib.SMTP): # pylint: disable=too-many-instance-attributes +class SmtpWebperf(smtplib.SMTP): # pylint: disable=too-many-instance-attributes,missing-class-docstring def __init__(self, host='', port=0, local_hostname=None, # pylint: disable=too-many-arguments, super-init-not-called timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): @@ -165,6 +166,15 @@ def run_test(global_translation, lang_code, url): def search_for_email_domain(content): + """ + Extracts the domain from an email address found in the given content. + + Parameters: + content (str): The content to search for an email address. + + Returns: + str: The domain of the found email address, or None if no valid email domain is found. + """ content_match = re.search( r"[\"' ]mailto:(?P[^\"'\r\n\\]+)[\"'\r\n\\]", content) @@ -272,6 +282,18 @@ def get_text_precision(text): def get_interesting_urls(content, org_url_start, depth): + """ + Extracts and processes URLs from the given HTML content. + + Parameters: + content (str): The HTML content to extract URLs from. + org_url_start (str): The original URL to be used as a base for relative URLs. + depth (int): The depth value to be included in the result. + + Returns: + dict: A dictionary where keys are URLs and + values are dictionaries containing information about each URL. + """ urls = {} soup = BeautifulSoup(content, 'lxml') @@ -331,10 +353,32 @@ def get_interesting_urls(content, org_url_start, depth): def get_sort_on_precision(item): + """ + Extracts the 'precision' value from a given item. + + Parameters: + item (tuple): A tuple where the second element is a dictionary containing a 'precision' key. + + Returns: + int/float: The value of 'precision' from the dictionary. + """ return item[1]["precision"] def get_default_info(url, text, method, precision, depth): + """ + Constructs a dictionary with default information. + + Parameters: + url (str): The URL to be included in the result. + text (str): The text to be processed and included in the result. + method (str): The method to be included in the result. + precision (int/float): The precision value to be included in the result. + depth (int): The depth value to be included in the result. + + Returns: + dict: A dictionary containing the processed input parameters. + """ result = {} if text is not None: