From 5a2e82005325bdebe94cc16d9e6b48e15eb2147c Mon Sep 17 00:00:00 2001
From: aysegulcayir <49029525+aysegulcayir@users.noreply.github.com>
Date: Wed, 10 Jan 2024 15:45:34 +0100
Subject: [PATCH 1/2] Added brontabel, bronattribute and dqRegel, and changed
 the dq_rules format

---
 README.md                              |  25 +++-
 dq_rules_example.json                  |  92 ++++++++++-----
 requirements.txt                       |   3 +-
 src/dq_suite/df_checker.py             | 157 +++++++++++++------------
 src/dq_suite/output_transformations.py |  95 ++++++++++++++-
 5 files changed, 257 insertions(+), 115 deletions(-)

diff --git a/README.md b/README.md
index e556e3e..954d159 100644
--- a/README.md
+++ b/README.md
@@ -31,4 +31,27 @@ dq_rules_example.json is updated. Added:
 "dataframe_parameters": {
     "unique_identifier": "id"
-    }
\ No newline at end of file
+    }
+
+version = "0.2.0":
+dq_rules_example.json is updated.
+Added for each table:
+{
+    "dataframe_parameters": [
+        {
+            "unique_identifier": "id",
+            "table_name": "well",
+            "rules": [
+                {
+                    "rule_name": "expect_column_values_to_be_between",
+                    "parameters": [
+                        {
+                            "column": "latitude",
+                            "min_value": 6,
+                            "max_value": 10000
+                        }
+                    ]
+                }
+            ]
+        },
+    ....
\ No newline at end of file
diff --git a/dq_rules_example.json b/dq_rules_example.json
index e3ec738..9e31105 100644
--- a/dq_rules_example.json
+++ b/dq_rules_example.json
@@ -1,47 +1,81 @@
 {
-    "dataframe_parameters": {
-        "unique_identifier": "id"
-    },
-    "rules": [
+    "dataframe_parameters": [
         {
-            "rule_name": "expect_table_row_count_to_be_between",
-            "parameters": [
+            "unique_identifier": "id",
+            "table_name": "well",
+            "rules": [
                 {
-                    "min_value": 1,
-                    "max_value": 1000
+                    "rule_name": "expect_column_values_to_be_between",
+                    "parameters": [
+                        {
+                            "column": "latitude",
+                            "min_value": 6,
+                            "max_value": 10000
+                        }
+                    ]
                 }
             ]
         },
         {
-            "rule_name": "expect_column_values_to_not_be_null",
-            "parameters": [
+            "unique_identifier": "id",
+            "table_name": "container",
+            "rules": [
                 {
-                    "column": "weight"
-                },
-                {
-                    "column": "volume"
+                    "rule_name": "expect_column_values_to_not_be_null",
+                    "parameters": [
+                        {
+                            "column": "containertype"
+                        }
+                    ]
                 }
             ]
         },
         {
-            "rule_name": "expect_column_values_to_be_between",
-            "parameters": [
+            "unique_identifier": "id",
+            "table_name": "containertype",
+            "rules": [
                 {
-                    "column": "volume",
-                    "min_value": 0,
-                    "max_value": 10000
-                }
-            ]
-        },
-        {
-            "rule_name": "expect_column_values_to_be_of_type",
-            "parameters": [
+                    "rule_name": "expect_table_row_count_to_be_between",
+                    "parameters": [
+                        {
+                            "min_value": 1,
+                            "max_value": 1000
+                        }
+                    ]
+                },
+                {
+                    "rule_name": "expect_column_values_to_not_be_null",
+                    "parameters": [
+                        {
+                            "column": "weight"
+                        },
+                        {
+                            "column": "volume"
+                        }
+                    ]
+                },
+                {
+                    "rule_name": "expect_column_values_to_be_between",
+                    "parameters": [
+                        {
+                            "column": "volume",
+                            "min_value": 0,
+                            "max_value": 10000
+                        }
+                    ]
+                },
                 {
-                    "column": "volume",
-                    "type_": "DoubleType"
+                    "rule_name": "expect_column_values_to_be_of_type",
+                    "parameters": [
+                        {
+                            "column": "volume",
+                            "type_": "DoubleType"
+                        }
+                    ]
                 }
             ]
         }
+    ]
-}
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 6c07e12..665ed47 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 great_expectations
 pandas
-pyspark
\ No newline at end of file
+pyspark
+jsonschema
\ No newline at end of file
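A minimal usage sketch of the new 0.2.0 per-table format (a sketch, not part of the patch: well_df and container_df are hypothetical Spark DataFrames, and df_check matches rules on a table_name attribute that the caller is expected to set on each DataFrame):

    import json
    from dq_suite.df_checker import df_check

    with open("dq_rules_example.json") as f:
        dq_rules = f.read()  # df_check expects the rules as a JSON string

    # Hypothetical DataFrames supplied by a data team; df_check reads
    # df.table_name to find the matching "dataframe_parameters" entry.
    well_df.table_name = "well"
    container_df.table_name = "container"

    results, brontabel_df, bronattribute_df, dqRegel_df = df_check(
        dfs=[well_df, container_df], dq_rules=dq_rules, check_name="demo_run"
    )
    result_dqValidatie, result_dqAfwijking = results["well"]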
diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py
index 9643b45..5e2999b 100644
--- a/src/dq_suite/df_checker.py
+++ b/src/dq_suite/df_checker.py
@@ -1,96 +1,97 @@
 import json
 from jsonschema import validate as validate_json
-from databricks.sdk.runtime import *
-
-from pyspark.sql import DataFrame
+from typing import Dict, Any, Tuple
+import pandas as pd
 import great_expectations as gx
 from great_expectations.checkpoint import Checkpoint
 
 from dq_suite.input_validator import validate_dqrules
-from dq_suite.output_transformations import extract_dq_validatie_data
-from dq_suite.output_transformations import extract_dq_afwijking_data
-
+from dq_suite.output_transformations import (
+    extract_dq_validatie_data,
+    extract_dq_afwijking_data,
+    create_brontabel,
+    create_bronattribute,
+    create_dqRegel,
+)
 
-def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
+def df_check(dfs: list, dq_rules: str, check_name: str) -> Tuple[Dict[str, Tuple[Any, Any]], pd.DataFrame, pd.DataFrame, pd.DataFrame]:
     """
-    Function takes a DataFrame instance and returns a JSON string with the DQ results in a different dataframe, result_dqValidatie
-    "result_dqAfwijking.
+    Function takes a list of DataFrame instances with the specified Data Quality
+    rules, validates each DataFrame against the rules that match its table name,
+    and returns the DQ results per DataFrame together with metadata DataFrames
 
-    :param df: A DataFrame instance to process
-    :type df: DataFrame
-    :param result_dqValidatie: A df containing the valid result
-    :param result_dqAfwijking: A df containing the deviated results
-    :param dq_rules: A JSON string containing the Data Quality rules to be evaluated
+    :param dfs: A list of DataFrame instances to process.
+    :type dfs: list[DataFrame]
+    :param dq_rules: JSON string containing the Data Quality rules to be evaluated.
     :type dq_rules: str
-    :param check_name: Name of the run for reference purposes
+    :param check_name: Name of the run for reference purposes.
     :type check_name: str
-    :return: Two tables df result_dqValidatie - result_dqAfwijking with the DQ results, parsed from the GX output
-    :rtype: df.
+    :return: A dict mapping each table name to its
+        (result_dqValidatie, result_dqAfwijking) result DataFrames, along with
+        the metadata DataFrames brontabel_df, bronattribute_df and dqRegel_df.
+    :rtype: Tuple[Dict[str, Tuple[DataFrame, DataFrame]], pd.DataFrame, pd.DataFrame, pd.DataFrame]
     """
+    results = {}
     name = check_name
     validate_dqrules(dq_rules)
     rule_json = json.loads(dq_rules)
-    unique_identifier = rule_json["dataframe_parameters"]["unique_identifier"]
-
-    # Configure the Great Expectations context
-    context_root_dir = "/dbfs/great_expectations/"
-    context = gx.get_context(context_root_dir=context_root_dir)
-
-    dataframe_datasource = context.sources.add_or_update_spark(
-        name="my_spark_in_memory_datasource_" + name,
-    )
-
-    # GX Structures
-    df_asset = dataframe_datasource.add_dataframe_asset(name=name, dataframe=df)
-    batch_request = df_asset.build_batch_request()
-
-    expectation_suite_name = name + "_exp_suite"
-    context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
-
-    validator = context.get_validator(
-        batch_request=batch_request,
-        expectation_suite_name=expectation_suite_name,
-    )
-
-    # DQ rules
-    # This section converts the DQ_rules input into expectations that Great Expectations understands
-    for rule in rule_json["rules"]:
-        check = getattr(validator, rule["rule_name"])
-        for param_set in rule["parameters"]:
-            kwargs = {}
-            for param in param_set.keys():
-                kwargs[param] = param_set[param]
-            check(**kwargs)
-
-    validator.save_expectation_suite(discard_failed_expectations=False)
-
-    # Save output
-    my_checkpoint_name = name + "_checkpoint"
-
-    checkpoint = Checkpoint(
-        name=my_checkpoint_name,
-        run_name_template="%Y%m%d-%H%M%S-" + name + "-template",
-        data_context=context,
-        batch_request=batch_request,
-        expectation_suite_name=expectation_suite_name,
-        action_list=[
-            {
-                "name": "store_validation_result",
-                "action": {"class_name": "StoreValidationResultAction"},
-            },
-        ],
-    )
-
-    context.add_or_update_checkpoint(checkpoint=checkpoint)
-    checkpoint_result = checkpoint.run()
-
-    # Parse output
-    output = checkpoint_result["run_results"]
-
-    for key in output.keys():
-        result = output[key]["validation_result"]
-        result_dqValidatie = extract_dq_validatie_data(name,result)
-        result_dqAfwijking = extract_dq_afwijking_data(name, result, df, unique_identifier)
+    brontabel_df = create_brontabel(rule_json)
+    bronattribute_df = create_bronattribute(rule_json)
+    dqRegel_df = create_dqRegel(rule_json)
+    for df in dfs:
+        # Configure a Great Expectations context per DataFrame
+        context_root_dir = "/dbfs/great_expectations/"
+        context = gx.get_context(context_root_dir=context_root_dir)
+
+        dataframe_datasource = context.sources.add_or_update_spark(name="my_spark_in_memory_datasource_" + name)
+
+        df_asset = dataframe_datasource.add_dataframe_asset(name=name, dataframe=df)
+        batch_request = df_asset.build_batch_request()
+
+        expectation_suite_name = name + "_exp_suite"
+        context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
+
+        validator = context.get_validator(batch_request=batch_request, expectation_suite_name=expectation_suite_name)
+
+        # Match this DataFrame to its rules via the table_name attribute,
+        # which the calling data team is expected to set on each DataFrame
+        matching_rules = [rule for rule in rule_json["dataframe_parameters"] if rule["table_name"] == df.table_name]
+
+        if not matching_rules:
+            continue
+
+        for rule in matching_rules:
+            df_name = rule["table_name"]
+            unique_identifier = rule["unique_identifier"]
+            for rule_param in rule["rules"]:
+                check = getattr(validator, rule_param["rule_name"])
+                for param_set in rule_param["parameters"]:
+                    # each parameter set maps directly onto the expectation's keyword arguments
+                    check(**param_set)
+
+        validator.save_expectation_suite(discard_failed_expectations=False)
+
+        my_checkpoint_name = name + "_checkpoint"
+
+        checkpoint = Checkpoint(
+            name=my_checkpoint_name,
+            run_name_template="%Y%m%d-%H%M%S-" + name + "-template",
+            data_context=context,
+            batch_request=batch_request,
+            expectation_suite_name=expectation_suite_name,
+            action_list=[
+                {
+                    "name": "store_validation_result",
+                    "action": {"class_name": "StoreValidationResultAction"},
+                },
+            ],
+        )
+
+        context.add_or_update_checkpoint(checkpoint=checkpoint)
+        checkpoint_result = checkpoint.run()
+        output = checkpoint_result["run_results"]
+        for key, value in output.items():
+            result = value["validation_result"]
+            result_dqValidatie = extract_dq_validatie_data(name, result)
+            result_dqAfwijking = extract_dq_afwijking_data(name, result, df, unique_identifier)
+            results[df_name] = (result_dqValidatie, result_dqAfwijking)
 
-    return result_dqValidatie, result_dqAfwijking
+    return results, brontabel_df, bronattribute_df, dqRegel_df
\ No newline at end of file
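Great Expectations exposes every expectation as a method on the validator, which is what the generic getattr loop in df_check relies on: each entry in a table's "rules" list expands into one validator call per parameter set. A sketch of that expansion for the well rule from dq_rules_example.json, using the validator as built above:

    rule_param = {
        "rule_name": "expect_column_values_to_be_between",
        "parameters": [{"column": "latitude", "min_value": 6, "max_value": 10000}],
    }
    check = getattr(validator, rule_param["rule_name"])
    for param_set in rule_param["parameters"]:
        check(**param_set)
    # Equivalent to:
    # validator.expect_column_values_to_be_between(
    #     column="latitude", min_value=6, max_value=10000)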
diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py
index f5bded0..464a410 100644
--- a/src/dq_suite/output_transformations.py
+++ b/src/dq_suite/output_transformations.py
@@ -4,7 +4,7 @@
 # for df_dqValidatie
 def extract_dq_validatie_data(check_name, dq_result):
     """
-    Function takes a json with the GX output and a string check_name and returns dataframe.
+    Function takes a json with the GX output and a string check_name and returns a DataFrame.
 
     :param df_dq_validatie: A df containing the valid result
     :type df: DataFrame
@@ -36,11 +36,11 @@ def extract_dq_validatie_data(check_name, dq_result):
             "dqDatum": run_time,
             "output": output_text,
         })
-    # Create a DataFrame
     df_dq_validatie = pd.DataFrame(extracted_data)
     return df_dq_validatie
 
+
 def extract_dq_afwijking_data(check_name, dq_result, df, unique_identifier):
     """
     Function takes a json dq_rules and a string check_name and returns a DataFrame.
@@ -53,13 +53,11 @@ def extract_dq_afwijking_data(check_name, dq_result, df, unique_identifier):
     :type unique_identifier: int
     :rtype: DataFrame
     """
-    # Extracting information from the JSON
     run_time = dq_result["meta"]["run_id"].run_time  # Access run_time attribute
 
     # Extracted data for df
     extracted_data = []
-    #for IdentifierVeldWaarde
-    unique_identifier = unique_identifier
+
     # To store unique combinations of value and IDs
     unique_entries = set()
@@ -89,4 +87,89 @@ def extract_dq_afwijking_data(check_name, dq_result, df, unique_identifier):
     # Create a DataFrame
     df_dq_afwijking = pd.DataFrame(extracted_data)
-    return df_dq_afwijking
\ No newline at end of file
+    return df_dq_afwijking
+
+def create_brontabel(dq_rules):
+    """
+    Function takes the table name and unique identifier of every table in the
+    provided Data Quality rules and creates a DataFrame containing this metadata.
+
+    :param dq_rules: the parsed Data Quality rules
+    :type dq_rules: dict
+    :return: df_brontable
+    :rtype: DataFrame
+    """
+    extracted_data = []
+    for param in dq_rules["dataframe_parameters"]:
+        name = param["table_name"]
+        unique_identifier = param["unique_identifier"]
+        extracted_data.append({
+            "name": name,
+            "unique_identifier": unique_identifier
+        })
+
+    df_brontable = pd.DataFrame(extracted_data)
+    return df_brontable
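For a sense of the metadata create_brontabel derives (the two helpers that follow use the same traversal of the rules dict), a sketch against the parsed dq_rules_example.json; the printed frame is illustrative rather than captured output:

    import json
    from dq_suite.output_transformations import create_brontabel

    with open("dq_rules_example.json") as f:
        rules = json.load(f)

    print(create_brontabel(rules))
    # roughly:
    #             name unique_identifier
    # 0           well                id
    # 1      container                id
    # 2  containertype                id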
+def create_bronattribute(dq_rules):
+    """
+    Function takes the attributes/columns of each table specified in the Data
+    Quality rules and creates a DataFrame containing these attribute details.
+
+    :param dq_rules: the parsed Data Quality rules
+    :type dq_rules: dict
+    :return: df_bronattribuut
+    :rtype: DataFrame
+    """
+    extracted_data = []
+    for param in dq_rules["dataframe_parameters"]:
+        BronTabel = param["table_name"]
+        for rule in param["rules"]:
+            parameters = rule.get("parameters", [])
+            for parameter in parameters:
+                if isinstance(parameter, dict) and "column" in parameter:
+                    attribute_name = parameter["column"]
+                    extracted_data.append({
+                        "name": attribute_name,
+                        "BronTabel": BronTabel,
+                        "id": f"{BronTabel}_{attribute_name}"
+                    })
+
+    df_bronattribuut = pd.DataFrame(extracted_data)
+    return df_bronattribuut
+
+def create_dqRegel(dq_rules):
+    """
+    Function extracts the Data Quality rules applied to each attribute/column of
+    the tables specified in the Data Quality rules and creates a DataFrame
+    containing these rule details.
+
+    :param dq_rules: the parsed Data Quality rules
+    :type dq_rules: dict
+    :return: df_dqRegel
+    :rtype: DataFrame
+    """
+    extracted_data = []
+    for param in dq_rules["dataframe_parameters"]:
+        BronTabel = param["table_name"]
+        for rule in param["rules"]:
+            rule_name = rule["rule_name"]
+            parameters = rule.get("parameters", [])
+            for parameter in parameters:
+                if isinstance(parameter, dict) and "column" in parameter:
+                    attribute_name = parameter["column"]
+                    extracted_data.append({
+                        "id": f"{BronTabel}_{rule_name}_{attribute_name}",
+                        "bronAttributeId": f"{BronTabel}_{attribute_name}"
+                    })
+
+    df_dqRegel = pd.DataFrame(extracted_data)
+    return df_dqRegel
+

From cd55873363fdf8afc3553c0d02628fec8b06113c Mon Sep 17 00:00:00 2001
From: aysegulcayir <49029525+aysegulcayir@users.noreply.github.com>
Date: Wed, 10 Jan 2024 15:47:59 +0100
Subject: [PATCH 2/2] Added dqRegel, brontabel and bronattribute, and changed
 the dq_rules format

---
 src/dq_suite/df_checker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py
index 5e2999b..1131fce 100644
--- a/src/dq_suite/df_checker.py
+++ b/src/dq_suite/df_checker.py
@@ -12,7 +12,7 @@
 def df_check(dfs: list, dq_rules: str, check_name: str) -> Tuple[Dict[str, Tuple[Any, Any]], pd.DataFrame, pd.DataFrame, pd.DataFrame]:
     """
     Function takes a list of DataFrame instances with the specified Data Quality
     rules, validates each DataFrame against the rules that match its table name,
-    and returns the DQ results per DataFrame together with metadata DataFrames
+    and returns the DQ results per DataFrame together with the metadata DataFrames.