diff --git a/README.md b/README.md index 129dd6b..9d53bbd 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,8 @@ se_user_conf = { #Below two params are optional and need to be enabled to pass the custom email body #user_config.se_notifications_enable_custom_email_body: True, #user_config.se_notifications_email_custom_body: "Custom statistics: 'product_id': {}", - + #Below parameter is optional and needs to be enabled in case authentication is required to access the smtp server. + #user_config.se_notifications_enable_smtp_server_auth: True, } ``` diff --git a/docs/bigquery.md b/docs/bigquery.md index 2003718..35aeb5b 100644 --- a/docs/bigquery.md +++ b/docs/bigquery.md @@ -43,6 +43,19 @@ writer = ( .option("writeMethod", "direct") ) +#if smtp server needs to be authenticated, password should be set in a secure way like cerberus or databricks secret +stats_streaming_config_dict = { + user_config.se_enable_streaming: False, + user_config.secret_type: "cerberus", + user_config.cbs_url: "https://cerberus.example.com", + user_config.cbs_sdb_path: "", + user_config.cbs_smtp_password: "", + # user_config.secret_type: "databricks", + # user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com", + # user_config.dbx_secret_scope: "your_secret_scope", + # user_config.dbx_smtp_password: "your_password", +} + se: SparkExpectations = SparkExpectations( product_id="your_product", rules_df=spark.read.format("bigquery").load( @@ -52,13 +65,14 @@ se: SparkExpectations = SparkExpectations( stats_table_writer=writer, target_and_error_table_writer=writer, debugger=False, - stats_streaming_options={user_config.se_enable_streaming: False} + stats_streaming_options=stats_streaming_config_dict, ) # Commented fields are optional or required when notifications are enabled user_conf = { user_config.se_notifications_enable_email: False, + # user_config.se_notifications_enable_smtp_server_auth: False, # user_config.se_notifications_enable_custom_email_body: True, # 
user_config.se_notifications_email_smtp_host: "mailhost.com", # user_config.se_notifications_email_smtp_port: 25, diff --git a/docs/delta.md b/docs/delta.md index 2db8d45..1f434c8 100644 --- a/docs/delta.md +++ b/docs/delta.md @@ -34,6 +34,19 @@ from spark_expectations.core.expectations import ( ) from spark_expectations.config.user_config import Constants as user_config +# if smtp server needs to be authenticated, password should be set in a secure way like cerberus or databricks secret +stats_streaming_config_dict = { + user_config.se_enable_streaming: False, + user_config.secret_type: "cerberus", + user_config.cbs_url: "https://cerberus.example.com", + user_config.cbs_sdb_path: "", + user_config.cbs_smtp_password: "", + # user_config.secret_type: "databricks", + # user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com", + # user_config.dbx_secret_scope: "your_secret_scope", + # user_config.dbx_smtp_password: "your_password", +} + writer = WrappedDataFrameWriter().mode("append").format("delta") se: SparkExpectations = SparkExpectations( @@ -43,12 +56,13 @@ se: SparkExpectations = SparkExpectations( stats_table_writer=writer, target_and_error_table_writer=writer, debugger=False, - stats_streaming_options={user_config.se_enable_streaming: False} + stats_streaming_options=stats_streaming_config_dict ) # Commented fields are optional or required when notifications are enabled user_conf = { user_config.se_notifications_enable_email: False, + # user_config.se_notifications_enable_smtp_server_auth: False, # user_config.se_notifications_enable_custom_email_body: True, # user_config.se_notifications_email_smtp_host: "mailhost.com", # user_config.se_notifications_email_smtp_port: 25, diff --git a/docs/examples.md b/docs/examples.md index e7ea28d..997bf75 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -8,52 +8,54 @@ from spark_expectations.config.user_config import Constants as user_config se_user_conf = { user_config.se_notifications_enable_email: 
False, # (1)! - user_config.se_notifications_enable_custom_email_body: False, # (2) - user_config.se_notifications_email_smtp_host: "mailhost.com", # (3)! - user_config.se_notifications_email_smtp_port: 25, # (4)! - user_config.se_notifications_email_from: "", # (5)! - user_config.se_notifications_email_to_other_mail_id: "", # (6)! - user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", # (7)! - user_config.se_notifications_email_custom_body: "custom stats: 'product_id': {}", # (8)! - user_config.se_notifications_enable_slack: True, # (9)! - user_config.se_notifications_slack_webhook_url: "", # (10)! - user_config.se_notifications_on_start: True, # (11)! - user_config.se_notifications_on_completion: True, # (12)! - user_config.se_notifications_on_fail: True, # (13)! - user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (14)! - user_config.se_notifications_on_error_drop_threshold: 15, # (15)! - user_config.se_enable_error_table: True, # (16)! - user_config.enable_query_dq_detailed_result: True, # (17)! - user_config.enable_agg_dq_detailed_result: True, # (18)! - user_config.querydq_output_custom_table_name: "", #19 + user_config.se_notifications_enable_smtp_server_auth: False, # (2)! + user_config.se_notifications_enable_custom_email_body: False, # (3) + user_config.se_notifications_email_smtp_host: "mailhost.com", # (4)! + user_config.se_notifications_email_smtp_port: 25, # (5)! + user_config.se_notifications_email_from: "", # (6)! + user_config.se_notifications_email_to_other_mail_id: "", # (7)! + user_config.se_notifications_email_subject: "spark expectations - data quality - notifications", # (8)! + user_config.se_notifications_email_custom_body: "custom stats: 'product_id': {}", # (9)! + user_config.se_notifications_enable_slack: True, # (10)! + user_config.se_notifications_slack_webhook_url: "", # (11)! + user_config.se_notifications_on_start: True, # (12)! 
+ user_config.se_notifications_on_completion: True, # (13)! + user_config.se_notifications_on_fail: True, # (14)! + user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (15)! + user_config.se_notifications_on_error_drop_threshold: 15, # (16)! + user_config.se_enable_error_table: True, # (17)! + user_config.enable_query_dq_detailed_result: True, # (18)! + user_config.enable_agg_dq_detailed_result: True, # (19)! + user_config.querydq_output_custom_table_name: "", #20 user_config.se_dq_rules_params: { "env": "local", "table": "product", - }, # (20)! + }, # (21)! } } ``` 1. The `user_config.se_notifications_enable_email` parameter, which controls whether notifications are sent via email, is set to false by default -2. The `user_config.se_notifications_enable_custom_email_body` optional parameter, which controls whether custom email body is enabled, is set to false by default -3. The `user_config.se_notifications_email_smtp_host` parameter is set to "mailhost.com" by default and is used to specify the email SMTP domain host -4. The `user_config.se_notifications_email_smtp_port` parameter, which accepts a port number, is set to "25" by default -5. The `user_config.se_notifications_email_from` parameter is used to specify the email ID that will trigger the email notification -6. The `user_config.se_notifications_email_to_other_mail_id` parameter accepts a list of recipient email IDs -7. The `user_config.se_notifications_email_subject` parameter captures the subject line of the email -8. The `user_config.se_notifications_email_custom_body` optional parameter, captures the custom email body, need to be compliant with certain syntax -9. The `user_config.se_notifications_enable_slack` parameter, which controls whether notifications are sent via slack, is set to false by default -10. The `user_config/se_notifications_slack_webhook_url` parameter accepts the webhook URL of a Slack channel for sending notifications -11. 
When `user_config.se_notifications_on_start` parameter set to `True` enables notification on start of the spark-expectations, variable by default set to `False` -12. When `user_config.se_notifications_on_completion` parameter set to `True` enables notification on completion of spark-expectations framework, variable by default set to `False` -13. When `user_config.se_notifications_on_fail` parameter set to `True` enables notification on failure of spark-expectations data quality framework, variable by default set to `True` -14. When `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter set to `True` enables notification when error threshold reaches above the configured value -15. The `user_config.se_notifications_on_error_drop_threshold` parameter captures error drop threshold value -16. The `user_config.se_enable_error_table` parameter, which controls whether error data to load into error table, is set to true by default -17. When `user_config.enable_query_dq_detailed_result` parameter set to `True`, enables the option to cature the query_dq detailed stats to detailed_stats table. By default set to `False` -18. When `user_config.enable_agg_dq_detailed_result` parameter set to `True`, enables the option to cature the agg_dq detailed stats to detailed_stats table. By default set to `False` -19. The `user_config.querydq_output_custom_table_name` parameter is used to specify the name of the custom query_dq output table which captures the output of the alias queries passed in the query dq expectation. Default is _custom_output -20. The `user_config.se_dq_rules_params` parameter, which are required to dynamically update dq rules +2. The `user_config.se_notifications_enable_smtp_server_auth` optional parameter, which controls whether SMTP server authentication is enabled, is set to false by default +3. 
The `user_config.se_notifications_enable_custom_email_body` optional parameter, which controls whether custom email body is enabled, is set to false by default +4. The `user_config.se_notifications_email_smtp_host` parameter is set to "mailhost.com" by default and is used to specify the email SMTP domain host +5. The `user_config.se_notifications_email_smtp_port` parameter, which accepts a port number, is set to "25" by default +6. The `user_config.se_notifications_email_from` parameter is used to specify the email ID that will trigger the email notification +7. The `user_config.se_notifications_email_to_other_mail_id` parameter accepts a list of recipient email IDs +8. The `user_config.se_notifications_email_subject` parameter captures the subject line of the email +9. The `user_config.se_notifications_email_custom_body` optional parameter, captures the custom email body, needs to be compliant with certain syntax +10. The `user_config.se_notifications_enable_slack` parameter, which controls whether notifications are sent via slack, is set to false by default +11. The `user_config.se_notifications_slack_webhook_url` parameter accepts the webhook URL of a Slack channel for sending notifications +12. When `user_config.se_notifications_on_start` parameter set to `True` enables notification on start of the spark-expectations, variable by default set to `False` +13. When `user_config.se_notifications_on_completion` parameter set to `True` enables notification on completion of spark-expectations framework, variable by default set to `False` +14. When `user_config.se_notifications_on_fail` parameter set to `True` enables notification on failure of spark-expectations data quality framework, variable by default set to `True` +15. When `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter set to `True` enables notification when error threshold reaches above the configured value +16. 
The `user_config.se_notifications_on_error_drop_threshold` parameter captures error drop threshold value +17. The `user_config.se_enable_error_table` parameter, which controls whether error data to load into error table, is set to true by default +18. When `user_config.enable_query_dq_detailed_result` parameter set to `True`, enables the option to capture the query_dq detailed stats to detailed_stats table. By default set to `False` +19. When `user_config.enable_agg_dq_detailed_result` parameter set to `True`, enables the option to capture the agg_dq detailed stats to detailed_stats table. By default set to `False` +20. The `user_config.querydq_output_custom_table_name` parameter is used to specify the name of the custom query_dq output table which captures the output of the alias queries passed in the query dq expectation. Default is _custom_output +21. The `user_config.se_dq_rules_params` parameter, which are required to dynamically update dq rules ### Spark Expectations Initialization @@ -62,7 +64,8 @@ For all the below examples the below import and SparkExpectations class instanti When store for sensitive details is Databricks secret scope,construct config dictionary for authentication of Kafka and avoid duplicate construction every time your project is initialized, you can create a dictionary with the following keys and their appropriate values. -This dictionary can be placed in the __init__.py file of your project or declared as a global variable. +This dictionary can be placed in the __init__.py file of your project or declared as a global variable. In case you need authentication for the SMTP server, +you can store the password in Databricks secret scope as well, or choose Cerberus for this secret storage. 
```python from typing import Dict, Union from spark_expectations.config.user_config import Constants as user_config @@ -77,6 +80,7 @@ stats_streaming_config_dict: Dict[str, Union[bool, str]] = { user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key", # (7)! user_config.dbx_secret_token: "se_streaming_auth_secret_token_key", # (8)! user_config.dbx_topic_name: "se_streaming_topic_name", # (9)! + user_config.dbx_smtp_password: "smtp_password_secret_key", # (10)! } ``` @@ -89,6 +93,7 @@ stats_streaming_config_dict: Dict[str, Union[bool, str]] = { 7. The `user_config.dbx_secret_app_name` captures secret key for the Kafka authentication app name 8. The `user_config.dbx_secret_token` captures secret key for the Kafka authentication app secret token 9. The `user_config.dbx_topic_name` captures secret key for the Kafka topic name +10. The `user_config.dbx_smtp_password` captures secret key for the SMTP password Similarly when sensitive store is Cerberus: @@ -106,6 +111,7 @@ stats_streaming_config_dict: Dict[str, Union[bool, str]] = { user_config.cbs_secret_app_name: "se_streaming_auth_secret_appid_sdb_path", # (7)! user_config.cbs_secret_token: "se_streaming_auth_secret_token_sdb_path", # (8)! user_config.cbs_topic_name: "se_streaming_topic_name_sdb_path", # (9)! + user_config.cbs_smtp_password: "smtp_password_secret_key", # (10)! } ``` @@ -118,8 +124,12 @@ stats_streaming_config_dict: Dict[str, Union[bool, str]] = { 7. The `user_config.cbs_secret_app_name` captures path where Kafka authentication app name stored in the Cerberus sdb 8. The `user_config.cbs_secret_token` captures path where Kafka authentication app name secret token stored in the Cerberus sdb 9. The `user_config.cbs_topic_name` captures path where Kafka topic name stored in the Cerberus sdb +10. 
The `user_config.cbs_smtp_password` captures the key for the SMTP password + + You can disable the streaming functionality by setting the `user_config.se_enable_streaming` parameter to `False` +You can still pass the secret keys for the SMTP password, even if streaming is disabled. ```python from typing import Dict, Union diff --git a/docs/iceberg.md b/docs/iceberg.md index 25dfe9f..9b3d1ed 100644 --- a/docs/iceberg.md +++ b/docs/iceberg.md @@ -42,6 +42,19 @@ from spark_expectations.config.user_config import Constants as user_config writer = WrappedDataFrameWriter().mode("append").format("iceberg") +# if smtp server needs to be authenticated, password should be set in a secure way like cerberus or databricks secret +stats_streaming_config_dict = { + user_config.se_enable_streaming: False, + user_config.secret_type: "cerberus", + user_config.cbs_url: "https://cerberus.example.com", + user_config.cbs_sdb_path: "", + user_config.cbs_smtp_password: "", + # user_config.secret_type: "databricks", + # user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com", + # user_config.dbx_secret_scope: "your_secret_scope", + # user_config.dbx_smtp_password: "your_password", +} + se: SparkExpectations = SparkExpectations( product_id="your_product", rules_df=spark.sql("select * from dq_spark_local.dq_rules"), @@ -49,12 +62,13 @@ se: SparkExpectations = SparkExpectations( stats_table_writer=writer, target_and_error_table_writer=writer, debugger=False, - stats_streaming_options={user_config.se_enable_streaming: False}, + stats_streaming_options=stats_streaming_config_dict, ) # Commented fields are optional or required when notifications are enabled user_conf = { user_config.se_notifications_enable_email: False, + # user_config.se_notifications_enable_smtp_server_auth: False, # user_config.se_notifications_enable_custom_email_body: True, # user_config.se_notifications_email_smtp_host: "mailhost.com", # user_config.se_notifications_email_smtp_port: 25, diff --git 
a/spark_expectations/config/user_config.py b/spark_expectations/config/user_config.py index ada3700..ffb0564 100644 --- a/spark_expectations/config/user_config.py +++ b/spark_expectations/config/user_config.py @@ -5,6 +5,9 @@ class Constants: # declare const user config variables for email notification se_notifications_enable_email = "spark.expectations.notifications.email.enabled" + se_notifications_enable_smtp_server_auth = ( + "spark.expectations.notifications.email.smtp_server_auth" + ) se_notifications_enable_custom_email_body = ( "spark.expectations.notifications.enable.custom.email.body" ) @@ -65,6 +68,7 @@ class Constants: cbs_secret_app_name = "se.streaming.cbs.secret.app.name" cbs_secret_token = "se.streaming.cerberus.secret.token" cbs_topic_name = "se.streaming.cerberus.token.name" + cbs_smtp_password = "se.streaming.cerberus.smtp.password" dbx_workspace_url = "se.streaming.dbx.workspace.url" dbx_secret_scope = "se.streaming.dbx.secret.scope" @@ -73,6 +77,7 @@ class Constants: dbx_secret_app_name = "se.streaming.dbx.secret.app.name" dbx_secret_token = "se.streaming.dbx.secret.token" dbx_topic_name = "se.streaming.dbx.topic.name" + dbx_smtp_password = "se.streaming.dbx.smtp.password" # declare const user config variables for agg query dq detailed stats se_enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats" diff --git a/spark_expectations/core/context.py b/spark_expectations/core/context.py index 6c35155..7fb20b3 100644 --- a/spark_expectations/core/context.py +++ b/spark_expectations/core/context.py @@ -47,6 +47,7 @@ def __post_init__(self) -> None: self._dq_config_abs_path: Optional[str] = None self._enable_mail: bool = False + self._enable_smtp_server_auth: bool = False self._enable_custom_email_body: bool = False self._to_mail: Optional[str] = None self._mail_subject: Optional[str] = None @@ -496,21 +497,21 @@ def get_enable_mail(self) -> bool: """ return self._enable_mail - def set_to_mail(self, to_mail: str) -> None: - 
self._to_mail = to_mail - - def set_enable_custom_email_body(self, enable_custom_email_body: bool) -> None: - self._enable_custom_email_body = bool(enable_custom_email_body) + def set_enable_smtp_server_auth(self, enable_smtp_server_auth: bool) -> None: + self._enable_smtp_server_auth = bool(enable_smtp_server_auth) @property - def get_enable_custom_email_body(self) -> bool: + def get_enable_smtp_server_auth(self) -> bool: """ - This function return whether to enable custom email body or not + This function return whether smtp server requires authentication or not Returns: - str: Returns _enable_custom_email_body(bool) + str: Returns _enable_smtp_server_auth(bool) """ - return self._enable_custom_email_body + return self._enable_smtp_server_auth + + def set_to_mail(self, to_mail: str) -> None: + self._to_mail = to_mail @property def get_to_mail(self) -> str: @@ -528,6 +529,19 @@ def get_to_mail(self) -> str: accessing it""" ) + def set_enable_custom_email_body(self, enable_custom_email_body: bool) -> None: + self._enable_custom_email_body = bool(enable_custom_email_body) + + @property + def get_enable_custom_email_body(self) -> bool: + """ + This function return whether to enable custom email body or not + Returns: + str: Returns _enable_custom_email_body(bool) + + """ + return self._enable_custom_email_body + def set_mail_from(self, mail_from: str) -> None: self._mail_from = mail_from @@ -1150,6 +1164,44 @@ def get_topic_name(self) -> Optional[str]: accessing it""" ) + @property + def get_smtp_password_key(self) -> Optional[str]: + """ + This function helps in getting key / path for smtp password + Returns: + smtp password key / path in Optional[str] + """ + _smtp_password_key: Optional[str] = ( + self._se_streaming_stats_dict.get(user_config.cbs_smtp_password) + if self.get_secret_type == "cerberus" + else self._se_streaming_stats_dict.get(user_config.dbx_smtp_password) + ) + if _smtp_password_key: + return _smtp_password_key + raise 
SparkExpectationsMiscException( + """The spark expectations context is not set completely, please assign + 'UserConfig.cbs_smtp_password' before + accessing it""" + ) + + @property + def get_cbs_sdb_path(self) -> Optional[str]: + """ + This function helps in cerberus sdb path + Returns: + cerberus sdb path in Optional[str] + """ + _cbs_sdb_path: Optional[str] = self._se_streaming_stats_dict.get( + user_config.cbs_sdb_path + ) + if _cbs_sdb_path: + return _cbs_sdb_path + raise SparkExpectationsMiscException( + """The spark expectations context is not set completely, please assign + 'UserConfig.cbs_sdb_path' before + accessing it""" + ) + def set_se_streaming_stats_topic_name( self, se_streaming_stats_topic_name: str ) -> None: diff --git a/spark_expectations/examples/sample_dq_delta.py b/spark_expectations/examples/sample_dq_delta.py index 1bad471..011cfd8 100644 --- a/spark_expectations/examples/sample_dq_delta.py +++ b/spark_expectations/examples/sample_dq_delta.py @@ -21,18 +21,31 @@ } job_info = str(dic_job_info) +stats_streaming_config_dict = { + user_config.se_enable_streaming: False, + user_config.secret_type: "cerberus", + user_config.cbs_url: "https://prod.cerberus.nikecloud.com", + user_config.cbs_sdb_path: "", + user_config.cbs_smtp_password: "", + # user_config.secret_type: "databricks", + # user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com", + # user_config.dbx_secret_scope: "your_scope", + # user_config.dbx_smtp_password: "your_password", +} + se: SparkExpectations = SparkExpectations( - product_id="your_product", + product_id="your_productreport", rules_df=spark.table("dq_spark_dev.dq_rules"), stats_table="dq_spark_dev.dq_stats", stats_table_writer=writer, target_and_error_table_writer=writer, debugger=False, - # stats_streaming_options={user_config.se_enable_streaming: False}, + stats_streaming_options=stats_streaming_config_dict, ) user_conf = { - user_config.se_notifications_enable_email: True, + 
user_config.se_notifications_enable_email: False, + user_config.se_notifications_enable_smtp_server_auth: True, user_config.se_notifications_enable_custom_email_body: True, user_config.se_notifications_email_smtp_host: "mailhost.com", user_config.se_notifications_email_smtp_port: 25, diff --git a/spark_expectations/examples/sample_dq_iceberg.py b/spark_expectations/examples/sample_dq_iceberg.py index 16e352a..d972890 100644 --- a/spark_expectations/examples/sample_dq_iceberg.py +++ b/spark_expectations/examples/sample_dq_iceberg.py @@ -48,8 +48,6 @@ user_config.se_enable_query_dq_detailed_result: True, user_config.se_enable_agg_dq_detailed_result: True, user_config.se_enable_error_table: True, - user_config.enable_query_dq_detailed_result: True, - user_config.enable_agg_dq_detailed_result: True, user_config.se_dq_rules_params: { "env": "local", "table": "product", diff --git a/spark_expectations/notifications/plugins/email.py b/spark_expectations/notifications/plugins/email.py index e5047f7..c6fea67 100644 --- a/spark_expectations/notifications/plugins/email.py +++ b/spark_expectations/notifications/plugins/email.py @@ -9,6 +9,7 @@ ) from spark_expectations.core.exceptions import SparkExpectationsEmailException from spark_expectations.core.context import SparkExpectationsContext +from spark_expectations.secrets import SparkExpectationsSecretsBackend # Create the email plugin @@ -17,6 +18,51 @@ class SparkExpectationsEmailPluginImpl(SparkExpectationsNotification): This class implements/supports functionality to send email """ + def _get_smtp_password( + self, _context: SparkExpectationsContext, server: smtplib.SMTP + ) -> None: + """ + Retrieves the SMTP password from secret and logs in to the server. 
+ Args: + _context: SparkExpectationsContext object + server: smtplib.SMTP object + """ + sender = _context.get_mail_from + secret_handler = SparkExpectationsSecretsBackend( + secret_dict=_context.get_se_streaming_stats_dict + ) + secret_type = _context.get_secret_type + try: + if secret_type == "cerberus": + cbs_sdb_path = _context.get_cbs_sdb_path + smtp_password_key = _context.get_smtp_password_key + if cbs_sdb_path and smtp_password_key: + secret = secret_handler.get_secret(cbs_sdb_path) + if isinstance(secret, dict): + password = secret.get(smtp_password_key) + else: + password = None + elif secret_type == "databricks": + smtp_password_key = _context.get_smtp_password_key + if smtp_password_key: + password = secret_handler.get_secret(smtp_password_key) + else: + password = None + else: + password = None + except KeyError: + raise SparkExpectationsEmailException( + "SMTP password key is missing in the secret." + ) + except Exception as e: + raise SparkExpectationsEmailException( + "Failed to retrieve SMTP password." 
+ ) from e + + if password is None: + raise SparkExpectationsEmailException("SMTP password is not set.") + server.login(sender, password) + @spark_expectations_notification_impl def send_notification( self, @@ -27,7 +73,7 @@ def send_notification( function to send email notification for requested mail id's Args: _context: object of SparkExpectationsContext - _config_args: dict(which consist to: receiver mail(str), subject: subject of + _config_args: dict(which consist of: receiver mail(str), subject: subject of the mail(str) and body: body of the mail(str) Returns: @@ -48,6 +94,8 @@ def send_notification( _context.get_mail_smtp_server, _context.get_mail_smtp_port ) server.starttls() + if _context.get_enable_smtp_server_auth is True: + self._get_smtp_password(_context, server) text = msg.as_string() server.sendmail( _context.get_mail_from, diff --git a/spark_expectations/utils/actions.py b/spark_expectations/utils/actions.py index 0509aa5..d53a223 100644 --- a/spark_expectations/utils/actions.py +++ b/spark_expectations/utils/actions.py @@ -328,7 +328,9 @@ def agg_query_dq_detailed_result( ) ): for _key, _querydq_query in sub_key_value.items(): - _querydq_df = _context.spark.sql(_dq_rule["expectation" + "_" + _key]) + _querydq_df = _context.spark.sql( + _dq_rule["expectation" + "_" + _key] + ) querydq_output.append( ( _context.get_run_id, @@ -341,7 +343,10 @@ def agg_query_dq_detailed_result( [ ( _key, - [row.asDict() for row in _querydq_df.collect()], + [ + row.asDict() + for row in _querydq_df.collect() + ], ) ] ), diff --git a/spark_expectations/utils/reader.py b/spark_expectations/utils/reader.py index 64c9de0..b8f617e 100644 --- a/spark_expectations/utils/reader.py +++ b/spark_expectations/utils/reader.py @@ -35,6 +35,7 @@ def set_notification_param( try: _default_spark_conf: Dict[str, Union[str, int, bool]] = { user_config.se_notifications_enable_email: False, + user_config.se_notifications_enable_smtp_server_auth: False, 
user_config.se_notifications_enable_custom_email_body: False, user_config.se_notifications_email_smtp_host: "", user_config.se_notifications_email_smtp_port: 25, @@ -105,6 +106,10 @@ def set_notification_param( raise SparkExpectationsMiscException( "All params/variables required for email notification is not configured or supplied" ) + if _notification_dict[ + user_config.se_notifications_enable_smtp_server_auth + ]: + self._context.set_enable_smtp_server_auth(True) if ( _notification_dict[ user_config.se_notifications_enable_custom_email_body diff --git a/tests/core/test_context.py b/tests/core/test_context.py index a43987f..1437b30 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -52,6 +52,7 @@ def test_context_properties(): context._mail_smtp_server = "abc" context._mail_smtp_port = 25 context._enable_mail = True + context._enable_smtp_server_auth = True context._enable_custom_email_body = True context._to_mail = "abc@mail.com, decf@mail.com" context._mail_from = "abc@mail.com" @@ -177,6 +178,7 @@ def test_context_properties(): assert context._mail_smtp_server == "abc" assert context.get_mail_smtp_port == 25 assert context._enable_mail is True + assert context._enable_smtp_server_auth is True assert context._enable_custom_email_body is True assert context._to_mail == "abc@mail.com, decf@mail.com" assert context._mail_from == "abc@mail.com" @@ -493,6 +495,13 @@ def test_set_enable_mail(): assert context.get_enable_mail is True +def test_set_enable_smtp_server_auth(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_enable_smtp_server_auth(True) + assert context._enable_smtp_server_auth is True + assert context.get_enable_smtp_server_auth is True + + def test_set_enable_custom_email_body(): context = SparkExpectationsContext(product_id="product1", spark=spark) context.set_enable_custom_email_body(True) @@ -753,6 +762,65 @@ def test_get_mail_from_exception(): context.get_mail_from +def 
test_get_smtp_password_key(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_se_streaming_stats_dict( + {user_config.cbs_smtp_password: "cerberus_password", + user_config.dbx_smtp_password: "dbx_password", + user_config.secret_type: "cerberus"} + ) + + assert context.get_smtp_password_key == "cerberus_password" + + context.set_se_streaming_stats_dict( + { + user_config.cbs_smtp_password: "cerberus_password", + user_config.dbx_smtp_password: "dbx_password", + user_config.secret_type: "databricks", + } + ) + assert context.get_smtp_password_key == "dbx_password" + + +def test_get_smtp_password_key_exception(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_se_streaming_stats_dict( + { + user_config.cbs_smtp_password: "cerberus_password", + user_config.secret_type: "databricks" + } + ) + + with pytest.raises( + SparkExpectationsMiscException, + match=("""The spark expectations context is not set completely, please assign + 'UserConfig.cbs_smtp_password' before + accessing it""") + ): + _ = context.get_smtp_password_key + + +def test_get_cbs_sdb_path(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_se_streaming_stats_dict( + {user_config.cbs_sdb_path: "app/your_project/your_env"} + ) + assert context.get_cbs_sdb_path == "app/your_project/your_env" + + +def test_get_cbs_sdb_path_exception(): + context = SparkExpectationsContext(product_id="product1", spark=spark) + context.set_se_streaming_stats_dict({}) + + with pytest.raises( + SparkExpectationsMiscException, + match=( + r"The spark expectations context is not set completely, please assign\s+" + r"'UserConfig.cbs_sdb_path'\s+before\s+accessing it" + ) + ): + _ = context.get_cbs_sdb_path + def test_get_slack_webhook_url_exception(): context = SparkExpectationsContext(product_id="product1", spark=spark) context._slack_webhook_url = False diff --git a/tests/notification/plugins/test_email.py 
b/tests/notification/plugins/test_email.py index be985d8..50e73f4 100644 --- a/tests/notification/plugins/test_email.py +++ b/tests/notification/plugins/test_email.py @@ -73,3 +73,205 @@ def test_send_notification_exception(_mock_context): # assert mock_smtp.assert_called_with(_mock_context.get_mail_smtp_server, _mock_context.get_mail_smtp_port) + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_cerberus(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "cerberus" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "cerberus" + } + _mock_context.get_cbs_sdb_path = "path/to/secret" + _mock_context.get_smtp_password_key = "password_key" + _mock_context.get_cerberus_url = "https://cerberus.example.com" + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret") as mock_get_secret: + mock_get_secret.return_value = { + "password_key": "test_password" + } + server = mock_smtp.return_value + email_handler._get_smtp_password(_mock_context, server) + server.login.assert_called_once_with("test@example.com", "test_password") + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_none_exception(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "cerberus" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "cerberus" + } + _mock_context.get_cbs_sdb_path = "path/to/secret" + _mock_context.get_smtp_password_key = "password_key" + _mock_context.get_cerberus_url = "https://cerberus.example.com" + + with 
patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret") as mock_get_secret: + mock_get_secret.return_value = {} # Mocking the return value as an empty dictionary + server = mock_smtp.return_value + + with pytest.raises(SparkExpectationsEmailException, match="SMTP password is not set."): + email_handler._get_smtp_password(_mock_context, server) + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_databricks(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "databricks" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "databricks" + } + _mock_context.get_smtp_password_key = "password_key" + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret") as mock_get_secret: + mock_get_secret.return_value = "test_password" + server = mock_smtp.return_value + + email_handler._get_smtp_password(_mock_context, server) + server.login.assert_called_once_with("test@example.com", "test_password") + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_databricks_missing_key(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "databricks" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "databricks" + } + _mock_context.get_smtp_password_key = None + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp: + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for missing key + 
with pytest.raises(SparkExpectationsEmailException, match="SMTP password is not set."): + email_handler._get_smtp_password(_mock_context, server) + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_key_error(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "cerberus" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "cerberus" + } + _mock_context.get_cbs_sdb_path = "path/to/secret" + _mock_context.get_smtp_password_key = "password_key" + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret", side_effect=KeyError): + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for KeyError + with pytest.raises(SparkExpectationsEmailException, match="SMTP password key is missing in the secret."): + email_handler._get_smtp_password(_mock_context, server) + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_generic_exception(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "cerberus" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "cerberus" + } + _mock_context.get_cbs_sdb_path = "path/to/secret" + _mock_context.get_smtp_password_key = "password_key" + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret", side_effect=Exception("Test Exception")): + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for a generic exception + with 
pytest.raises(SparkExpectationsEmailException, match="Failed to retrieve SMTP password."): + email_handler._get_smtp_password(_mock_context, server) + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_cerberus_password_none(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "cerberus" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "cerberus" + } + _mock_context.get_cbs_sdb_path = "path/to/secret" + _mock_context.get_smtp_password_key = "password_key" + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.secrets.SparkExpectationsSecretsBackend.get_secret", return_value="not_a_dict"): + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for password being None + with pytest.raises(SparkExpectationsEmailException, match="SMTP password is not set."): + email_handler._get_smtp_password(_mock_context, server) + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_databricks_password_none(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "databricks" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "databricks" + } + _mock_context.get_smtp_password_key = None + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp: + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for password being None + with pytest.raises(SparkExpectationsEmailException, match="SMTP password is not set."): + email_handler._get_smtp_password(_mock_context, server) + 
+@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +def test_get_smtp_password_secret_type_else(_mock_context): + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_secret_type = "unknown" + _mock_context.get_mail_from = "test@example.com" + _mock_context.get_se_streaming_stats_dict = { + "secret_type": "unknown" + } + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp: + server = mock_smtp.return_value + + # Assert that the SparkExpectationsEmailException is raised for password being None + with pytest.raises(SparkExpectationsEmailException, match="SMTP password is not set."): + email_handler._get_smtp_password(_mock_context, server) + + +@patch('spark_expectations.notifications.plugins.email.SparkExpectationsContext', autospec=True, spec_set=True) +@patch.object(SparkExpectationsEmailPluginImpl, '_get_smtp_password') +def test_send_notification_with_smtp_auth(mock_get_smtp_password, _mock_context): + # arrange + email_handler = SparkExpectationsEmailPluginImpl() + _mock_context.get_enable_mail = True + _mock_context.get_mail_from = "sender@example.com" + _mock_context.get_to_mail = "receiver1@example.com, receiver2@example.com" + _mock_context.get_mail_subject = "Test Email" + _mock_context.get_mail_smtp_server = "mailhost.example.com" + _mock_context.get_mail_smtp_port = 587 + _mock_context.get_enable_smtp_server_auth = True + + mock_config_args = { + "message": "Test Email Body" + } + + with patch("spark_expectations.notifications.plugins.email.smtplib.SMTP") as mock_smtp, \ + patch("spark_expectations.notifications.plugins.email.MIMEMultipart") as _mock_mltp: + # act + email_handler.send_notification(_context=_mock_context, _config_args=mock_config_args) + + # assert + mock_smtp.assert_called_with(_mock_context.get_mail_smtp_server, _mock_context.get_mail_smtp_port) + mock_smtp().starttls.assert_called() + 
mock_get_smtp_password.assert_called_once_with(_mock_context, mock_smtp()) + mock_smtp().sendmail.assert_called_with(_mock_context.get_mail_from, [email.strip() for email in _mock_context.get_to_mail.split(",")], + _mock_mltp().as_string()) + mock_smtp().quit.assert_called() \ No newline at end of file diff --git a/tests/utils/test_reader.py b/tests/utils/test_reader.py index 9032a32..b7abd3a 100644 --- a/tests/utils/test_reader.py +++ b/tests/utils/test_reader.py @@ -130,7 +130,20 @@ def fixture_product_rules_pipe(): "spark.expectations.notifications.slack.webhook_url": "", "spark.expectations.notifications.teams.enabled": False, "spark.expectations.notifications.teams.webhook_url": "", - }, None) + }, None), +({ + "spark.expectations.notifications.email.smtp_server_auth": True, + "spark.expectations.notifications.email.enabled": True, + "spark.expectations.notifications.email.smtp_host": "smtp.mail.com", + "spark.expectations.notifications.email.smtp_port": 587, + "spark.expectations.notifications.email.from": "sender@mail.com", + "spark.expectations.notifications.email.to.other.mail.com": "recipient@mail.com", + "spark.expectations.notifications.email.subject": "Test email", + "spark.expectations.notifications.slack.enabled": False, + "spark.expectations.notifications.slack.webhook_url": "", + "spark.expectations.notifications.teams.enabled": False, + "spark.expectations.notifications.teams.webhook_url": "", + }, None), ]) def test_set_notification_param(notification, expected_result): # This function helps/implements test cases for while setting notification @@ -175,6 +188,15 @@ def test_set_notification_param(notification, expected_result): notification.get("spark.expectations.notifications.enable.custom.email.body")) mock_context.set_email_custom_body.assert_called_once_with( notification.get("spark.expectations.notifications.email.custom.body")) + if notification.get("spark.expectations.notifications.email.smtp_server_auth"): + 
mock_context.set_enable_mail.assert_called_once_with( + notification.get("spark.expectations.notifications.email.enabled")) + mock_context.set_enable_smtp_server_auth.assert_called_once_with( + notification.get("spark.expectations.notifications.email.smtp_server_auth")) + mock_context.set_mail_subject.assert_called_once_with( + notification.get("spark.expectations.notifications.email.subject")) + mock_context.set_mail_smtp_server.assert_called_once_with( + notification.get("spark.expectations.notifications.email.smtp_host")) if notification.get("spark.expectations.notifications.slack.enabled"): mock_context.set_enable_slack.assert_called_once_with( notification.get("spark.expectations.notifications.slack.enabled"))