
Commit

update row dq summary (#73)
* Update writer.py

* Update CONTRIBUTORS.md

* Update context.py

* Update expectations.py

* spelling corrections, writing more unit tests, removing RDDs

* reformatting

* fixing tests

* Update test_writer.py

* Update test_regulate_flow.py

* Update test_context.py

* Update test_regulate_flow.py

---------

Co-authored-by: Ashok Singamaneni <ashok.singamaneni@nike.com>
jskrajareddy21 and asingamaneni authored Mar 4, 2024
1 parent 89da08b commit d0a0dc0
Showing 11 changed files with 3,185 additions and 1,418 deletions.
15 changes: 15 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,15 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Debug Unit Tests",
"type": "debugpy",
"request": "launch",
"purpose": [
"debug-test"
],
"console": "integratedTerminal",
"justMyCode": false,
}
]
}
39 changes: 29 additions & 10 deletions spark_expectations/core/context.py
@@ -36,6 +36,7 @@ def __post_init__(self) -> None:
self._source_query_dq_status: str = "Skipped"
self._final_query_dq_status: str = "Skipped"
self._dq_run_status: str = "Failed"
self._dq_expectations: Optional[Dict[str, str]] = None

# above configuration variable value has to be set to python
self._dq_project_env_name = "spark_expectations"
@@ -120,7 +121,7 @@ def __post_init__(self) -> None:
"num_final_query_dq_rules": 0,
}
self._num_dq_rules: int = 0
self._summarised_row_dq_res: Optional[List[Dict[str, str]]] = None
self._summarized_row_dq_res: Optional[List[Dict[str, str]]] = None
self._rules_error_per: Optional[List[dict]] = None

self._target_and_error_table_writer_config: dict = {}
@@ -165,6 +166,24 @@ def get_dq_stats_table_name(self) -> str:
accessing it"""
)

def set_dq_expectations(self, dq_expectations: dict) -> None:
self._dq_expectations = dq_expectations

@property
def get_dq_expectations(self) -> dict:
"""
Get dq_expectations, which holds the rule information
Returns:
dict: returns the dq_expectations dict
"""
if self._dq_expectations:
return self._dq_expectations
raise SparkExpectationsMiscException(
"""The spark expectations context is not set completely, please assign '_dq_expectations' before
accessing it"""
)

def set_final_table_name(self, final_table_name: str) -> None:
self._final_table_name = final_table_name

@@ -1472,28 +1491,28 @@ def get_num_dq_rules(self) -> int:
accessing it"""
)

def set_summarised_row_dq_res(
self, summarised_row_dq_res: Optional[List[Dict[str, str]]] = None
def set_summarized_row_dq_res(
self, summarized_row_dq_res: Optional[List[Dict[str, str]]] = None
) -> None:
"""
This function sets the row dq summarised result
This function sets the row dq summarized result
Args:
summarised_row_dq_res: list(dict)
summarized_row_dq_res: list(dict)
Returns: None
"""
self._summarised_row_dq_res = summarised_row_dq_res
self._summarized_row_dq_res = summarized_row_dq_res

@property
def get_summarised_row_dq_res(self) -> Optional[List[Dict[str, str]]]:
def get_summarized_row_dq_res(self) -> Optional[List[Dict[str, str]]]:
"""
This function returns the row dq summarised result
This function returns the row dq summarized result
Returns:
list(dict): Returns summarised_row_dq_res, a list of dicts with str(key) and
list(dict): Returns summarized_row_dq_res, a list of dicts with str(key) and
str(value) entries of rule metadata
"""
return self._summarised_row_dq_res
return self._summarized_row_dq_res

def set_rules_exceeds_threshold(self, rules: Optional[List[dict]] = None) -> None:
"""
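In practical terms, the context now caches the full expectations dict alongside the summarized row-dq results. A minimal usage sketch, assuming an already-initialized SparkExpectationsContext named context and an illustrative rules dict (the rule values below are made up, not taken from the repository):

# `context` stands in for an initialized SparkExpectationsContext instance.
rules = {
    "row_dq_rules": [
        {
            "rule_type": "row_dq",
            "rule": "col_a_not_null",
            "description": "col_a must not be null",
            "tag": "validity",
            "action_if_failed": "drop",
        }
    ]
}

context.set_dq_expectations(rules)          # stores the dict on _dq_expectations
expectations = context.get_dq_expectations  # property access returns the same dict
# Reading the property before the setter has run raises
# SparkExpectationsMiscException, like the other context getters.
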
5 changes: 3 additions & 2 deletions spark_expectations/core/expectations.py
@@ -123,7 +123,7 @@ def _except(func: Any) -> Any:
user_config.se_enable_streaming: True,
user_config.secret_type: "databricks",
user_config.dbx_workspace_url: "https://workspace.cloud.databricks.com",
user_config.dbx_secret_scope: "sercet_scope",
user_config.dbx_secret_scope: "secret_scope",
user_config.dbx_kafka_server_url: "se_streaming_server_url_secret_key",
user_config.dbx_secret_token_url: "se_streaming_auth_secret_token_url_key",
user_config.dbx_secret_app_name: "se_streaming_auth_secret_appid_key",
@@ -255,7 +255,8 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
self._context.set_final_agg_dq_result()
self._context.set_source_query_dq_result()
self._context.set_final_query_dq_result()
self._context.set_summarised_row_dq_res()
self._context.set_summarized_row_dq_res()
self._context.set_dq_expectations(expectations)

# initialize variables of start and end time with default values
self._context._source_agg_dq_start_time = None
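Note that calling the setter with no argument resets the summarized result at the start of each decorated run, so get_summarized_row_dq_res stays None until the writer repopulates it. A minimal, self-contained illustration of that reset pattern (the _ContextSketch class below is made up for illustration and only mirrors the setter signature shown in the context.py diff):

from typing import Dict, List, Optional

class _ContextSketch:
    # Toy stand-in: a default of None means a bare call clears the stored value.
    def __init__(self) -> None:
        self._summarized_row_dq_res: Optional[List[Dict[str, str]]] = None

    def set_summarized_row_dq_res(
        self, summarized_row_dq_res: Optional[List[Dict[str, str]]] = None
    ) -> None:
        self._summarized_row_dq_res = summarized_row_dq_res

ctx = _ContextSketch()
ctx.set_summarized_row_dq_res([{"rule": "col_a_not_null", "failed_row_count": "3"}])
ctx.set_summarized_row_dq_res()    # bare call resets to None for the next run
print(ctx._summarized_row_dq_res)  # None
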
Changes to an additional file (file name not captured in this view)
@@ -238,12 +238,12 @@ def notify_rules_exceeds_threshold(self, rules: dict) -> None:
try:
rules_failed_row_count: Dict[str, int] = {}
notification_body = ""
if self._context.get_summarised_row_dq_res is None:
if self._context.get_summarized_row_dq_res is None:
return None

rules_failed_row_count = {
itr["rule"]: int(itr["failed_row_count"])
for itr in self._context.get_summarised_row_dq_res
for itr in self._context.get_summarized_row_dq_res
}

for rule in rules[f"{self._context.get_row_dq_rule_type_name}_rules"]:
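The lookup that the notification builds from the summarized result can be pictured as follows; the entries below are illustrative (real ones also carry rule_type, description, tag and action_if_failed), and the int() cast mirrors the fact that the counts may arrive as strings:

# Illustrative summarized row-dq result
summarized = [
    {"rule": "col_a_not_null", "failed_row_count": "12"},
    {"rule": "col_b_positive", "failed_row_count": "0"},
]
# Map each rule name to its failed-row count, as notify_rules_exceeds_threshold does
rules_failed_row_count = {itr["rule"]: int(itr["failed_row_count"]) for itr in summarized}
print(rules_failed_row_count)  # {'col_a_not_null': 12, 'col_b_positive': 0}
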
122 changes: 79 additions & 43 deletions spark_expectations/sinks/utils/writer.py
@@ -162,19 +162,27 @@ def write_error_stats(self) -> None:
self._context.get_output_percentage,
self._context.get_success_percentage,
self._context.get_error_percentage,
source_agg_dq_result
if source_agg_dq_result and len(source_agg_dq_result) > 0
else None,
final_agg_dq_result
if final_agg_dq_result and len(final_agg_dq_result) > 0
else None,
source_query_dq_result
if source_query_dq_result and len(source_query_dq_result) > 0
else None,
final_query_dq_result
if final_query_dq_result and len(final_query_dq_result) > 0
else None,
self._context.get_summarised_row_dq_res,
(
source_agg_dq_result
if source_agg_dq_result and len(source_agg_dq_result) > 0
else None
),
(
final_agg_dq_result
if final_agg_dq_result and len(final_agg_dq_result) > 0
else None
),
(
source_query_dq_result
if source_query_dq_result and len(source_query_dq_result) > 0
else None
),
(
final_query_dq_result
if final_query_dq_result and len(final_query_dq_result) > 0
else None
),
self._context.get_summarized_row_dq_res,
self._context.get_rules_exceeds_threshold,
{
"run_status": self._context.get_dq_run_status,
@@ -208,7 +216,6 @@ def write_error_stats(self) -> None:
),
)
]
error_stats_rdd = self.spark.sparkContext.parallelize(error_stats_data)

from pyspark.sql.types import (
StructType,
@@ -280,7 +287,7 @@ def write_error_stats(self) -> None:
]
)

df = self.spark.createDataFrame(error_stats_rdd, schema=error_stats_schema)
df = self.spark.createDataFrame(error_stats_data, schema=error_stats_schema)
self._context.print_dataframe_with_debugger(df)

df = (
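Dropping error_stats_rdd works because SparkSession.createDataFrame accepts a local Python list of rows directly when a schema is supplied. A minimal standalone sketch (the column names here are made up for illustration, not the stats-table schema):

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType(
    [
        StructField("product_id", StringType(), True),
        StructField("error_count", IntegerType(), True),
    ]
)

# A plain list of tuples is enough; no sparkContext.parallelize round-trip needed.
df = spark.createDataFrame([("p1", 3), ("p2", 0)], schema=schema)
df.show()
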
@@ -419,7 +426,7 @@ def write_error_records_final(

_error_count = error_df.count()
if _error_count > 0:
self.generate_summarised_row_dq_res(error_df, rule_type)
self.generate_summarized_row_dq_res(error_df, rule_type)

_log.info("_write_error_records_final ended")
return _error_count, df
@@ -429,9 +436,9 @@ def write_error_records_final(
f"error occurred while saving data into the final error table {e}"
)

def generate_summarised_row_dq_res(self, df: DataFrame, rule_type: str) -> None:
def generate_summarized_row_dq_res(self, df: DataFrame, rule_type: str) -> None:
"""
This function summarises the row dq error result
This function summarizes the row dq error result
Args:
df: error dataframe(DataFrame)
rule_type: type of the rule(str)
@@ -441,41 +448,70 @@ def generate_summarised_row_dq_res(self, df: DataFrame, rule_type: str) -> None:
"""
try:
df_exploded = df.select(
df_explode = df.select(
explode(f"meta_{rule_type}_results").alias("row_dq_res")
)

keys = (
df_exploded.select(explode("row_dq_res"))
.select("key")
.distinct()
.rdd.flatMap(lambda x: x)
.collect()
)
nested_keys = [col("row_dq_res").getItem(k).alias(k) for k in keys]

df_select = df_exploded.select(*nested_keys)
df_pivot = (
df_select.groupBy(df_select.columns)
df_res = (
df_explode.withColumn("rule_type", col("row_dq_res")["rule_type"])
.withColumn("rule", col("row_dq_res")["rule"])
.withColumn("description", col("row_dq_res")["description"])
.withColumn("tag", col("row_dq_res")["tag"])
.withColumn("action_if_failed", col("row_dq_res")["action_if_failed"])
.select("rule_type", "rule", "description", "tag", "action_if_failed")
.groupBy("rule_type", "rule", "description", "tag", "action_if_failed")
.count()
.withColumnRenamed("count", "failed_row_count")
)

keys += ["failed_row_count"]
summarised_row_dq_list = df_pivot.rdd.map(
lambda x: {i: x[i] for i in keys}
).collect()

self._context.set_summarised_row_dq_res(summarised_row_dq_list)
summarized_row_dq_list = [
{
"rule_type": row.rule_type,
"rule": row.rule,
"description": row.description,
"tag": row.tag,
"action_if_failed": row.action_if_failed,
"failed_row_count": row.failed_row_count,
}
for row in df_res.select(
"rule_type",
"rule",
"description",
"tag",
"action_if_failed",
"failed_row_count",
).collect()
]
failed_rule_list = []
for failed_rule in summarized_row_dq_list:
failed_rule_list.append(failed_rule["rule"])

for (
each_rule_type,
all_rule_type_rules,
) in self._context.get_dq_expectations.items():
if each_rule_type in ["row_dq_rules"]:
for each_rule in all_rule_type_rules:
if each_rule["rule"] not in failed_rule_list:
summarized_row_dq_list.append(
{
"description": each_rule["description"],
"tag": each_rule["tag"],
"rule": each_rule["rule"],
"action_if_failed": each_rule["action_if_failed"],
"rule_type": each_rule["rule_type"],
"failed_row_count": 0,
}
)

self._context.set_summarized_row_dq_res(summarized_row_dq_list)

except Exception as e:
raise SparkExpectationsMiscException(
f"error occurred created summarised row dq statistics {e}"
f"error occurred created summarized row dq statistics {e}"
)

def generate_rules_exceeds_threshold(self, rules: dict) -> None:
"""
This function summarises the row dq error threshold
This function summarizes the row dq error threshold
Args:
rules: accepts rule metadata within dict
Returns:
Expand All @@ -484,12 +520,12 @@ def generate_rules_exceeds_threshold(self, rules: dict) -> None:
try:
error_threshold_list = []
rules_failed_row_count: Dict[str, int] = {}
if self._context.get_summarised_row_dq_res is None:
if self._context.get_summarized_row_dq_res is None:
return None

rules_failed_row_count = {
itr["rule"]: int(itr["failed_row_count"])
for itr in self._context.get_summarised_row_dq_res
for itr in self._context.get_summarized_row_dq_res
}

for rule in rules[f"{self._context.get_row_dq_rule_type_name}_rules"]:
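Taken together, the rewritten generate_summarized_row_dq_res builds per-rule failure counts with plain DataFrame operations and then back-fills rules that never failed with a count of 0. A self-contained sketch of the same pattern, assuming an error DataFrame whose meta_row_dq_results column is an array of maps (the rule names, column values and expectations dict below are illustrative, not from the repository):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.getOrCreate()

# Each error row carries an array of maps describing the row-dq rules it failed.
failed = {"rule_type": "row_dq", "rule": "col_a_not_null", "description": "col_a must not be null",
          "tag": "validity", "action_if_failed": "drop"}
error_df = spark.createDataFrame(
    [([failed],), ([failed],)],
    "meta_row_dq_results: array<map<string,string>>",
)

# Explode the per-row results, pull out the rule metadata, and count failures per rule.
df_res = (
    error_df.select(explode("meta_row_dq_results").alias("row_dq_res"))
    .select(
        col("row_dq_res")["rule_type"].alias("rule_type"),
        col("row_dq_res")["rule"].alias("rule"),
        col("row_dq_res")["description"].alias("description"),
        col("row_dq_res")["tag"].alias("tag"),
        col("row_dq_res")["action_if_failed"].alias("action_if_failed"),
    )
    .groupBy("rule_type", "rule", "description", "tag", "action_if_failed")
    .count()
    .withColumnRenamed("count", "failed_row_count")
)
summarized = [row.asDict() for row in df_res.collect()]

# Back-fill zero-count entries so the summary covers every configured row-dq rule.
expectations = {"row_dq_rules": [
    failed,
    {"rule_type": "row_dq", "rule": "col_b_positive", "description": "col_b must be > 0",
     "tag": "accuracy", "action_if_failed": "ignore"},
]}
failed_rules = {entry["rule"] for entry in summarized}
for rule in expectations["row_dq_rules"]:
    if rule["rule"] not in failed_rules:
        summarized.append({**rule, "failed_row_count": 0})

print(summarized)  # one entry per rule, with failed_row_count 2 and 0 respectively
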
2 changes: 1 addition & 1 deletion spark_expectations/utils/regulate_flow.py
@@ -125,7 +125,7 @@ def func_process(
f"{table_name}_error",
_context.get_row_dq_rule_type_name,
)
if _context.get_summarised_row_dq_res:
if _context.get_summarized_row_dq_res:
_notification.notify_rules_exceeds_threshold(expectations)
_writer.generate_rules_exceeds_threshold(expectations)

Expand Down
(remaining changed files not shown)
