optimization and additional columns to detailed stats (#96)
* spark expectation optimization, detailed stats adding start and end time and job metadata info

* updated code for storing row dq results in detailed stats table

* Update writer.py

* initial edit

* Adding tests for the stats table writer

* Adding test for detailed stats table

* Fixing lint issues

* Update setup.md

* Adding the missing test

---------

Co-authored-by: krishnam Jagadapi <krishnam.jagadapi@nike.com>
Co-authored-by: Ashok Singamaneni <ashok.singamaneni@nike.com>
3 people authored May 29, 2024
1 parent a80943c commit f702f39
Showing 15 changed files with 5,386 additions and 3,312 deletions.
59 changes: 34 additions & 25 deletions docs/getting-started/setup.md
@@ -48,7 +48,7 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
7. `action_if_failed` There are 3 different types of actions. These are 'ignore', 'drop', and 'fail'.
Ignore: The rule is run and the output is logged. No action is performed regardless of whether the rule has succeeded or failed. Applies for all 3 rule types.
Drop: The rows that fail the rule get dropped from the dataset. Applies only for the row_dq rule type.
Fail: DAG fails if the rule fails. Applies for all 3 rule types.
Fail: job fails if the rule fails. Applies for all 3 rule types.
8. `tag` Provide a tag name for the dq rule, for example: completeness, validity, uniqueness, etc.
9. `description` Long description for the rule
10. `enable_for_source_dq_validation` flag to run the agg/query rule against the source dataset
@@ -59,12 +59,11 @@ create table if not exists `catalog`.`schema`.`{product}_rules` (
15. `query_dq_delimiter` Delimiter used to segregate custom queries, e.g. $, @, etc. By default it is @. Users can override it with any other delimiter as needed; the same delimiter specified here has to be used in the custom query.
16. `enable_querydq_custom_output` Set to true if the custom query output is required in a separate table. An illustrative rule entry is sketched below.
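
For orientation, here is a hedged, partial sketch of a single rule expressed as a Python dict, limited to the columns documented above (the remaining rule columns are collapsed in this diff and omitted). The values are illustrative, in the spirit of the RULES_DATA examples in spark_expectations/examples/base_setup.py further down in this commit.

```python
# Partial, hedged sketch of one rule row, restricted to the columns documented above.
example_rule = {
    "action_if_failed": "ignore",              # (7) one of 'ignore', 'drop', 'fail'
    "tag": "accuracy",                         # (8) free-form tag, e.g. completeness, validity
    "description": "sales value should be greater than zero",  # (9)
    "enable_for_source_dq_validation": True,   # (10) run the agg/query rule on the source dataset
    "query_dq_delimiter": None,                # (15) custom-query delimiter, '@' by default
    "enable_querydq_custom_output": None,      # (16) write custom query output to a separate table
}
```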

rule_type, enable_for_source_dq_validation and enable_for_target_dq_validation columns define source_agg_dq, target_agg_dq, source_query_dq and target_query_dq. Please see the definitions below:
If rule_type is row_dq then row_dq is TRUE
If rule_type is agg_dq and enable_for_source_dq_validation is TRUE then source_agg_dq is TRUE
If rule_type is agg_dq and enable_for_target_dq_validation is TRUE then target_agg_dq is TRUE
If rule_type is query_dq and enable_for_source_dq_validation is TRUE then source_query_dq is TRUE
If rule_type is query_dq and enable_for_target_dq_validation is TRUE then target_query_dq is TRUE

The Spark Expectations process consists of three phases (sketched in the example below):
1. When enable_for_source_dq_validation is true, execute agg_dq and query_dq on the source DataFrame
2. If the first step is successful, proceed to run row_dq
3. When enable_for_target_dq_validation is true, execute agg_dq and query_dq on the DataFrame resulting from row_dq
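
A minimal, hedged sketch (plain Python, not the library's internal implementation) of how a rule's rule_type and the two enable flags select the phases described above:

```python
# Hedged sketch of phase selection; mirrors the three-phase description above.
from typing import List


def dq_phases(
    rule_type: str,
    enable_for_source_dq_validation: bool,
    enable_for_target_dq_validation: bool,
) -> List[str]:
    phases = []
    # Phase 1: agg/query rules run against the source DataFrame
    if rule_type in ("agg_dq", "query_dq") and enable_for_source_dq_validation:
        phases.append(f"source_{rule_type}")
    # Phase 2: row-level rules run on every record
    if rule_type == "row_dq":
        phases.append("row_dq")
    # Phase 3: agg/query rules run against the DataFrame produced by row_dq
    if rule_type in ("agg_dq", "query_dq") and enable_for_target_dq_validation:
        phases.append(f"target_{rule_type}")
    return phases


print(dq_phases("agg_dq", True, True))    # ['source_agg_dq', 'target_agg_dq']
print(dq_phases("row_dq", False, False))  # ['row_dq']
```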

### Rule Type For Rules

@@ -166,15 +165,20 @@ source_dq_expected_outcome string, -- (11)!
source_dq_actual_row_count string, -- (12)!
source_dq_error_row_count string, -- (13)!
source_dq_row_count string, -- (14)!
target_expectations string, -- (15)!
target_dq_status string, -- (16)!
target_dq_actual_outcome string, -- (17)!
target_dq_expected_outcome string, -- (18)!
target_dq_actual_row_count string, -- (19)!
target_dq_error_row_count string, -- (20)!
target_dq_row_count string, -- (21)!
dq_date date, -- (22)!
dq_time string, -- (23)!
source_dq_start_time string, -- (15)!
source_dq_end_time string, -- (16)!
target_expectations string, -- (17)!
target_dq_status string, -- (18)!
target_dq_actual_outcome string, -- (19)!
target_dq_expected_outcome string, -- (20)!
target_dq_actual_row_count string, -- (21)!
target_dq_error_row_count string, -- (22)!
target_dq_row_count string, -- (23)!
target_dq_start_time string, -- (24)!
target_dq_end_time string, -- (25)!
dq_date date, -- (26)!
dq_time string, -- (27)!
dq_job_metadata_info string -- (28)!
);
```

@@ -192,12 +196,17 @@ dq_time string, -- (23)!
12. `source_dq_actual_row_count` Number of rows of the source dq
13. `source_dq_error_row_count` Number of rows failed in the source dq
14. `source_dq_row_count` Number of rows of the source dq
15. `target_expectations` Actual Rule to be executed on the target dq
16. `target_dq_status` Status of the rule execution in the Target dq
17. `target_dq_actual_outcome` Actual outcome of the Target dq check
18. `target_dq_expected_outcome` Expected outcome of the Target dq check
19. `target_dq_actual_row_count` Number of rows of the target dq
20. `target_dq_error_row_count` Number of rows failed in the target dq
21. `target_dq_row_count` Number of rows of the target dq
22. `dq_date` Dq executed date
23. `dq_time` Dq executed timestamp
15. `source_dq_start_time` Source dq start timestamp
16. `source_dq_end_time` Source dq end timestamp
17. `target_expectations` Actual Rule to be executed on the target dq
18. `target_dq_status` Status of the rule execution in the Target dq
19. `target_dq_actual_outcome` Actual outcome of the Target dq check
20. `target_dq_expected_outcome` Expected outcome of the Target dq check
21. `target_dq_actual_row_count` Number of rows of the target dq
22. `target_dq_error_row_count` Number of rows failed in the target dq
23. `target_dq_row_count` Number of rows of the target dq
24. `target_dq_start_time` Target dq start timestamp
25. `target_dq_end_time` Target dq end timestamp
26. `dq_date` Dq executed date
27. `dq_time` Dq executed timestamp
28. `dq_job_metadata_info` Dq job metadata. An example query over these columns is shown below.
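
For reference, a hedged example of inspecting the new timing and job-metadata columns from the schema above; the fully qualified table name is a placeholder for whichever detailed stats table you configured.

```python
# Hedged sketch: query the detailed stats table for the columns added in this change.
# `catalog.schema.your_product_detailed_stats` is a placeholder table name.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

recent_runs = spark.sql(
    """
    select target_expectations,
           source_dq_start_time, source_dq_end_time,
           target_dq_status, target_dq_start_time, target_dq_end_time,
           dq_job_metadata_info
    from catalog.schema.your_product_detailed_stats
    where dq_date = current_date()
    """
)
recent_runs.show(truncate=False)
```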
1 change: 1 addition & 0 deletions prospector.yaml
@@ -16,6 +16,7 @@ max-line-length: 120

pylint:
disable:
- too-many-lines
- too-many-branches
- too-many-statements
- too-many-instance-attributes
1 change: 1 addition & 0 deletions spark_expectations/config/user_config.py
@@ -64,6 +64,7 @@ class Constants:
# declare const user config variables for agg query dq detailed stats
se_enable_agg_dq_detailed_result = "spark.expectations.agg.dq.detailed.stats"
se_enable_query_dq_detailed_result = "spark.expectations.query.dq.detailed.stats"
se_job_metadata = "spark.expectations.job.metadata"

querydq_output_custom_table_name = "spark.expectations.query.dq.custom.table_name"

24 changes: 24 additions & 0 deletions spark_expectations/core/context.py
@@ -75,6 +75,7 @@ def __post_init__(self) -> None:
self._final_agg_dq_result: Optional[List[Dict[str, str]]] = None
self._source_query_dq_result: Optional[List[Dict[str, str]]] = None
self._final_query_dq_result: Optional[List[Dict[str, str]]] = None
self._job_metadata: Optional[str] = None

self._source_agg_dq_detailed_stats: Optional[List[Tuple]] = None
self._source_query_dq_detailed_stats: Optional[List[Tuple]] = None
@@ -1911,3 +1912,26 @@ def get_dq_rules_params(self) -> dict:
"""
return self._dq_rules_params

def set_job_metadata(self, job_metadata: Optional[str] = None) -> None:
"""
This function is used to set the job_metadata
Returns:
None
"""
self._job_metadata = job_metadata

@property
def get_job_metadata(self) -> Optional[str]:
"""
This function is used to get the job metadata
Returns:
str: Returns _job_metadata
"""
if self._job_metadata is not None:
return str(self._job_metadata)
return None
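
A hedged sketch of how these accessors are expected to be fed: the caller passes the metadata through user_conf using the se_job_metadata key added in user_config.py above (mirroring the sample_dq_* examples further down in this commit), and with_expectations hands it to the context via set_job_metadata.

```python
# Hedged sketch of the job-metadata flow added in this commit; values are illustrative.
from spark_expectations.config.user_config import Constants as user_config

dic_job_info = {"job": "job_name", "Region": "NA", "Snapshot": "2024-04-15"}
job_info = str(dic_job_info)

user_conf = {
    user_config.se_job_metadata: job_info,
    # ...other spark-expectations settings...
}

# Inside with_expectations the configured value is handed to the context via
# set_job_metadata(), and get_job_metadata() later supplies the
# dq_job_metadata_info column of the detailed stats table.
```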
34 changes: 28 additions & 6 deletions spark_expectations/core/expectations.py
@@ -46,7 +46,7 @@ class SparkExpectations:

def __post_init__(self) -> None:
if isinstance(self.rules_df, DataFrame):
self.spark: SparkSession = self.rules_df.sparkSession
try:
self.spark: Optional[SparkSession] = self.rules_df.sparkSession
except AttributeError:
self.spark = SparkSession.getActiveSession()

if self.spark is None:
raise SparkExpectationsMiscException(
"Spark session is not available, please initialize a spark session before calling SE"
)
else:
raise SparkExpectationsMiscException(
"Input rules_df is not of dataframe type"
@@ -112,7 +120,7 @@ def _except(func: Any) -> Any:
# variable used for enabling notification at different level

_default_notification_dict: Dict[
str, Union[str, int, bool, Dict[str, str]]
str, Union[str, int, bool, Dict[str, str], None]
] = {
user_config.se_notifications_on_start: False,
user_config.se_notifications_on_completion: False,
@@ -121,10 +129,13 @@ def _except(func: Any) -> Any:
user_config.se_notifications_on_error_drop_threshold: 100,
user_config.se_enable_agg_dq_detailed_result: False,
user_config.se_enable_query_dq_detailed_result: False,
user_config.se_job_metadata: None,
user_config.querydq_output_custom_table_name: f"{self.stats_table}_querydq_output",
}

_notification_dict: Dict[str, Union[str, int, bool, Dict[str, str]]] = (
_notification_dict: Dict[
str, Union[str, int, bool, Dict[str, str], None]
] = (
{**_default_notification_dict, **user_conf}
if user_conf
else _default_notification_dict
@@ -262,6 +273,8 @@ def _except(func: Any) -> Any:
else False
)

_job_metadata = _notification_dict.get(user_config.se_job_metadata)

notifications_on_error_drop_threshold = _notification_dict.get(
user_config.se_notifications_on_error_drop_threshold, 100
)
@@ -280,6 +293,7 @@ def _except(func: Any) -> Any:
self._context.set_dq_expectations(expectations)
self._context.set_rules_execution_settings_config(rules_execution_settings)
self._context.set_querydq_secondary_queries(dq_queries_dict)
self._context.set_job_metadata(_job_metadata)

@self._notification.send_notification_decorator
@self._statistics_decorator.collect_stats_decorator
@@ -292,6 +306,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
table_name: str = self._context.get_table_name

_input_count = _df.count()
_log.info("data frame input record count: %s", _input_count)
_output_count: int = 0
_error_count: int = 0
_source_dq_df: Optional[DataFrame] = None
@@ -333,21 +348,28 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
self._context.set_input_count(_input_count)
self._context.set_error_drop_threshold(_error_drop_threshold)

_log.info(
"Spark Expectations run id for this run: %s",
self._context.get_run_id,
)

if isinstance(_df, DataFrame):
_log.info("The function dataframe is created")
self._context.set_table_name(table_name)
if write_to_temp_table:
_log.info("Dropping to temp table started")
self.spark.sql(f"drop table if exists {table_name}_temp")
self.spark.sql(f"drop table if exists {table_name}_temp") # type: ignore
_log.info("Dropping to temp table completed")
_log.info("Writing to temp table started")
source_columns = _df.columns
self._writer.save_df_as_table(
_df,
f"{table_name}_temp",
self._context.get_target_and_error_table_writer_config,
)
_log.info("Read from temp table started")
_df = self.spark.sql(f"select * from {table_name}_temp")
_df = self.spark.sql(f"select * from {table_name}_temp") # type: ignore
_df = _df.select(source_columns)
_log.info("Read from temp table completed")

func_process = self._process.execute_dq_process(
@@ -544,7 +566,7 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
"error occurred while processing spark "
"expectations due to given dataframe is not type of dataframe"
)
self.spark.catalog.clearCache()
# self.spark.catalog.clearCache()

return _row_dq_df

6 changes: 3 additions & 3 deletions spark_expectations/examples/base_setup.py
@@ -25,7 +25,7 @@
"""


RULES_DATA = """
RULES_DATA = """
("your_product", "dq_spark_dev.customer_order", "row_dq", "sales_greater_than_zero", "sales", "sales > 2", "ignore", "accuracy", "sales value should be greater than zero", false, true, true, false, 0,null, null)
,("your_product", "dq_spark_{env}.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 40", true, true, true, false, 0,null, null)
@@ -41,8 +41,8 @@
,("your_product", "dq_spark_dev.customer_order", "query_dq", "order_count_validity_check", "*", "(select count(*) from order_source) > 10", "ignore", "validity", "row count threshold", true, true, true, false, 0, null, true)
,("your_product", "dq_spark_{env}.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from {table}) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0,null, true)
,("your_product", "dq_spark_{env}.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode) <= 3", "ignore", "validity", "regex format validation for quantity", true, true, true, false, 0,null, null)
"""


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_bigquery.py
@@ -19,6 +19,12 @@
.option("createDisposition", "CREATE_IF_NEEDED")
.option("writeMethod", "direct")
)
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

# if wanted to use indirect method use below setting and spark session
# writer = WrappedDataFrameWriter().mode("overwrite").format("bigquery").\
@@ -63,6 +69,7 @@
"env": "local",
"table": "product",
},
user_config.se_job_metadata: job_info,
}


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_delta.py
@@ -14,6 +14,12 @@
writer = WrappedDataFrameWriter().mode("append").format("delta")

spark = set_up_delta()
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

se: SparkExpectations = SparkExpectations(
product_id="your_product",
@@ -47,6 +53,7 @@
"env": "dev",
"table": "product",
},
user_config.se_job_metadata: job_info,
}


7 changes: 7 additions & 0 deletions spark_expectations/examples/sample_dq_iceberg.py
@@ -11,6 +11,12 @@
from spark_expectations.config.user_config import Constants as user_config

writer = WrappedDataFrameWriter().mode("append").format("iceberg")
dic_job_info = {
"job": "job_name",
"Region": "NA",
"Snapshot": "2024-04-15",
}
job_info = str(dic_job_info)

spark = set_up_iceberg()

@@ -48,6 +54,7 @@
"env": "local",
"table": "product",
},
user_config.se_job_metadata: job_info,
}


