enable flag for error table (#74)
* Update writer.py

* Update test_writer.py

* Update CONTRIBUTORS.md

* Update writer.py

* Update context.py

* Update expectations.py

* spelling corrections, writing more unit tests, removing rdd's

* reformatting

* fixing tests

* Update test_writer.py

* Update test_regulate_flow.py

* Update test_context.py

* Update test_regulate_flow.py

* enabling error table

* enabling error table

* Update migration_versions_comparison.md

* enabling error table

* enabling error table

* Update context.py

---------

Co-authored-by: Ashok Singamaneni <ashok.singamaneni@nike.com>
Co-authored-by: krishnam Jagadapi <krishnam.jagadapi@nike.com>
3 people authored Mar 7, 2024
1 parent d0a0dc0 commit 7bf3247
Showing 11 changed files with 185 additions and 146 deletions.
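
For orientation, the behaviour this commit introduces can be sketched in plain Python. This is only an illustration under stated assumptions: `SketchContext`, `SketchWriter`, and `resolve_error_table_flag` are hypothetical stand-ins, not classes or functions from spark-expectations, and the `saved_tables` list replaces the real Spark table write. The key string `"se.enable.error.table"`, the default of `True`, and the accessor names `set_se_enable_error_table` / `get_se_enable_error_table` are taken from the diff.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Key string as defined in spark_expectations/config/user_config.py by this commit.
SE_ENABLE_ERROR_TABLE = "se.enable.error.table"


def resolve_error_table_flag(user_conf: Dict[str, Any]) -> bool:
    """Hypothetical helper: read the flag, defaulting to True as the docs state."""
    return bool(user_conf.get(SE_ENABLE_ERROR_TABLE, True))


@dataclass
class SketchContext:
    """Hypothetical stand-in for the context; only the new flag is modelled."""

    _se_enable_error_table: bool = True

    def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
        self._se_enable_error_table = se_enable_error_table

    @property
    def get_se_enable_error_table(self) -> bool:
        return self._se_enable_error_table


@dataclass
class SketchWriter:
    """Hypothetical stand-in for the writer; records table names instead of writing."""

    context: SketchContext
    saved_tables: List[str] = field(default_factory=list)

    def write_error_records(self, error_rows: List[dict], error_table: str) -> int:
        # Persist error rows only when the flag is on (mirrors the new `if`).
        if self.context.get_se_enable_error_table:
            self.saved_tables.append(error_table)
        # The error count is computed regardless of the flag.
        return len(error_rows)
```

With the flag set to `False`, the sketch still returns the error count but records no table write, which has the same shape as the gated `save_df_as_table` call in `writer.py`.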
@@ -4,17 +4,17 @@ Please find the difference in the changes with different version, latest three v



-| stage | 0.8.0 | 1.0.0 |
-|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
-| rules table schema changes | added additional two column <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same |
-| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
-| stats table schema changes | remains same | Remains Same |
-| stats table creation required | automated | Remains Same |
-| notification config setting | remains same | Remains Same |
-| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class |
-| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
-| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
-| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
+| stage | 0.8.0 | 1.0.0 | 1.2.0 |
+|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------|
+| rules table schema changes | added additional two column <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same | Remains same |
+| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
+| stats table schema changes | remains same | Remains Same | Remains same. Additionally all row dq rules stats get in row dq rules summary |
+| stats table creation required | automated | Remains Same | Remains same |
+| notification config setting | remains same | Remains Same | Remains same |
+| secret store and Kafka authentication details | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains Same. You can disable streaming if needed, in SparkExpectations class | Remains same |
+| spark expectations initialization | create spark expectations class object using `SparkExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |
+| WrappedDataFrameWriter | Doesn't exist | This is new and users need to provider the writer object to record the spark conf that need to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) | Remains same |




3 changes: 3 additions & 0 deletions docs/examples.md
@@ -20,6 +20,8 @@ se_user_conf = {
     user_config.se_notifications_on_fail: True, # (11)!
     user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True, # (12)!
     user_config.se_notifications_on_error_drop_threshold: 15, # (13)!
+    user_config.se_enable_error_table: True, # (14)!
+}
 }
```

@@ -36,6 +38,7 @@ se_user_conf = {
 11. When `user_config.se_notifications_on_fail` parameter set to `True` enables notification on failure of spark-expectations data quality framework, variable by default set to `True`
 12. When `user_config.se_notifications_on_error_drop_exceeds_threshold_breach` parameter set to `True` enables notification when error threshold reaches above the configured value
 13. The `user_config.se_notifications_on_error_drop_threshold` parameter captures error drop threshold value
+14. The `user_config.se_enable_error_table` parameter, which controls whether error data to load into error table, is set to true by default

### Spark Expectations Initialization

4 changes: 2 additions & 2 deletions mkdocs.yml
@@ -46,8 +46,8 @@ theme:
 nav:
   - Home: 'index.md'
   - Setup: getting-started/setup.md
-  - Adoption_Guide:
-      - comparision: configurations/adoption_versions_comparsion.md
+  - Migration_Guide:
+      - comparison: configurations/migration_versions_comparison.md
   - Examples:
       Understand Args: examples.md
       Delta: delta.md
1 change: 1 addition & 0 deletions spark_expectations/config/user_config.py
@@ -40,6 +40,7 @@ class Constants:
     )

     se_enable_streaming = "se.enable.streaming"
+    se_enable_error_table = "se.enable.error.table"

     secret_type = "se.streaming.secret.type"

21 changes: 21 additions & 0 deletions spark_expectations/core/context.py
@@ -37,6 +37,7 @@ def __post_init__(self) -> None:
         self._final_query_dq_status: str = "Skipped"
         self._dq_run_status: str = "Failed"
         self._dq_expectations: Optional[Dict[str, str]] = None
+        self._se_enable_error_table: bool = True

         # above configuration variable value has to be set to python
         self._dq_project_env_name = "spark_expectations"
@@ -1564,3 +1565,23 @@ def get_stats_table_writer_config(self) -> dict:
             dict: Returns stats_table_writer_config which in dict
         """
         return self._stats_table_writer_config
+
+    def set_se_enable_error_table(self, se_enable_error_table: bool) -> None:
+        """
+        Args:
+            _se_enable_error_table:
+        Returns:
+        """
+        self._se_enable_error_table = se_enable_error_table
+
+    @property
+    def get_se_enable_error_table(self) -> bool:
+        """
+        This function returns whether to enable relational table or not
+        Returns: Returns _se_enable_error_table(bool)
+        """
+        return self._se_enable_error_table
11 changes: 6 additions & 5 deletions spark_expectations/sinks/utils/writer.py
@@ -418,11 +418,12 @@ def write_error_records_final(
         error_df = df.filter(f"size(meta_{rule_type}_results) != 0")
         self._context.print_dataframe_with_debugger(error_df)

-        self.save_df_as_table(
-            error_df,
-            error_table,
-            self._context.get_target_and_error_table_writer_config,
-        )
+        if self._context.get_se_enable_error_table:
+            self.save_df_as_table(
+                error_df,
+                error_table,
+                self._context.get_target_and_error_table_writer_config,
+            )

         _error_count = error_df.count()
         if _error_count > 0:
2 changes: 1 addition & 1 deletion spark_expectations/utils/actions.py
@@ -293,5 +293,5 @@ def action_on_rules(

         except Exception as e:
             raise SparkExpectationsMiscException(
-                f"error occured while taking action on given rules {e}"
+                f"error occurred while taking action on given rules {e}"
             )
2 changes: 2 additions & 0 deletions tests/config/test_user_config.py
@@ -30,6 +30,8 @@ def test_constants():

     assert user_config.se_enable_streaming == "se.enable.streaming"

+    assert user_config.se_enable_error_table == "se.enable.error.table"
+
     assert user_config.secret_type == "se.streaming.secret.type"

     assert user_config.cbs_url == "se.streaming.cerberus.url"