From 2ccdc7de2fc96fb38b83f7ea1eb7359dfe6f3043 Mon Sep 17 00:00:00 2001
From: ArthurKordes <75675106+ArthurKordes@users.noreply.github.com>
Date: Thu, 12 Sep 2024 21:18:15 +0200
Subject: [PATCH] Add dataset level (#49)

* Adds dataset level
* Update pyproject.toml
* Update pyproject.toml
* Update test_common.py
* Update test_common.py
* Update test_common.py
* Update test_common.py
* Update test_common.py
* Update test_common.py
* Update common.py

---------

Co-authored-by: ArthurKordes
Co-authored-by: Bas
---
 pyproject.toml                         |  2 +-
 scripts/data_quality_tables.sql        | 73 +++++++++++++------
 src/dq_suite/common.py                 | 40 ++++++++++-
 src/dq_suite/output_transformations.py | 92 +++++++++++++++++++++-----
 src/dq_suite/schemas/brondataset.py    |  5 ++
 src/dq_suite/schemas/brontabel.py      |  7 +-
 src/dq_suite/schemas/regel.py          |  1 +
 tests/test_common.py                   | 54 ++++++++++++++-
 8 files changed, 226 insertions(+), 48 deletions(-)
 create mode 100644 src/dq_suite/schemas/brondataset.py

diff --git a/pyproject.toml b/pyproject.toml
index 32d82bd..a9b19c2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dq-suite-amsterdam"
-version = "0.8.0"
+version = "0.9.0"
 authors = [
   { name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
   { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
diff --git a/scripts/data_quality_tables.sql b/scripts/data_quality_tables.sql
index 3dfef49..fac811c 100644
--- a/scripts/data_quality_tables.sql
+++ b/scripts/data_quality_tables.sql
@@ -1,6 +1,10 @@
 -- Databricks notebook source
 -- MAGIC %md
--- MAGIC schema and tables for Dataquality great expectations
+-- MAGIC This script creates the schema and tables for dq-suite-amsterdam
+
+-- COMMAND ----------
+
+CREATE WIDGET TEXT catalog DEFAULT "dpxx_dev"
 
 -- COMMAND ----------
 
@@ -8,13 +12,27 @@
 create schema if not exists ${catalog}.data_quality
 
 -- COMMAND ----------
 
-CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
-  `regelId` STRING,
-  `regelNaam` STRING,
-  `regelParameters` STRING,
-  `bronTabelId` STRING)
+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brondataset (
+  bronDatasetId STRING,
+  medaillonLaag STRING)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
+TBLPROPERTIES (
+  'delta.columnMapping.mode' = 'name',
+  'delta.enableDeletionVectors' = 'true',
+  'delta.feature.columnMapping' = 'supported',
+  'delta.feature.deletionVectors' = 'supported',
+  'delta.minReaderVersion' = '3',
+  'delta.minWriterVersion' = '7')
+
+-- COMMAND ----------
+
+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
+  bronTabelId STRING,
+  tabelNaam STRING,
+  uniekeSleutel STRING)
+USING delta
+COMMENT 'Deployed by dq-suite-amsterdam'
 TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.enableDeletionVectors' = 'true',
   'delta.feature.columnMapping' = 'supported',
   'delta.feature.deletionVectors' = 'supported',
   'delta.minReaderVersion' = '3',
   'delta.minWriterVersion' = '7')
@@ -26,12 +44,29 @@
 -- COMMAND ----------
 
 CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.bronattribuut (
-  name STRING,
-  `bronAttribuutId` STRING,
-  `bronTabelId` STRING,
-  `attribuutNaam` STRING)
+  bronAttribuutId STRING,
+  bronTabelId STRING,
+  attribuutNaam STRING)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
+TBLPROPERTIES (
+  'delta.columnMapping.mode' = 'name',
+  'delta.enableDeletionVectors' = 'true',
+  'delta.feature.columnMapping' = 'supported',
+  'delta.feature.deletionVectors' = 'supported',
+  'delta.minReaderVersion' = '3',
+  'delta.minWriterVersion' = '7')
+
+-- COMMAND ----------
+
+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
+  regelId STRING,
+  regelNaam STRING,
+  regelParameters STRING,
+  bronTabelId STRING,
+  attribuut STRING)
+USING delta
+COMMENT 'Deployed by dq-suite-amsterdam'
 TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.enableDeletionVectors' = 'true',
   'delta.feature.columnMapping' = 'supported',
   'delta.feature.deletionVectors' = 'supported',
   'delta.minReaderVersion' = '3',
   'delta.minWriterVersion' = '7')
@@ -49,7 +84,7 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.validatie (
   dqDatum TIMESTAMP,
   dqResultaat STRING)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
 TBLPROPERTIES (
   'delta.checkpoint.writeStatsAsJson' = 'false',
   'delta.checkpoint.writeStatsAsStruct' = 'true',
@@ -67,7 +102,7 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.afwijking (
   afwijkendeAttribuutWaarde STRING,
   dqDatum TIMESTAMP)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
 TBLPROPERTIES (
   'delta.checkpoint.writeStatsAsJson' = 'false',
   'delta.checkpoint.writeStatsAsStruct' = 'true',
@@ -75,13 +110,3 @@ TBLPROPERTIES (
   'delta.feature.deletionVectors' = 'supported',
   'delta.minReaderVersion' = '3',
   'delta.minWriterVersion' = '7')
-
--- COMMAND ----------
-
-CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
-  bronTabelId STRING,
-  uniekeSleutel STRING)
-USING delta
-TBLPROPERTIES (
-  'delta.minReaderVersion' = '1',
-  'delta.minWriterVersion' = '2')
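The tables above are linked by naming convention rather than by constraints: regel.bronTabelId matches brontabel.bronTabelId, and that identifier is prefixed with the dataset name (see create_brontabel below). A minimal PySpark sketch of inspecting the deployed tables; the catalog name "dpxx_dev" only mirrors the widget default and is an assumption:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    catalog = "dpxx_dev"  # assumed; substitute your own catalog

    # Every rule, joined to the table and unique key it applies to.
    spark.sql(f"""
        SELECT r.regelNaam, r.attribuut, t.tabelNaam, t.uniekeSleutel
        FROM {catalog}.data_quality.regel AS r
        JOIN {catalog}.data_quality.brontabel AS t
          ON r.bronTabelId = t.bronTabelId
    """).show()
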
diff --git a/src/dq_suite/common.py b/src/dq_suite/common.py
index 117f8ff..5d61e2c 100644
--- a/src/dq_suite/common.py
+++ b/src/dq_suite/common.py
@@ -72,19 +72,55 @@ def __getitem__(self, key) -> str | RulesList | None:
         raise KeyError(key)
 
 
+@dataclass()
+class DatasetDict:
+    """
+    Groups the name and the medallion layer of the dataset to which the
+    rules apply.
+    """
+
+    name: str
+    layer: str
+
+    def __post_init__(self):
+        if not isinstance(self.name, str):
+            raise TypeError("'name' should be of type str")
+
+        if not isinstance(self.layer, str):
+            raise TypeError("'layer' should be of type str")
+
+    def __getitem__(self, key) -> str | None:
+        if key == "name":
+            return self.name
+        elif key == "layer":
+            return self.layer
+        raise KeyError(key)
+
+
 RulesDictList = List[RulesDict]  # a list of dictionaries containing DQ rules
 
 
 @dataclass()
 class DataQualityRulesDict:
+    """
+    Groups a list of RulesDict objects together with the definition of the
+    dataset these tables are part of.
+    """
+
+    dataset: DatasetDict
     tables: RulesDictList
 
     def __post_init__(self):
+        # Rules loaded from JSON arrive as a plain dict, so accept both.
+        if not isinstance(self.dataset, (dict, DatasetDict)):
+            raise TypeError("'dataset' should be of type DatasetDict")
+
         if not isinstance(self.tables, list):
             raise TypeError("'tables' should be RulesDictList")
 
-    def __getitem__(self, key) -> RulesDictList | None:
-        if key == "tables":
+    def __getitem__(self, key) -> DatasetDict | RulesDictList | None:
+        if key == "dataset":
+            return self.dataset
+        elif key == "tables":
             return self.tables
         raise KeyError(key)
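For reference, a minimal sketch of the structure DataQualityRulesDict now expects; the dataset entry is what this patch adds, and all names below are made-up fixtures taken from the tests:

    from src.dq_suite.common import (
        DataQualityRulesDict,
        DatasetDict,
        Rule,
        RulesDict,
    )

    dq_rules_dict = DataQualityRulesDict(
        dataset=DatasetDict(name="the_dataset", layer="brons"),
        tables=[
            RulesDict(
                unique_identifier="id",
                table_name="the_table",
                rules_list=[
                    Rule(rule_name="the_rule", parameters=[{"column": "id"}])
                ],
            )
        ],
    )
    assert dq_rules_dict["dataset"]["name"] == "the_dataset"
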
+ """ + + dataset: DatasetDict tables: RulesDictList def __post_init__(self): + if not isinstance(self.dataset, dict): + raise TypeError("'dataset' should be DatasetDict") + if not isinstance(self.tables, list): raise TypeError("'tables' should be RulesDictList") - def __getitem__(self, key) -> RulesDictList | None: - if key == "tables": + def __getitem__(self, key) -> str | RulesDictList | None: + if key == "dataset": + return self.dataset + elif key == "tables": return self.tables raise KeyError(key) diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py index 4643038..869eb7b 100644 --- a/src/dq_suite/output_transformations.py +++ b/src/dq_suite/output_transformations.py @@ -11,8 +11,9 @@ write_to_unity_catalog, merge_df_with_unity_table, ) -from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA +from .schemas.brondataset import SCHEMA as BRONDATASET_SCHEMA from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA +from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA from .schemas.regel import SCHEMA as REGEL_SCHEMA from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA from .schemas.pre_validatie import SCHEMA as PRE_VALIDATIE_SCHEMA @@ -199,6 +200,43 @@ def extract_dq_afwijking_data( pass +def create_brondataset( + dq_rules_dict: DataQualityRulesDict, + catalog_name: str, + spark_session: SparkSession, +) -> None: + """ + Function takes the dataset name and layer from the provided + Data Quality rules to create a DataFrame containing this metadata. + + :param dq_rules_dict: + :param catalog_name: + :param spark_session: + """ + name = dq_rules_dict["dataset"]["name"] + layer = dq_rules_dict["dataset"]["layer"] + extracted_data = [{"bronDatasetId": name, "medaillonLaag": layer}] + + df_brondataset = list_of_dicts_to_df( + list_of_dicts=extracted_data, + spark_session=spark_session, + schema=BRONDATASET_SCHEMA, + ) + merge_dict = { + "bronDatasetId": "brondataset_df.bronDatasetId", + "medaillonLaag": "brondataset_df.medaillonLaag", + } + merge_df_with_unity_table( + df=df_brondataset, + catalog_name=catalog_name, + table_name="brondataset", + table_merge_id="bronDatasetId", + df_merge_id="bronDatasetId", + merge_dict=merge_dict, + spark_session=spark_session, + ) + + def create_brontabel( dq_rules_dict: DataQualityRulesDict, catalog_name: str, @@ -213,11 +251,15 @@ def create_brontabel( :param spark_session: """ extracted_data = [] + dataset_name = dq_rules_dict["dataset"]["name"] for param in dq_rules_dict["tables"]: - name = param["table_name"] + table_name = param["table_name"] + tabel_id = f"{dataset_name}_{table_name}" unique_identifier = param["unique_identifier"] extracted_data.append( - {"bronTabelId": name, "uniekeSleutel": unique_identifier} + {"bronTabelId": tabel_id, + "tabelNaam": table_name, + "uniekeSleutel": unique_identifier} ) df_brontabel = list_of_dicts_to_df( @@ -227,6 +269,7 @@ def create_brontabel( ) merge_dict = { "bronTabelId": "brontabel_df.bronTabelId", + "tabelNaam": "brontabel_df.tabelNaam", "uniekeSleutel": "brontabel_df.uniekeSleutel", } merge_df_with_unity_table( @@ -254,16 +297,18 @@ def create_bronattribute( :param spark_session: """ extracted_data = [] + dataset_name = dq_rules_dict["dataset"]["name"] used_ids = set() # To keep track of used IDs for param in dq_rules_dict["tables"]: - bron_tabel = param["table_name"] + table_name = param["table_name"] + tabel_id = f"{dataset_name}_{table_name}" for rule in param["rules"]: parameters = rule.get("parameters", []) for parameter in 
diff --git a/src/dq_suite/schemas/brondataset.py b/src/dq_suite/schemas/brondataset.py
new file mode 100644
index 0000000..ccd32b4
--- /dev/null
+++ b/src/dq_suite/schemas/brondataset.py
@@ -0,0 +1,5 @@
+from pyspark.sql.types import StructType
+
+SCHEMA = (
+    StructType().add("bronDatasetId", "string").add("medaillonLaag", "string")
+)
diff --git a/src/dq_suite/schemas/brontabel.py b/src/dq_suite/schemas/brontabel.py
index 8d5d207..02e80cc 100644
--- a/src/dq_suite/schemas/brontabel.py
+++ b/src/dq_suite/schemas/brontabel.py
@@ -1,5 +1,8 @@
 from pyspark.sql.types import StructType
 
 SCHEMA = (
-    StructType().add("bronTabelId", "string").add("uniekeSleutel", "string")
-)
+    StructType()
+    .add("bronTabelId", "string")
+    .add("tabelNaam", "string")
+    .add("uniekeSleutel", "string")
+)
diff --git a/src/dq_suite/schemas/regel.py b/src/dq_suite/schemas/regel.py
index 75a95f9..ef4929d 100644
--- a/src/dq_suite/schemas/regel.py
+++ b/src/dq_suite/schemas/regel.py
@@ -5,4 +5,5 @@
     .add("regelNaam", "string")
     .add("regelParameters", "string")
     .add("bronTabelId", "string")
+    .add("attribuut", "string")
 )
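These schemas are plain PySpark StructTypes, so their column layout can be checked directly; a sketch assuming an active Spark session:

    from pyspark.sql import SparkSession

    from src.dq_suite.schemas.brondataset import SCHEMA as BRONDATASET_SCHEMA

    spark = SparkSession.builder.getOrCreate()

    # Empty DataFrame with the brondataset layout: bronDatasetId, medaillonLaag.
    spark.createDataFrame([], schema=BRONDATASET_SCHEMA).printSchema()
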
diff --git a/tests/test_common.py b/tests/test_common.py
index 3decb61..49b224c 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -6,6 +6,7 @@
 from src.dq_suite.common import (
     DataQualityRulesDict,
+    DatasetDict,
     Rule,
     RulesDict,
     ValidationSettings,
@@ -95,6 +96,31 @@ def test_get_value_from_rule_dict_by_non_existing_key_raises_key_error(
         assert self.rules_dict_obj["wrong_key"]
 
 
+class TestDatasetDict:
+    expected_dataset_name = "the_dataset"
+    expected_layer_name = "brons"
+    dataset_obj = DatasetDict(name=expected_dataset_name, layer=expected_layer_name)
+
+    def test_initialisation_with_wrong_typed_name_raises_type_error(self):
+        with pytest.raises(TypeError):
+            assert DatasetDict(name=123, layer="brons")
+
+    def test_initialisation_with_wrong_typed_layer_raises_type_error(self):
+        with pytest.raises(TypeError):
+            assert DatasetDict(name="the_dataset", layer=123)
+
+    def test_dataset_is_dataclass(self):
+        assert is_dataclass(self.dataset_obj)
+
+    def test_get_value_from_dataset_by_existing_key(self):
+        assert self.dataset_obj["name"] == self.expected_dataset_name
+        assert self.dataset_obj["layer"] == self.expected_layer_name
+
+    def test_get_value_from_dataset_by_non_existing_key_raises_key_error(self):
+        with pytest.raises(KeyError):
+            assert self.dataset_obj["wrong_key"]
+
+
 class TestDataQualityRulesDict:
     rule_obj = Rule(rule_name="the_rule", parameters=[{"q": 42}])
     expected_unique_identifier = "id"
@@ -106,13 +132,39 @@ class TestDataQualityRulesDict:
         rules_list=expected_rules_list,
     )
     expected_rules_dict_obj_list = [rules_dict_obj]
+    expected_dataset_name = "the_dataset"
+    expected_layer_name = "brons"
+    dataset_obj = DatasetDict(
+        name=expected_dataset_name,
+        layer=expected_layer_name
+    )
     data_quality_rules_dict = DataQualityRulesDict(
+        dataset=dataset_obj,
         tables=expected_rules_dict_obj_list
     )
 
+    def test_initialisation_with_wrong_typed_dataset_raises_type_error(self):
+        with pytest.raises(TypeError):
+            assert DataQualityRulesDict(
+                dataset=123,
+                tables=[
+                    RulesDict(
+                        unique_identifier="id",
+                        table_name="the_table",
+                        rules_list=[
+                            Rule(rule_name="the_rule", parameters=[{"q": 42}])
+                        ],
+                    )
+                ]
+            )
+
     def test_initialisation_with_wrong_typed_tables_raises_type_error(self):
         with pytest.raises(TypeError):
-            assert DataQualityRulesDict(tables=123)
+            assert DataQualityRulesDict(
+                dataset={"name": "the_dataset", "layer": "brons"},
+                tables=123
+            )
 
     def test_get_value_from_data_quality_rules_dict_by_existing_key(self):
         assert (