Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converting spark udf's to native functions #26

Merged
merged 6 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions spark_expectations/core/expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
self._context.get_agg_dq_rule_type_name,
source_agg_dq_flag=True,
)
self._context.set_source_agg_dq_result(
_dq_source_agg_results
)
self._context.set_source_agg_dq_status(status)
self._context.set_source_agg_dq_end_time()

Expand Down Expand Up @@ -393,9 +390,6 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
self._context.get_query_dq_rule_type_name,
source_query_dq_flag=True,
)
self._context.set_source_query_dq_result(
_dq_source_query_results
)
self._context.set_source_query_dq_status(status)
self._context.set_source_query_dq_end_time()
_log.info(
Expand Down Expand Up @@ -471,7 +465,6 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
error_count=_error_count,
output_count=_output_count,
)
self._context.set_final_agg_dq_result(_dq_final_agg_results)
self._context.set_final_agg_dq_status(status)
self._context.set_final_agg_dq_end_time()
_log.info(
Expand Down Expand Up @@ -514,9 +507,6 @@ def wrapper(*args: tuple, **kwargs: dict) -> DataFrame:
error_count=_error_count,
output_count=_output_count,
)
self._context.set_final_query_dq_result(
_dq_final_query_results
)
self._context.set_final_query_dq_status(status)
self._context.set_final_query_dq_end_time()

Expand Down
29 changes: 12 additions & 17 deletions spark_expectations/utils/udf.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,31 @@
from typing import List, Dict, Union
from pyspark.sql import Column
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, MapType, StringType
from pyspark.sql.functions import filter, size, transform, when, lit, array


def remove_empty_maps(column: Column) -> Column:
    """
    This function takes a column of type array(map(str,str)) and removes empty maps from it

    Args:
        column: Provide a column of type array(map(str,str))

    Returns:
        Column: a column of type array(map(str,str)) containing only the maps
            that have at least one entry
    """
    # Native Spark SQL filter: keep only elements whose map size is > 0,
    # replacing the former Python UDF so the work stays in the JVM.
    return filter(column, lambda x: size(x) > 0)  # pragma: no cover


def get_actions_list(column: Column) -> Column:
    """
    This function takes a column of type array(map(str,str)) and builds an array
    by picking action_if_failed from each map

    Args:
        column: Provide a column of type array(map(str,str))

    Returns:
        Column: a column of type array(str) with the action_if_failed value of
            each map; defaults to ["ignore"] when the input array is empty
    """
    # Native Spark SQL transform replaces the former Python UDF: extract the
    # "action_if_failed" entry from every map in the array.
    action_if_failed = transform(column, lambda x: x["action_if_failed"])
    # Mirror the old UDF's fallback: an empty rule set yields ["ignore"].
    return when(size(action_if_failed) == 0, array(lit("ignore"))).otherwise(
        action_if_failed
    )  # pragma: no cover