Skip to content

Commit

Permalink
Fix imports, again (#32)
Browse files Browse the repository at this point in the history
* Fix imports

* Fix schema imports

* Fix schema imports

* Fix schema imports

* Fix building empty df

* Fix building empty df

* Minor change

* Increase project version

* Modify README.md

---------

Co-authored-by: bas <b.schotten@amsterdam.nl>
  • Loading branch information
SSchotten and bas authored Aug 21, 2024
1 parent bb16def commit ad3d07c
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 27 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ pip install dq-suite-amsterdam
```

To validate your first table:
- define `json_path` as a path to a JSON file, similar to shown in dq_rules_example.json in this repo
- define `dq_rule_json_path` as the path to a JSON file, similar to what is shown in dq_rules_example.json in this repo
- define `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file
- load the table requiring a data quality check into a PySpark dataframe `df` (e.g. via `spark.read.csv` or `spark.read.table`)

```python
import dq_suite

validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark, catalog_name="dpxx_dev",
table_name="showcase_table",
check_name="showcase_check")
dq_suite.run(json_path=json_path, df=df, validation_settings_obj=validation_settings_obj)
validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark,
catalog_name="dpxx_dev",
table_name=table_name,
check_name="name_of_check_goes_here")
dq_suite.run(json_path=dq_rule_json_path, df=df, validation_settings_obj=validation_settings_obj)
```
Looping over multiple data frames may require a redefinition of the `dq_rule_json_path` and `validation_settings` variables.

Expand Down
11 changes: 3 additions & 8 deletions src/dq_suite/df_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@
from great_expectations.validator.validator import Validator
from pyspark.sql import DataFrame

from src.dq_suite.common import (
DataQualityRulesDict,
Rule,
RulesDict,
ValidationSettings,
)
from src.dq_suite.input_helpers import get_data_quality_rules_dict
from src.dq_suite.output_transformations import (
from .common import DataQualityRulesDict, Rule, RulesDict, ValidationSettings
from .input_helpers import get_data_quality_rules_dict
from .output_transformations import (
write_non_validation_tables,
write_validation_table,
)
Expand Down
2 changes: 1 addition & 1 deletion src/dq_suite/input_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from src.dq_suite.common import DataQualityRulesDict, Rule
from .common import DataQualityRulesDict, Rule


def export_schema(dataset: str, spark: SparkSession) -> str:
Expand Down
47 changes: 34 additions & 13 deletions src/dq_suite/output_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,34 @@

from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType

from src.dq_suite import ValidationSettings
from src.dq_suite.common import (
from .common import (
DataQualityRulesDict,
ValidationSettings,
is_empty_dataframe,
write_to_unity_catalog,
)
from src.schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA
from src.schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from src.schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA
from src.schemas.regel import SCHEMA as REGEL_SCHEMA
from src.schemas.validatie import SCHEMA as VALIDATIE_SCHEMA
from .schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA
from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA
from .schemas.regel import SCHEMA as REGEL_SCHEMA
from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA


def create_empty_dataframe(
    spark_session: SparkSession, schema: StructType
) -> DataFrame:
    """
    Build a DataFrame with zero rows that conforms to the given schema.

    Uses SparkSession.createDataFrame directly instead of
    sparkContext.parallelize([]).toDF(schema): it is the documented way to
    create an empty, schema-typed DataFrame, skips the intermediate RDD, and
    keeps working in environments where the RDD API is unavailable
    (e.g. Spark Connect).

    :param spark_session: active SparkSession used to construct the DataFrame
    :param schema: StructType the resulting (empty) DataFrame must carry
    :return: an empty DataFrame with exactly the requested schema
    """
    return spark_session.createDataFrame([], schema=schema)


def list_of_dicts_to_df(
    list_of_dicts: List[dict], spark_session: SparkSession, schema: StructType
) -> DataFrame:
    """
    Convert a list of dicts into a Spark DataFrame.

    An empty input list cannot be handed to createDataFrame for schema
    inference, so that case is routed through create_empty_dataframe with the
    caller-supplied schema. Non-empty input keeps the original behavior:
    each dict becomes a Row and the schema is inferred from the data.

    :param list_of_dicts: rows to convert; each dict's keys become columns
    :param spark_session: active SparkSession used to construct the DataFrame
    :param schema: StructType to apply when the input list is empty
    :return: a DataFrame containing one row per input dict (possibly empty)
    """
    # Guard first: empty input has no rows to infer a schema from.
    if not list_of_dicts:
        return create_empty_dataframe(
            spark_session=spark_session, schema=schema
        )
    return spark_session.createDataFrame(Row(**x) for x in list_of_dicts)


Expand Down Expand Up @@ -61,7 +72,9 @@ def extract_dq_validatie_data(
)

df_validatie = list_of_dicts_to_df(
list_of_dicts=extracted_data, spark_session=spark_session
list_of_dicts=extracted_data,
spark_session=spark_session,
schema=VALIDATIE_SCHEMA,
)
if not is_empty_dataframe(df=df_validatie):
write_to_unity_catalog(
Expand Down Expand Up @@ -140,7 +153,9 @@ def extract_dq_afwijking_data(
)

df_afwijking = list_of_dicts_to_df(
list_of_dicts=extracted_data, spark_session=spark_session
list_of_dicts=extracted_data,
spark_session=spark_session,
schema=AFWIJKING_SCHEMA,
)
if not is_empty_dataframe(df=df_afwijking):
write_to_unity_catalog(
Expand Down Expand Up @@ -176,7 +191,9 @@ def create_brontabel(
)

df_brontabel = list_of_dicts_to_df(
list_of_dicts=extracted_data, spark_session=spark_session
list_of_dicts=extracted_data,
spark_session=spark_session,
schema=BRONTABEL_SCHEMA,
)
write_to_unity_catalog(
df=df_brontabel,
Expand Down Expand Up @@ -222,7 +239,9 @@ def create_bronattribute(
)

df_bronattribuut = list_of_dicts_to_df(
list_of_dicts=extracted_data, spark_session=spark_session
list_of_dicts=extracted_data,
spark_session=spark_session,
schema=BRONATTRIBUUT_SCHEMA,
)
write_to_unity_catalog(
df=df_bronattribuut,
Expand Down Expand Up @@ -265,7 +284,9 @@ def create_dq_regel(
)

df_regel = list_of_dicts_to_df(
list_of_dicts=extracted_data, spark_session=spark_session
list_of_dicts=extracted_data,
spark_session=spark_session,
schema=REGEL_SCHEMA,
)
write_to_unity_catalog(
df=df_regel,
Expand Down
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit ad3d07c

Please sign in to comment.