Feature remove mandatory args #40

Merged: 31 commits, Sep 28, 2023
Changes from 30 commits

Commits (31)
f198994
making changes and refactoring the code. yet to modify unittests for …
asingamaneni Sep 13, 2023
9c3e589
Adding Wrapped DataFrameWriter and unitests for the same
asingamaneni Sep 13, 2023
ced4a59
ignoring the mypy to be in sync with pyspark
asingamaneni Sep 13, 2023
59c2462
ignoring agg_dq and query_dq
asingamaneni Sep 13, 2023
f043b33
Rearranging lot of code, removed delta. Enabled custom writers in dif…
asingamaneni Sep 14, 2023
6b6fe74
making changes and refactoring the code. yet to modify unittests for …
asingamaneni Sep 13, 2023
3eeda22
Adding Wrapped DataFrameWriter and unitests for the same
asingamaneni Sep 13, 2023
680a060
ignoring the mypy to be in sync with pyspark
asingamaneni Sep 13, 2023
0376f84
ignoring agg_dq and query_dq
asingamaneni Sep 13, 2023
0c347d9
Rearranging lot of code, removed delta. Enabled custom writers in dif…
asingamaneni Sep 14, 2023
1172ec0
Merge branch 'Nike-Inc:feature-remove-mandatory-args' into feature-re…
asingamaneni Sep 14, 2023
8f2235d
fixing tests for the sinks and updating README
asingamaneni Sep 14, 2023
988c826
fixing kafka configurations
asingamaneni Sep 14, 2023
959bf46
Merge branch 'Nike-Inc:feature-remove-mandatory-args' into feature-re…
asingamaneni Sep 14, 2023
c37bfd1
Feature add reader (#1)
asingamaneni Sep 17, 2023
1bba704
Updating formatting, removing delta as dependency
asingamaneni Sep 17, 2023
37c1986
Updating examples
asingamaneni Sep 17, 2023
9043741
started unitests
asingamaneni Sep 17, 2023
42cdf1a
Adding more tests
asingamaneni Sep 18, 2023
ce17b20
first test succesful for with_expectations, rest will follow through
asingamaneni Sep 18, 2023
35b5d8e
Adding examples for expectations
asingamaneni Sep 19, 2023
584c6de
adding documentation
asingamaneni Sep 26, 2023
3451515
adding test cases (#2)
asingamaneni Sep 27, 2023
e87c5df
Merge remote-tracking branch 'personal/feature-remove-mandatory-args'…
asingamaneni Sep 27, 2023
0e7e030
adding a test
asingamaneni Sep 27, 2023
05bb079
Updating tests
asingamaneni Sep 27, 2023
e313c90
Updating tests-1
asingamaneni Sep 27, 2023
bdd74dd
updating docs
asingamaneni Sep 27, 2023
055d68b
updating docs
asingamaneni Sep 27, 2023
f324d35
Updating documentation
asingamaneni Sep 27, 2023
a574b52
Updating WrappedDataframeWriter
asingamaneni Sep 28, 2023
3 changes: 2 additions & 1 deletion CONTRIBUTORS.md
@@ -6,6 +6,8 @@
Thanks to the contributors who helped on this project apart from the authors
* [Teja Dogiparthi](https://github.com/Tejadogiparthi)
* [Phani Kumar Vemuri](https://www.linkedin.com/in/vemuriphani/)
* [Sarath Chandra Bandaru](https://www.linkedin.com/in/sarath-chandra-bandaru/)
* [Holden Karau](https://www.linkedin.com/in/holdenkarau/)

# Honorary Mentions
Thanks to the team below for invaluable insights and support throughout the initial release of this project
@@ -15,4 +17,3 @@
* [Aditya Chaturvedi](https://www.linkedin.com/in/chaturvediaditya/)
* [Scott Haines](https://www.linkedin.com/in/scotthaines/)
* [Arijit Banerjee](https://www.linkedin.com/in/massborn/)
* [Sarath Chandra Bandaru](https://www.linkedin.com/in/sarath-chandra-bandaru/)
78 changes: 31 additions & 47 deletions README.md
@@ -7,7 +7,8 @@
[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
![PYPI version](https://img.shields.io/pypi/v/spark-expectations.svg)

![PYPI - Downloads](https://static.pepy.tech/badge/spark-expectations)
![PYPI - Python Version](https://img.shields.io/pypi/pyversions/spark-expectations.svg)

<p align="center">
Spark Expectations is a specialized tool designed with the primary goal of maintaining data integrity within your processing pipeline.
@@ -70,7 +71,7 @@ is provided in the appropriate fields.
```python
from spark_expectations.config.user_config import *

se_global_spark_Conf = {
se_user_conf = {
se_notifications_enable_email: False,
se_notifications_email_smtp_host: "mailhost.nike.com",
se_notifications_email_smtp_port: 25,
@@ -91,66 +92,49 @@ se_global_spark_Conf = {

For all the examples below, the following import and `SparkExpectations` class instantiation are mandatory

```python
from spark_expectations.core.expectations import SparkExpectations
1. Instantiate `SparkExpectations` class which has all the required functions for running data quality rules

```python
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.getOrCreate()
writer = WrappedDataFrameWriter().mode("append").format("delta")
# writer = WrappedDataFrameWriter().mode("append").format("iceberg")
# product_id should match with the "product_id" in the rules table
se: SparkExpectations = SparkExpectations(product_id="your-products-id")
se: SparkExpectations = SparkExpectations(
product_id="your_product",
rules_df=spark.table("dq_spark_local.dq_rules"),
stats_table="dq_spark_local.dq_stats",
stats_table_writer=writer,
target_and_error_table_writer=writer,
debugger=False,
# stats_streaming_options={user_config.se_enable_streaming: False},
)
```

1. Instantiate `SparkExpectations` class which has all the required functions for running data quality rules

2. Decorate the function with the `@se.with_expectations` decorator

```python
from spark_expectations.config.user_config import *


@se.with_expectations(
se.reader.get_rules_from_table(
product_rules_table="pilot_nonpub.dq.dq_rules",
table_name="pilot_nonpub.dq_employee.employee",
dq_stats_table_name="pilot_nonpub.dq.dq_stats"
),
write_to_table=True,
write_to_temp_table=True,
row_dq=True,
agg_dq={
se_agg_dq: True,
se_source_agg_dq: True,
se_final_agg_dq: True,
},
query_dq={
se_query_dq: True,
se_source_query_dq: True,
se_final_query_dq: True,
se_target_table_view: "order",
},
spark_conf=se_global_spark_Conf,
from spark_expectations.config.user_config import *
from pyspark.sql import DataFrame
import os


@se.with_expectations(
target_table="dq_spark_local.customer_order",
write_to_table=True,
user_conf=se_user_conf,
target_table_view="order",
)
def build_new() -> DataFrame:
# Return the dataframe on which Spark-Expectations needs to be run
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")

_df_product: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
)
_df_product.createOrReplaceTempView("product")

_df_customer: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
)

_df_customer.createOrReplaceTempView("customer")
_df_order.createOrReplaceTempView("order")

return _df_order
```
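
Calling the decorated function is what triggers the data-quality run; below is a minimal usage sketch, assuming the rules table, stats table, and sample CSVs referenced above are in place:

```python
# Invoking the decorated function runs the configured expectations on the returned
# DataFrame; with write_to_table=True the validated rows are written to the
# target_table ("dq_spark_local.customer_order") and run statistics to the stats table.
build_new()
```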
11 changes: 0 additions & 11 deletions docs/api/delta_sink_plugin.md

This file was deleted.

6 changes: 1 addition & 5 deletions docs/api/sample_dq.md → docs/api/sample_dq_bigquery.md
@@ -1,9 +1,5 @@
---
search:
exclude: true
---

::: spark_expectations.examples.sample_dq
::: spark_expectations.examples.sample_dq_bigquery
handler: python
options:
filters:
8 changes: 8 additions & 0 deletions docs/api/sample_dq_delta.md
@@ -0,0 +1,8 @@

::: spark_expectations.examples.sample_dq_delta
handler: python
options:
filters:
- "!^_[^_]"
- "!^__[^__]"

8 changes: 8 additions & 0 deletions docs/api/sample_dq_iceberg.md
@@ -0,0 +1,8 @@

::: spark_expectations.examples.sample_dq_iceberg
handler: python
options:
filters:
- "!^_[^_]"
- "!^__[^__]"

92 changes: 92 additions & 0 deletions docs/bigquery.md
@@ -0,0 +1,92 @@
### Example - Write to BigQuery

Set up the SparkSession for BigQuery to test in your local environment. Configure accordingly for higher environments.
Refer to the examples in [base_setup.py](../spark_expectations/examples/base_setup.py) and
[sample_dq_bigquery.py](../spark_expectations/examples/sample_dq_bigquery.py)

```python title="spark_session"
from pyspark.sql import SparkSession

builder = (
SparkSession.builder.config(
"spark.jars.packages",
"com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0",
)
)
spark = builder.getOrCreate()

spark._jsc.hadoopConfiguration().set(
"fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "<temp_dataset>")
```

Below is the configuration that can be used to run SparkExpectations and write to BigQuery

```python title="bigquery_write"
import os
from pyspark.sql import DataFrame
from spark_expectations.core.expectations import (
SparkExpectations,
WrappedDataFrameWriter,
)
from spark_expectations.config.user_config import Constants as user_config

os.environ[
"GOOGLE_APPLICATION_CREDENTIALS"
] = "path_to_your_json_credential_file" # This is needed for spark write to bigquery
writer = (
WrappedDataFrameWriter.mode("overwrite")
.format("bigquery")
.option("createDisposition", "CREATE_IF_NEEDED")
.option("writeMethod", "direct")
)

se: SparkExpectations = SparkExpectations(
product_id="your_product",
rules_df=spark.read.format("bigquery").load(
"<project_id>.<dataset_id>.<rules_table>"
),
stats_table="<project_id>.<dataset_id>.<stats_table>",
stats_table_writer=writer,
target_and_error_table_writer=writer,
debugger=False,
stats_streaming_options={user_config.se_enable_streaming: False}
)


# Commented fields are optional or required when notifications are enabled
user_conf = {
user_config.se_notifications_enable_email: False,
# user_config.se_notifications_email_smtp_host: "mailhost.com",
# user_config.se_notifications_email_smtp_port: 25,
# user_config.se_notifications_email_from: "",
# user_config.se_notifications_email_to_other_mail_id: "",
# user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
user_config.se_notifications_enable_slack: False,
# user_config.se_notifications_slack_webhook_url: "",
# user_config.se_notifications_on_start: True,
# user_config.se_notifications_on_completion: True,
# user_config.se_notifications_on_fail: True,
# user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
# user_config.se_notifications_on_error_drop_threshold: 15,
}


@se.with_expectations(
target_table="<project_id>.<dataset_id>.<target_table_name>",
write_to_table=True,
user_conf=user_conf,
target_table_view="<project_id>.<dataset_id>.<target_table_view_name>",
)
def build_new() -> DataFrame:
_df_order: DataFrame = (
spark.read.option("header", "true")
.option("inferSchema", "true")
.csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
)
_df_order.createOrReplaceTempView("order")

return _df_order
```
21 changes: 11 additions & 10 deletions docs/configurations/adoption_versions_comparsion.md
@@ -4,16 +4,17 @@ Please find the difference in the changes with different version, latest three v



| stage | 0.6.0 | 0.7.0 | 0.8.0 |
| :------------------| :----------- | :----- | ------------------ |
| rules table schema changes | refer rule table creation [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added three additional column <br> 1.`enable_for_source_dq_validation(boolean)` <br> 2.`enable_for_target_dq_validation(boolean)` <br> 3.`is_active(boolean)` <br> <br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added additional two column <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_thresholdt(int)` <br><br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/getting-started/setup/)|
| rule table creation required | yes | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
| stats table schema changes | refer rule table creation [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/) | added additional columns <br> 1. `source_query_dq_results` <br> 2. `final_query_dq_results` <br> 3. `row_dq_res_summary` <br> 4. `dq_run_time` <br> 5. `dq_rules` <br><br> renamed columns <br> 1. `runtime` to `meta_dq_run_time` <br> 2. `run_date` to `meta_dq_run_date` <br> 3. `run_id` to `meta_dq_run_id` <br><br> documentation found [here](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/getting-started/setup/)| remains same |
| stats table creation required | yes | yes - creation not required if you're upgrading from old version but schema changes required | automated |
| notification config setting | define global notification param, register as env variable and place in the `__init__.py` file for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | Define a global notification parameter in the `__init__.py` file to be used in multiple instances where the spark_conf parameter needs to be passed within the with_expectations function. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |
| secret store and kafka authentication details | not applicable | not applicable | Create a dictionary that contains your secret configuration values and register in `__init__.py` for multiple usage, [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations initialisation | create SparkExpectations class object using the `SparkExpectations` library and by passing the `product_id` | create spark expectations class object using `SpakrExpectations` by passing `product_id` and optional parameter `debugger` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | create spark expectations class object using `SpakrExpectations` by passing `product_id` and additional optional parameter `debugger`, `stats_streaming_options` [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.8.0/examples/) |
| spark expectations decorator | The decorator allows for configuration by passing individual parameters to each decorator. However, registering a DataFrame view within a decorated function is not supported for implementations of query_dq [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | The decorator allows configurations to be logically grouped through a dictionary passed as a parameter to the decorator. Additionally, registering a DataFrame view within a decorated function is supported for implementations of query_dq. [example](https://glowing-umbrella-j8jnolr.pages.github.io/0.7.0/examples/) | remains same |
| stage | 0.8.0 | 1.0.0 |
|:----------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
| rules table schema changes | added two additional columns <br> 1.`enable_error_drop_alert(boolean)` <br> 2.`error_drop_threshold(int)` <br><br> documentation found [here](https://engineering.nike.com/spark-expectations/0.8.1/getting-started/setup/) | Remains same |
| rule table creation required | yes - creation not required if you're upgrading from old version but schema changes required | yes - creation not required if you're upgrading from old version but schema changes required |
| stats table schema changes | remains same | Remains Same |
| stats table creation required | automated | Remains Same |
| notification config setting | remains same | Remains Same |
| secret store and kafka authentication details | Create a dictionary that contains your secret configuration values and register it in `__init__.py` for reuse, [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | Remains same. Streaming can be disabled if needed in the SparkExpectations class |
| spark expectations initialisation | create the spark expectations class object using `SparkExpectations` by passing `product_id` and the additional optional parameters `debugger`, `stats_streaming_options` [example](https://engineering.nike.com/spark-expectations/0.8.1/examples/) | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
| with_expectations decorator | remains same | New arguments are added. Please follow this - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
| WrappedDataFrameWriter | Doesn't exist | This is new; users need to provide the writer object to record the Spark conf that needs to be used while writing - [example](https://engineering.nike.com/spark-expectations/1.0.0/examples/) |
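
For reference, here is a minimal 1.0.0 initialisation sketch combining the `WrappedDataFrameWriter` and the new `SparkExpectations` arguments shown in this PR's README changes; the product id and table names are placeholders taken from the examples above:

```python
from pyspark.sql import SparkSession
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter
from spark_expectations.config.user_config import Constants as user_config

spark = SparkSession.builder.getOrCreate()

# Writer object that records the mode/format to use when writing the target,
# error, and stats tables
writer = WrappedDataFrameWriter().mode("append").format("delta")

se = SparkExpectations(
    product_id="your_product",                        # placeholder product id
    rules_df=spark.table("dq_spark_local.dq_rules"),  # placeholder rules table
    stats_table="dq_spark_local.dq_stats",            # placeholder stats table
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    stats_streaming_options={user_config.se_enable_streaming: False},  # streaming can be turned off
)
```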


