Add dataset level (#49)
* Adds dataset level

* Update pyproject.toml

* Update pyproject.toml

* Update test_common.py

* Update test_common.py

* Update test_common.py

* Update test_common.py

* Update test_common.py

* Update test_common.py

* Update common.py

---------

Co-authored-by: ArthurKordes <ArthurKordes>
Co-authored-by: Bas <SSchotten@users.noreply.github.com>
ArthurKordes and SSchotten authored Sep 12, 2024
1 parent 158fe1a commit 2ccdc7d
Showing 8 changed files with 226 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.8.0"
version = "0.9.0"
authors = [
{ name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
{ name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
73 changes: 49 additions & 24 deletions scripts/data_quality_tables.sql
@@ -1,20 +1,38 @@
-- Databricks notebook source
-- MAGIC %md
--- MAGIC schema and tables for Dataquality great expectations
+-- MAGIC This script creates the schema and tables for dq-suite-amsterdam

-- COMMAND ----------

CREATE WIDGET TEXT catalog DEFAULT "dpxx_dev"

-- COMMAND ----------

create schema if not exists ${catalog}.data_quality

-- COMMAND ----------

-CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
-  `regelId` STRING,
-  `regelNaam` STRING,
-  `regelParameters` STRING,
-  `bronTabelId` STRING)
+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brondataset (
+  bronDatasetId STRING,
+  medaillonLaag STRING)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.enableDeletionVectors' = 'true',
'delta.feature.columnMapping' = 'supported',
'delta.feature.deletionVectors' = 'supported',
'delta.minReaderVersion' = '3',
'delta.minWriterVersion' = '7')

-- COMMAND ----------

+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
+  bronTabelId STRING,
+  tabelNaam STRING,
+  uniekeSleutel STRING)
+USING delta
+COMMENT 'Deployed by dq-suite-amsterdam'
+TBLPROPERTIES (
+  'delta.columnMapping.mode' = 'name',
+  'delta.enableDeletionVectors' = 'true',
@@ -26,12 +44,29 @@ TBLPROPERTIES (
-- COMMAND ----------

CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.bronattribuut (
-  name STRING,
-  `bronAttribuutId` STRING,
-  `bronTabelId` STRING,
-  `attribuutNaam` STRING)
+  bronAttribuutId STRING,
+  bronTabelId STRING,
+  attribuutNaam STRING)
 USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.enableDeletionVectors' = 'true',
'delta.feature.columnMapping' = 'supported',
'delta.feature.deletionVectors' = 'supported',
'delta.minReaderVersion' = '3',
'delta.minWriterVersion' = '7')

-- COMMAND ----------

+CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.regel (
+  regelId STRING,
+  regelNaam STRING,
+  regelParameters STRING,
+  bronTabelId STRING,
+  attribuut STRING)
+USING delta
+COMMENT 'Deployed by dq-suite-amsterdam'
+TBLPROPERTIES (
+  'delta.columnMapping.mode' = 'name',
+  'delta.enableDeletionVectors' = 'true',
@@ -49,7 +84,7 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.validatie (
dqDatum TIMESTAMP,
dqResultaat STRING)
USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
'delta.checkpoint.writeStatsAsJson' = 'false',
'delta.checkpoint.writeStatsAsStruct' = 'true',
@@ -67,21 +102,11 @@ CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.afwijking (
afwijkendeAttribuutWaarde STRING,
dqDatum TIMESTAMP)
USING delta
-COMMENT 'Created by the file upload UI'
+COMMENT 'Deployed by dq-suite-amsterdam'
TBLPROPERTIES (
'delta.checkpoint.writeStatsAsJson' = 'false',
'delta.checkpoint.writeStatsAsStruct' = 'true',
'delta.enableDeletionVectors' = 'true',
'delta.feature.deletionVectors' = 'supported',
'delta.minReaderVersion' = '3',
'delta.minWriterVersion' = '7')

--- COMMAND ----------
-
-CREATE TABLE IF NOT EXISTS ${catalog}.data_quality.brontabel (
-  bronTabelId STRING,
-  uniekeSleutel STRING)
-USING delta
-TBLPROPERTIES (
-  'delta.minReaderVersion' = '1',
-  'delta.minWriterVersion' = '2')
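
As a sanity check after running this notebook, the new dataset-level table can be inspected from a Spark session. A minimal sketch, assuming an active SparkSession on Databricks and the default dpxx_dev widget value (substitute your own catalog):

# Minimal sketch: verify the new dataset-level table after deployment.
# Assumes an active SparkSession and a catalog named "dpxx_dev" (the
# widget default above); adjust the catalog name to your environment.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
catalog = "dpxx_dev"

spark.sql(f"DESCRIBE TABLE {catalog}.data_quality.brondataset").show()
spark.sql(
    f"SELECT bronDatasetId, medaillonLaag FROM {catalog}.data_quality.brondataset"
).show()
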
40 changes: 38 additions & 2 deletions src/dq_suite/common.py
Expand Up @@ -72,19 +72,55 @@ def __getitem__(self, key) -> str | RulesList | None:
raise KeyError(key)


@dataclass()
class DatasetDict:
"""
Groups the name and the medallion layer of the dataset where the
rules apply to.
"""

name: str
layer: str

def __post_init__(self):
if not isinstance(self.name, str):
raise TypeError("'name' should be of type str")

if not isinstance(self.layer, str):
raise TypeError("'layer' should be of type str")

def __getitem__(self, key) -> str | RulesList | None:
if key == "name":
return self.name
elif key == "layer":
return self.layer
raise KeyError(key)


RulesDictList = List[RulesDict] # a list of dictionaries containing DQ rules


@dataclass()
class DataQualityRulesDict:
"""
Groups a list of Table-objects together with the definition of the dataset
these tables are a part of.
"""

dataset: DatasetDict
tables: RulesDictList

def __post_init__(self):
if not isinstance(self.dataset, dict):
raise TypeError("'dataset' should be DatasetDict")

if not isinstance(self.tables, list):
raise TypeError("'tables' should be RulesDictList")

def __getitem__(self, key) -> RulesDictList | None:
if key == "tables":
def __getitem__(self, key) -> str | RulesDictList | None:
if key == "dataset":
return self.dataset
elif key == "tables":
return self.tables
raise KeyError(key)

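To make the new dataset level concrete, here is a minimal usage sketch with hypothetical values (an "afval"/"silver" dataset and no rules), not an example from this repo:

from dq_suite.common import DataQualityRulesDict, DatasetDict

# Hypothetical values, purely for illustration.
dataset = DatasetDict(name="afval", layer="silver")
print(dataset["name"], dataset["layer"])  # key-style access via __getitem__

# As committed, __post_init__ checks isinstance(self.dataset, dict), so a
# plain dict (rather than a DatasetDict instance) is what passes validation.
rules = DataQualityRulesDict(
    dataset={"name": "afval", "layer": "silver"},
    tables=[],  # a RulesDictList; empty here for illustration
)
print(rules["dataset"]["name"])  # -> afval
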
92 changes: 74 additions & 18 deletions src/dq_suite/output_transformations.py
Expand Up @@ -11,8 +11,9 @@
write_to_unity_catalog,
merge_df_with_unity_table,
)
from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from .schemas.brondataset import SCHEMA as BRONDATASET_SCHEMA
from .schemas.brontabel import SCHEMA as BRONTABEL_SCHEMA
from .schemas.bronattribuut import SCHEMA as BRONATTRIBUUT_SCHEMA
from .schemas.regel import SCHEMA as REGEL_SCHEMA
from .schemas.validatie import SCHEMA as VALIDATIE_SCHEMA
from .schemas.pre_validatie import SCHEMA as PRE_VALIDATIE_SCHEMA
@@ -199,6 +200,43 @@ def extract_dq_afwijking_data(
     pass


+def create_brondataset(
+    dq_rules_dict: DataQualityRulesDict,
+    catalog_name: str,
+    spark_session: SparkSession,
+) -> None:
+    """
+    Takes the dataset name and layer from the provided data quality rules
+    and creates a DataFrame containing this metadata.
+    :param dq_rules_dict:
+    :param catalog_name:
+    :param spark_session:
+    """
+    name = dq_rules_dict["dataset"]["name"]
+    layer = dq_rules_dict["dataset"]["layer"]
+    extracted_data = [{"bronDatasetId": name, "medaillonLaag": layer}]
+
+    df_brondataset = list_of_dicts_to_df(
+        list_of_dicts=extracted_data,
+        spark_session=spark_session,
+        schema=BRONDATASET_SCHEMA,
+    )
+    merge_dict = {
+        "bronDatasetId": "brondataset_df.bronDatasetId",
+        "medaillonLaag": "brondataset_df.medaillonLaag",
+    }
+    merge_df_with_unity_table(
+        df=df_brondataset,
+        catalog_name=catalog_name,
+        table_name="brondataset",
+        table_merge_id="bronDatasetId",
+        df_merge_id="bronDatasetId",
+        merge_dict=merge_dict,
+        spark_session=spark_session,
+    )


 def create_brontabel(
     dq_rules_dict: DataQualityRulesDict,
     catalog_name: str,
@@ -213,11 +251,15 @@ def create_brontabel(
     :param spark_session:
     """
     extracted_data = []
+    dataset_name = dq_rules_dict["dataset"]["name"]
     for param in dq_rules_dict["tables"]:
-        name = param["table_name"]
+        table_name = param["table_name"]
+        tabel_id = f"{dataset_name}_{table_name}"
         unique_identifier = param["unique_identifier"]
         extracted_data.append(
-            {"bronTabelId": name, "uniekeSleutel": unique_identifier}
+            {"bronTabelId": tabel_id,
+             "tabelNaam": table_name,
+             "uniekeSleutel": unique_identifier}
         )

     df_brontabel = list_of_dicts_to_df(
@@ -227,6 +269,7 @@
     )
     merge_dict = {
         "bronTabelId": "brontabel_df.bronTabelId",
+        "tabelNaam": "brontabel_df.tabelNaam",
         "uniekeSleutel": "brontabel_df.uniekeSleutel",
     }
     merge_df_with_unity_table(
@@ -254,24 +297,26 @@ def create_bronattribute(
     :param spark_session:
     """
     extracted_data = []
+    dataset_name = dq_rules_dict["dataset"]["name"]
     used_ids = set()  # To keep track of used IDs
     for param in dq_rules_dict["tables"]:
-        bron_tabel = param["table_name"]
+        table_name = param["table_name"]
+        tabel_id = f"{dataset_name}_{table_name}"
         for rule in param["rules"]:
             parameters = rule.get("parameters", [])
             for parameter in parameters:
                 if isinstance(parameter, dict) and "column" in parameter:
                     attribute_name = parameter["column"]
                     # Create a unique ID
-                    unique_id = f"{bron_tabel}_{attribute_name}"
+                    unique_id = f"{tabel_id}_{attribute_name}"
                     # Check if the ID is already used
                     if unique_id not in used_ids:
                         used_ids.add(unique_id)
                         extracted_data.append(
                             {
                                 "bronAttribuutId": unique_id,
                                 "attribuutNaam": attribute_name,
-                                "bronTabelId": bron_tabel,
+                                "bronTabelId": tabel_id,
                             }
                         )

@@ -311,18 +356,23 @@ def create_dq_regel(
     :param spark_session:
     """
     extracted_data = []
-    for param in dq_rules_dict["tables"]:
-        bron_tabel = param["table_name"]
-        for rule in param["rules"]:
+    dataset_name = dq_rules_dict["dataset"]["name"]
+    for table in dq_rules_dict["tables"]:
+        table_name = table["table_name"]
+        tabel_id = f"{dataset_name}_{table_name}"
+        for rule in table["rules"]:
             rule_name = rule["rule_name"]
             parameters = rule.get("parameters", [])
-            extracted_data.append(
-                {
-                    "regelNaam": rule_name,
-                    "regelParameters": parameters,
-                    "bronTabelId": bron_tabel
-                }
-            )
+            for param_set in parameters:
+                column = param_set.get("column")
+                extracted_data.append(
+                    {
+                        "regelNaam": rule_name,
+                        "regelParameters": parameters,
+                        "bronTabelId": tabel_id,
+                        "attribuut": column
+                    }
+                )

     df_regel = list_of_dicts_to_df(
         list_of_dicts=extracted_data,
@@ -331,13 +381,14 @@
     )
     df_regel_with_id_ordered = construct_regel_id(
         df=df_regel,
-        output_columns_list=['regelId','regelNaam','regelParameters','bronTabelId']
+        output_columns_list=['regelId','regelNaam','regelParameters','bronTabelId','attribuut']
     )
     merge_dict = {
         "regelId": "regel_df.regelId",
         "regelNaam": "regel_df.regelNaam",
         "regelParameters": "regel_df.regelParameters",
-        "bronTabelId": "regel_df.bronTabelId"
+        "bronTabelId": "regel_df.bronTabelId",
+        "attribuut": "regel_df.attribuut"
     }
     merge_df_with_unity_table(
         df=df_regel_with_id_ordered,
@@ -354,6 +405,11 @@ def write_non_validation_tables(
     dq_rules_dict: DataQualityRulesDict,
     validation_settings_obj: ValidationSettings,
 ) -> None:
+    create_brondataset(
+        dq_rules_dict=dq_rules_dict,
+        catalog_name=validation_settings_obj.catalog_name,
+        spark_session=validation_settings_obj.spark_session,
+    )
     create_brontabel(
         dq_rules_dict=dq_rules_dict,
         catalog_name=validation_settings_obj.catalog_name,
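The thread running through these transformations is the new composite ID scheme: the dataset name now prefixes bronTabelId, which in turn prefixes bronAttribuutId, so identical table names in different datasets no longer collide. A small sketch with hypothetical names, mirroring the f-strings above:

# Hypothetical names, purely for illustration.
dataset_name = "afval"
table_name = "containers"
attribute_name = "containerId"

tabel_id = f"{dataset_name}_{table_name}"   # bronTabelId: "afval_containers"
unique_id = f"{tabel_id}_{attribute_name}"  # bronAttribuutId: "afval_containers_containerId"
print(tabel_id, unique_id)
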
5 changes: 5 additions & 0 deletions src/dq_suite/schemas/brondataset.py
@@ -0,0 +1,5 @@
+from pyspark.sql.types import StructType
+
+SCHEMA = (
+    StructType().add("bronDatasetId", "string").add("medaillonLaag", "string")
+)
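
A quick sketch of the new schema in use, assuming an active SparkSession and that the package layout makes dq_suite.schemas.brondataset importable; the row values are illustrative:

from pyspark.sql import SparkSession
from dq_suite.schemas.brondataset import SCHEMA

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [{"bronDatasetId": "afval", "medaillonLaag": "silver"}],  # hypothetical row
    schema=SCHEMA,
)
df.show()
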
7 changes: 5 additions & 2 deletions src/dq_suite/schemas/brontabel.py
@@ -1,5 +1,8 @@
 from pyspark.sql.types import StructType
 
 SCHEMA = (
-    StructType().add("bronTabelId", "string").add("uniekeSleutel", "string")
-)
+    StructType()
+    .add("bronTabelId", "string")
+    .add("tabelNaam", "string")
+    .add("uniekeSleutel", "string")
+)
1 change: 1 addition & 0 deletions src/dq_suite/schemas/regel.py
@@ -5,4 +5,5 @@
.add("regelNaam", "string")
.add("regelParameters", "string")
.add("bronTabelId", "string")
.add("attribuut", "string")
)
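
With the new attribuut column, create_dq_regel fans a rule out into one regel row per column parameter. A sketch of that fan-out with a hypothetical rule definition:

# Hypothetical rule: two column parameters yield two regel rows, each with
# its own 'attribuut' value (mirrors the loop in create_dq_regel above).
rule = {
    "rule_name": "ExpectColumnValuesToNotBeNull",
    "parameters": [{"column": "containerId"}, {"column": "weight"}],
}
rows = [
    {
        "regelNaam": rule["rule_name"],
        "regelParameters": rule["parameters"],
        "bronTabelId": "afval_containers",
        "attribuut": parameter.get("column"),
    }
    for parameter in rule["parameters"]
]
print([row["attribuut"] for row in rows])  # ['containerId', 'weight']
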