diff --git a/README.md b/README.md index ec119b9..74bb2a6 100644 --- a/README.md +++ b/README.md @@ -12,16 +12,18 @@ pip install dq-suite-amsterdam ``` To validate your first table: -- define `json_path` as a path to a JSON file, similar to shown in dq_rules_example.json in this repo +- define `dq_rule_json_path` as a path to a JSON file, similar to the one shown in dq_rules_example.json in this repo +- define `table_name` as the name of the table for which a data quality check is required. This name should also occur in the JSON file - load the table requiring a data quality check into a PySpark dataframe `df` (e.g. via `spark.read.csv` or `spark.read.table`) ```python import dq_suite -validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark, catalog_name="dpxx_dev", - table_name="showcase_table", - check_name="showcase_check") -dq_suite.run(json_path=json_path, df=df, validation_settings_obj=validation_settings_obj) +validation_settings_obj = dq_suite.ValidationSettings(spark_session=spark, + catalog_name="dpxx_dev", + table_name=table_name, + check_name="name_of_check_goes_here") +dq_suite.run(json_path=dq_rule_json_path, df=df, validation_settings_obj=validation_settings_obj) ``` Looping over multiple data frames may require a redefinition of the `json_path` and `validation_settings` variables. 
diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py index 6ebc2e5..ea0cad2 100644 --- a/src/dq_suite/df_checker.py +++ b/src/dq_suite/df_checker.py @@ -4,14 +4,9 @@ from great_expectations.validator.validator import Validator from pyspark.sql import DataFrame -from src.dq_suite.common import ( - DataQualityRulesDict, - Rule, - RulesDict, - ValidationSettings, -) -from src.dq_suite.input_helpers import get_data_quality_rules_dict -from src.dq_suite.output_transformations import ( +from .common import DataQualityRulesDict, Rule, RulesDict, ValidationSettings +from .input_helpers import get_data_quality_rules_dict +from .output_transformations import ( write_non_validation_tables, write_validation_table, ) diff --git a/src/dq_suite/input_helpers.py b/src/dq_suite/input_helpers.py index 564d2d0..6385a99 100644 --- a/src/dq_suite/input_helpers.py +++ b/src/dq_suite/input_helpers.py @@ -5,7 +5,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import col -from src.dq_suite.common import DataQualityRulesDict, Rule +from .common import DataQualityRulesDict, Rule def export_schema(dataset: str, spark: SparkSession) -> str: diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py index 6367ee5..ceffcde 100644 --- a/src/dq_suite/output_transformations.py +++ b/src/dq_suite/output_transformations.py @@ -2,23 +2,34 @@ from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import col +from pyspark.sql.types import StructType -from src.dq_suite import ValidationSettings -from src.dq_suite.common import ( +from .common import ( DataQualityRulesDict, + ValidationSettings, is_empty_dataframe, write_to_unity_catalog, ) -from src.schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA -from src.schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA -from src.schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA -from src.schemas.regel import SCHEMA as REGEL_SCHEMA -from src.schemas.validatie 
import SCHEMA as VALIDATIE_SCHEMA +from .schemas.afwijking import SCHEMA as AFWIJKING_SCHEMA +from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA +from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA +from .schemas.regel import SCHEMA as REGEL_SCHEMA +from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA + + +def create_empty_dataframe( + spark_session: SparkSession, schema: StructType +) -> DataFrame: + return spark_session.sparkContext.parallelize([]).toDF(schema) def list_of_dicts_to_df( - list_of_dicts: List[dict], spark_session: SparkSession + list_of_dicts: List[dict], spark_session: SparkSession, schema: StructType ) -> DataFrame: + if len(list_of_dicts) == 0: + return create_empty_dataframe( + spark_session=spark_session, schema=schema + ) return spark_session.createDataFrame(Row(**x) for x in list_of_dicts) @@ -61,7 +72,9 @@ def extract_dq_validatie_data( ) df_validatie = list_of_dicts_to_df( - list_of_dicts=extracted_data, spark_session=spark_session + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=VALIDATIE_SCHEMA, ) if not is_empty_dataframe(df=df_validatie): write_to_unity_catalog( @@ -140,7 +153,9 @@ def extract_dq_afwijking_data( ) df_afwijking = list_of_dicts_to_df( - list_of_dicts=extracted_data, spark_session=spark_session + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=AFWIJKING_SCHEMA, ) if not is_empty_dataframe(df=df_afwijking): write_to_unity_catalog( @@ -176,7 +191,9 @@ def create_brontabel( ) df_brontabel = list_of_dicts_to_df( - list_of_dicts=extracted_data, spark_session=spark_session + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=BRONTABEL_SCHEMA, ) write_to_unity_catalog( df=df_brontabel, @@ -222,7 +239,9 @@ def create_bronattribute( ) df_bronattribuut = list_of_dicts_to_df( - list_of_dicts=extracted_data, spark_session=spark_session + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=BRONATTRIBUUT_SCHEMA, ) 
write_to_unity_catalog( df=df_bronattribuut, @@ -265,7 +284,9 @@ def create_dq_regel( ) df_regel = list_of_dicts_to_df( - list_of_dicts=extracted_data, spark_session=spark_session + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=REGEL_SCHEMA, ) write_to_unity_catalog( df=df_regel, diff --git a/src/dq_suite/schemas/__init__.py b/src/dq_suite/schemas/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/schemas/afwijking.py b/src/dq_suite/schemas/afwijking.py similarity index 100% rename from src/schemas/afwijking.py rename to src/dq_suite/schemas/afwijking.py diff --git a/src/schemas/bronattribuut.py b/src/dq_suite/schemas/bronattribuut.py similarity index 100% rename from src/schemas/bronattribuut.py rename to src/dq_suite/schemas/bronattribuut.py diff --git a/src/schemas/brontabel.py b/src/dq_suite/schemas/brontabel.py similarity index 100% rename from src/schemas/brontabel.py rename to src/dq_suite/schemas/brontabel.py diff --git a/src/schemas/regel.py b/src/dq_suite/schemas/regel.py similarity index 100% rename from src/schemas/regel.py rename to src/dq_suite/schemas/regel.py diff --git a/src/schemas/validatie.py b/src/dq_suite/schemas/validatie.py similarity index 100% rename from src/schemas/validatie.py rename to src/dq_suite/schemas/validatie.py