Refactor df_check.py (#24)
* Start refactoring df_check function, rename to validate_dataframes

* Add common write-function to input_helpers.py

* Add output_helpers.py

* Rename to common.py

* Move more functions from validate_dataframes to common.py

* Update README.md

* Rename parameters to spark_session

* Add get_batch_request_and_validator

* Add _dict suffixes

* Fix __init__.py

* Formatting

* Add ValidationSettings dataclass

* Add first unit test with test data

* Add run_validation placeholder function

* Use ValidationSettings in df_checker.py

* Modify README.md

* Modify README.md

* Move json functions to df_checker.py

* Refactor validate function to accept only single dataframe

* Add table_name to ValidationSettings object

* Modify README.md

* Modify README.md

* Modify README.md

* Modify ValidationSettings, simplify validate function

* Simplify get_batch_request_and_validator

* Add filter_validation_dict_by_table_name

* Minor changes

* Add TestValidationSettings

* Expand TestValidationSettings

* Add create_and_configure_expectations

* Add write_validation_table

* Minor changes

* Rename to run

* Cleanup

* Re-add input_helpers.py, split common.py

* Increase version

* Move code and rename some files

* Modify import and comment

* Update pyproject.toml

* Update README.md

---------

Co-authored-by: bas <b.schotten@amsterdam.nl>
Co-authored-by: ArthurKordes <75675106+ArthurKordes@users.noreply.github.com>
3 people authored Aug 13, 2024
1 parent 4c7e269 commit 407e5db
Showing 12 changed files with 579 additions and 252 deletions.
28 changes: 13 additions & 15 deletions README.md
@@ -11,26 +11,22 @@ Install the dq suite on your compute, for example by running the following code
 pip install dq-suite-amsterdam
 ```
 
 ```
 import dq_suite
 ```
-Load your data in dataframes, give them a table_name, and create a list of all dataframes:
-```python
-df = spark.read.csv(csv_path + file_name, header=True, inferSchema=True)  # example using csv
-df.table_name = "showcase_table"
-dfs = [df]
-```
-
-- Define 'dfs' as a list of dataframes that require a dq check
-- Define 'dq_rules' as a JSON as shown in dq_rules_example.json in this repo
-- Define a name for your dq check, in this case "showcase"
-
-```
-dq_suite.df_check(dfs, dq_rules, "dpxx_dev", "showcase", spark)
-```
-# Create dataquality schema and tables (in respective catalog of data team)
+To validate your first table:
+- define `json_path` as a path to a JSON file with the DQ rules, similar to what is shown in dq_rules_example.json in this repo
+- load the table requiring a data quality check into a PySpark dataframe `df` (e.g. via `spark.read.csv` or `spark.read.table`)
+
+```python
+import dq_suite
+
+validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark, catalog_name="dpxx_dev",
+                                                      table_name="showcase_table",
+                                                      check_name="showcase_check")
+dq_suite.run(json_path=json_path, df=df, validation_settings_obj=validation_settings_obj)
+```
+Looping over multiple dataframes may require a redefinition of the `json_path` and `validation_settings` variables.
+
+# Create data quality schema and tables (in respective catalog of data team)
 
 For the first-time installation, create the data quality schema and tables from the notebook at repo path scripts/data_quality_tables.sql
 - open the notebook, connect to a cluster
@@ -82,3 +78,5 @@ Version 0.4: Added schema validation with Amsterdam Schema per table
 Version 0.5: Export schema from Unity Catalog
 
 Version 0.6: The results are written to tables in the "dataquality" schema
+
+Version 0.7: Refactored the solution
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dq-suite-amsterdam"
-version = "0.6.3"
+version = "0.7.0"
 authors = [
     { name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
     { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
Empty file added src/__init__.py
Empty file.
7 changes: 4 additions & 3 deletions src/dq_suite/__init__.py
@@ -1,7 +1,8 @@
 """DQ API."""
 
-from dq_suite.df_checker import df_check
-from dq_suite.input_helpers import export_schema, validate_dqrules
+from src.dq_suite.common import ValidationSettings
+from src.dq_suite.df_checker import run
+from src.dq_suite.input_helpers import schema_to_json_string
 
 # Use __all__ to let developers know what is part of the public API.
-__all__ = ["validate_dqrules", "df_check", "export_schema"]
+__all__ = ["schema_to_json_string", "run", "ValidationSettings"]
129 changes: 129 additions & 0 deletions src/dq_suite/common.py
@@ -0,0 +1,129 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Literal

from great_expectations import get_context
from great_expectations.data_context import AbstractDataContext
from pyspark.sql import DataFrame, SparkSession


@dataclass()
class Rule:
    """
    Groups the name of the GX validation rule together with the
    parameters required to apply this rule.
    """

    rule_name: str  # Name of the GX expectation
    parameters: List[Dict[str, Any]]  # Collection of parameters required
    # for evaluating the expectation

    def __getitem__(self, key) -> str | List[Dict[str, Any]] | None:
        if key == "rule_name":
            return self.rule_name
        elif key == "parameters":
            return self.parameters
        raise KeyError(key)


@dataclass()
class RulesDict:
    """
    Groups a list of Rule-objects together with the name of the table
    these rules are to be applied to, as well as a unique identifier used for
    identifying outliers.
    """

    unique_identifier: str  # TODO: List[str] for more complex keys?
    table_name: str
    rules_list: List[Rule]

    def __getitem__(self, key) -> str | List[Rule] | None:
        if key == "unique_identifier":
            return self.unique_identifier
        elif key == "table_name":
            return self.table_name
        elif key == "rules_list":
            return self.rules_list
        raise KeyError(key)


RulesDictList = List[RulesDict]  # a list of dictionaries containing DQ rules


@dataclass()
class DataQualityRulesDict:
    tables: RulesDictList

    def __getitem__(self, key) -> RulesDictList | None:
        if key == "tables":
            return self.tables
        raise KeyError(key)


def get_full_table_name(
    catalog_name: str, table_name: str, schema_name: str = "data_quality"
) -> str:
    return f"{catalog_name}.{schema_name}.{table_name}"


def write_to_unity_catalog(
    df: DataFrame,
    catalog_name: str,
    table_name: str,
    # schema: StructType,
    mode: Literal["append", "overwrite"] = "append",
) -> None:
    # TODO: enforce schema?
    # df = enforce_schema(df=df, schema_to_enforce=schema)
    full_table_name = get_full_table_name(
        catalog_name=catalog_name, table_name=table_name
    )
    df.write.mode(mode).option("overwriteSchema", "true").saveAsTable(
        full_table_name
    )  # TODO: write as delta-table? .format("delta")


def get_data_context(
    data_context_root_dir: str = "/dbfs/great_expectations/",
) -> AbstractDataContext:
    return get_context(context_root_dir=data_context_root_dir)


@dataclass()
class ValidationSettings:
    spark_session: SparkSession
    catalog_name: str
    table_name: str
    check_name: str
    data_context_root_dir: str = "/dbfs/great_expectations/"
    data_context: AbstractDataContext | None = None
    expectation_suite_name: str | None = None
    checkpoint_name: str | None = None
    run_name: str | None = None

    def initialise_or_update_attributes(self):
        self._set_data_context()

        # TODO/check: do we want to allow for custom names?
        self._set_expectation_suite_name()
        self._set_checkpoint_name()
        self._set_run_name()

        # Finally, apply the (new) suite name to the data context
        self.data_context.add_or_update_expectation_suite(
            expectation_suite_name=self.expectation_suite_name
        )

    def _set_data_context(self):
        self.data_context = get_data_context(
            data_context_root_dir=self.data_context_root_dir
        )

    def _set_expectation_suite_name(self):
        self.expectation_suite_name = f"{self.check_name}_expectation_suite"

    def _set_checkpoint_name(self):
        self.checkpoint_name = f"{self.check_name}_checkpoint"

    def _set_run_name(self):
        self.run_name = f"%Y%m%d-%H%M%S-{self.check_name}"
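For orientation, a minimal sketch of how the pieces of this new module compose. It assumes the module resolves as `dq_suite.common` at import time (the listing itself imports via `src.dq_suite`), that a Databricks `spark` session exists, and that the default GX data-context root under /dbfs is writable; the rule, column, and table names are illustrative only:

```python
from dq_suite.common import (
    Rule,
    RulesDict,
    ValidationSettings,
    get_full_table_name,
)

# Rule and RulesDict mirror entries in the DQ rules JSON; __getitem__
# allows dict-style access on top of the dataclass attributes.
rule = Rule(
    rule_name="expect_column_values_to_not_be_null",  # a standard GX expectation
    parameters=[{"column": "id"}],
)
rules_dict = RulesDict(
    unique_identifier="id",
    table_name="showcase_table",
    rules_list=[rule],
)
assert rules_dict["table_name"] == "showcase_table"

# Result tables land in the data_quality schema by default.
assert (
    get_full_table_name(catalog_name="dpxx_dev", table_name="validation_results")
    == "dpxx_dev.data_quality.validation_results"
)

# ValidationSettings derives all GX names from check_name. Calling
# initialise_or_update_attributes() needs a writable data-context root
# (here the /dbfs default) and registers the expectation suite there.
settings = ValidationSettings(
    spark_session=spark,  # assumed pre-existing Databricks SparkSession
    catalog_name="dpxx_dev",
    table_name="showcase_table",
    check_name="showcase_check",
)
settings.initialise_or_update_attributes()
print(settings.expectation_suite_name)  # showcase_check_expectation_suite
print(settings.checkpoint_name)  # showcase_check_checkpoint
print(settings.run_name)  # %Y%m%d-%H%M%S-showcase_check (strftime-style template)
```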