Skip to content

Commit

Permalink
Merge pull request #9 from aysegulcayir/models
Browse files Browse the repository at this point in the history
Models
  • Loading branch information
ArthurKordes authored Jan 12, 2024
2 parents 72e6b69 + d9658d6 commit 7e02914
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 110 deletions.
25 changes: 24 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,27 @@ dq_rules_example.json is updated.
Added:
"dataframe_parameters": {
"unique_identifier": "id"
}
}

version = "0.2.0" :
dq_rules_example.json is updated.
Added for each table:
{
"dataframe_parameters": [
{
"unique_identifier": "id",
"table_name": "well",
"rules": [
{
"rule_name": "expect_column_values_to_be_between",
"parameters": [
{
"column": "latitude",
"min_value": 6,
"max_value": 10000
}
]
}
]
},
....
92 changes: 63 additions & 29 deletions dq_rules_example.json
Original file line number Diff line number Diff line change
@@ -1,47 +1,81 @@
{
    "dataframe_parameters": [
        {
            "unique_identifier": "id",
            "table_name": "well",
            "rules": [
                {
                    "rule_name": "expect_column_values_to_be_between",
                    "parameters": [
                        {
                            "column": "latitude",
                            "min_value": 6,
                            "max_value": 10000
                        }
                    ]
                }
            ]
        },
        {
            "unique_identifier": "id",
            "table_name": "container",
            "rules": [
                {
                    "rule_name": "expect_column_values_to_not_be_null",
                    "parameters": [
                        {
                            "column": "containertype"
                        }
                    ]
                }
            ]
        },
        {
            "unique_identifier": "id",
            "table_name": "containertype",
            "rules": [
                {
                    "rule_name": "expect_table_row_count_to_be_between",
                    "parameters": [
                        {
                            "min_value": 1,
                            "max_value": 1000
                        }
                    ]
                },
                {
                    "rule_name": "expect_column_values_to_not_be_null",
                    "parameters": [
                        {
                            "column": "weight"
                        },
                        {
                            "column": "volume"
                        }
                    ]
                },
                {
                    "rule_name": "expect_column_values_to_be_between",
                    "parameters": [
                        {
                            "column": "volume",
                            "min_value": 0,
                            "max_value": 10000
                        }
                    ]
                },
                {
                    "rule_name": "expect_column_values_to_be_of_type",
                    "parameters": [
                        {
                            "column": "volume",
                            "type_": "DoubleType"
                        }
                    ]
                }
            ]
        }
    ]
}

3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
great_expectations
pandas
pyspark
typing
155 changes: 79 additions & 76 deletions src/dq_suite/df_checker.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,97 @@
import json
from jsonschema import validate as validate_json

from databricks.sdk.runtime import *

from pyspark.sql import DataFrame
from typing import Dict, Any, Tuple
import pandas as pd

import great_expectations as gx
from great_expectations.checkpoint import Checkpoint

from dq_suite.input_validator import validate_dqrules
from dq_suite.output_transformations import extract_dq_validatie_data
from dq_suite.output_transformations import extract_dq_afwijking_data

from dq_suite.output_transformations import extract_dq_validatie_data, extract_dq_afwijking_data, create_brontabel, create_bronattribute, create_dqRegel

def df_check(dfs: list, dq_rules: str, check_name: str) -> Tuple[Dict[str, Tuple[pd.DataFrame, pd.DataFrame]], pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Validate a list of Spark DataFrames against the Data Quality rules
    defined in a JSON document, using Great Expectations.

    :param dfs: DataFrame instances to process; each must expose a
        ``table_name`` attribute, which is used to match the DataFrame to
        its rule set in ``dq_rules``.
    :type dfs: list[DataFrame]
    :param dq_rules: JSON string containing the Data Quality rules, shaped
        as ``{"dataframe_parameters": [{"unique_identifier": ...,
        "table_name": ..., "rules": [...]}, ...]}``.
    :type dq_rules: str
    :param check_name: Name of the run; used to label the GX datasource,
        expectation suite and checkpoint.
    :type check_name: str
    :return: A dict mapping each validated table name to its
        ``(result_dqValidatie, result_dqAfwijking)`` pair, followed by the
        rule-derived metadata DataFrames ``brontabel_df``,
        ``bronattribute_df`` and ``dqRegel_df``.
    :rtype: Tuple[Dict[str, Tuple[pd.DataFrame, pd.DataFrame]],
        pd.DataFrame, pd.DataFrame, pd.DataFrame]
    """
    results: Dict[str, Tuple[pd.DataFrame, pd.DataFrame]] = {}
    name = check_name

    # Fail early on malformed rule input, then parse it once.
    validate_dqrules(dq_rules)
    rule_json = json.loads(dq_rules)

    # The metadata tables are derived from the rules alone, so they are
    # built once, independently of the DataFrames being checked.
    brontabel_df = create_brontabel(rule_json)
    bronattribute_df = create_bronattribute(rule_json)
    dqRegel_df = create_dqRegel(rule_json)

    for df in dfs:
        # Configure a Great Expectations context per DataFrame.
        # NOTE(review): the datasource, asset, suite and checkpoint names are
        # all derived from check_name only, so each iteration overwrites the
        # previous DataFrame's GX artifacts — confirm this is intended.
        context = gx.get_context(context_root_dir="/dbfs/great_expectations/")
        dataframe_datasource = context.sources.add_or_update_spark(
            name="my_spark_in_memory_datasource_" + name,
        )

        df_asset = dataframe_datasource.add_dataframe_asset(name=name, dataframe=df)
        batch_request = df_asset.build_batch_request()

        expectation_suite_name = name + "_exp_suite"
        context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
        validator = context.get_validator(
            batch_request=batch_request,
            expectation_suite_name=expectation_suite_name,
        )

        # Compare table_name in dq_rules with the table names supplied by
        # the data teams: only rules for this DataFrame's table apply.
        matching_rules = [
            rule
            for rule in rule_json["dataframe_parameters"]
            if rule["table_name"] == df.table_name
        ]
        if not matching_rules:
            # No rules defined for this table: nothing to validate.
            continue

        for rule in matching_rules:
            df_name = rule["table_name"]
            unique_identifier = rule["unique_identifier"]
            # Translate each DQ rule into the corresponding GX expectation
            # call on the validator; parameter dicts map 1:1 onto kwargs.
            for rule_param in rule["rules"]:
                check = getattr(validator, rule_param["rule_name"])
                for param_set in rule_param["parameters"]:
                    check(**dict(param_set))

        # Keep failed expectations in the suite so they are reported too.
        validator.save_expectation_suite(discard_failed_expectations=False)

        # Run the suite through a checkpoint and persist the results.
        checkpoint = Checkpoint(
            name=name + "_checkpoint",
            run_name_template="%Y%m%d-%H%M%S-" + name + "-template",
            data_context=context,
            batch_request=batch_request,
            expectation_suite_name=expectation_suite_name,
            action_list=[
                {
                    "name": "store_validation_result",
                    "action": {"class_name": "StoreValidationResultAction"},
                },
            ],
        )
        context.add_or_update_checkpoint(checkpoint=checkpoint)
        checkpoint_result = checkpoint.run()

        # Parse the GX output into the two per-table result DataFrames.
        output = checkpoint_result["run_results"]
        print(f"{df_name} output: ", output)
        for value in output.values():
            result = value["validation_result"]
            result_dqValidatie = extract_dq_validatie_data(name, result)
            result_dqAfwijking = extract_dq_afwijking_data(name, result, df, unique_identifier)
            results[df_name] = (result_dqValidatie, result_dqAfwijking)

    return results, brontabel_df, bronattribute_df, dqRegel_df
Loading

0 comments on commit 7e02914

Please sign in to comment.