diff --git a/pyproject.toml b/pyproject.toml
index 79695c1..25ef46d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dq-suite-amsterdam"
-version = "0.0.9"
+version = "0.0.10"
 authors = [
   { name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
 ]
diff --git a/requirements.txt b/requirements.txt
index c88eead..5fc23cf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,2 @@
-great_expectations
\ No newline at end of file
+great_expectations
+pandas
\ No newline at end of file
diff --git a/src/dq_suite/__init__.py b/src/dq_suite/__init__.py
index 9571006..381c95f 100644
--- a/src/dq_suite/__init__.py
+++ b/src/dq_suite/__init__.py
@@ -2,6 +2,10 @@
 
 from dq_suite.input_validator import validate_dqrules
 from dq_suite.df_checker import df_check
+from dq_suite.output_transformations import (
+    extract_dq_validatie_data,
+    extract_dq_afwijking_data
+)
 
 # Use __all__ to let developers know what is part of the public API.
 __all__ = [
diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py
index 228c199..1065607 100644
--- a/src/dq_suite/df_checker.py
+++ b/src/dq_suite/df_checker.py
@@ -9,6 +9,8 @@ from great_expectations.checkpoint import Checkpoint
 
 
 from dq_suite.input_validator import validate_dqrules
+from dq_suite.output_transformations import extract_dq_validatie_data
+from dq_suite.output_transformations import extract_dq_afwijking_data
 
 
 def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
@@ -21,8 +23,8 @@ def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
     :type dq_rules: str
     :param check_name: Name of the run for reference purposes
     :type check_name: str
-    :return: A JSON string with the DQ results, parsed from the GX output
-    :rtype: str.
+    :return: Two tables df result_dqValidatie - result_dqAfwijking with the DQ results, parsed from the GX output
+    :rtype: df.
     """
     name = check_name
     validate_dqrules(dq_rules)
@@ -83,7 +85,10 @@ def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
 
     # Parse output
     output = checkpoint_result["run_results"]
+
     for key in output.keys():
         result = output[key]["validation_result"]
+        result_dqValidatie = extract_dq_validatie_data(name,result)
+        result_dqAfwijking = extract_dq_afwijking_data(name,result)
 
-    return result
+    return result_dqValidatie, result_dqAfwijking
diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py
new file mode 100644
--- /dev/null
+++ b/src/dq_suite/output_transformations.py
@@ -0,0 +1,75 @@
+import pandas as pd
+
+
+def extract_dq_validatie_data(check_name, dq_result):
+    """Build the dqValidatie table from a single GX validation result.
+
+    One row is produced for every entry in ``dq_result["results"]``.
+
+    :param check_name: Name of the run, used as prefix of ``dqRegelId``
+    :type check_name: str
+    :param dq_result: GX validation result providing ``meta.run_id.run_time``
+        and a ``results`` list
+    :type dq_result: mapping-like GX validation result
+    :return: Table with the columns dqRegelId, aantalValideRecords,
+        aantalReferentieRecords and dqDatum
+    :rtype: pandas.DataFrame
+    """
+    columns = [
+        "dqRegelId",
+        "aantalValideRecords",
+        "aantalReferentieRecords",
+        "dqDatum",
+    ]
+    run_time = dq_result["meta"]["run_id"].run_time
+    extracted_data = []
+    for result in dq_result["results"]:
+        # Counts that are absent from the result default to 0.
+        element_count = int(result["result"].get("element_count", 0))
+        unexpected_count = int(result["result"].get("unexpected_count", 0))
+        expectation_type = result["expectation_config"]["expectation_type"]
+        attribute = result["expectation_config"]["kwargs"].get("column")
+        extracted_data.append({
+            "dqRegelId": f"{check_name}_{expectation_type}_{attribute}",
+            "aantalValideRecords": element_count - unexpected_count,
+            "aantalReferentieRecords": element_count,
+            "dqDatum": run_time,
+        })
+    # Explicit columns keep the schema stable when there are no results.
+    return pd.DataFrame(extracted_data, columns=columns)
+
+
+def extract_dq_afwijking_data(check_name, dq_result):
+    """Build the dqAfwijking table from a single GX validation result.
+
+    One row is produced for every value in the ``partial_unexpected_list``
+    of every entry in ``dq_result["results"]``; entries without such a list
+    contribute no rows.
+
+    :param check_name: Name of the run, used as prefix of ``dqRegelId``
+    :type check_name: str
+    :param dq_result: GX validation result providing ``meta.run_id.run_time``
+        and a ``results`` list
+    :type dq_result: mapping-like GX validation result
+    :return: Table with the columns dqRegelId, afwijkendeAttribuutWaarde
+        and dqDatum
+    :rtype: pandas.DataFrame
+    """
+    columns = ["dqRegelId", "afwijkendeAttribuutWaarde", "dqDatum"]
+    run_time = dq_result["meta"]["run_id"].run_time
+    extracted_data = []
+    for result in dq_result["results"]:
+        expectation_type = result["expectation_config"]["expectation_type"]
+        attribute = result["expectation_config"]["kwargs"].get("column")
+        dq_regel_id = f"{check_name}_{expectation_type}_{attribute}"
+        unexpected_values = result.get("result", {}).get(
+            "partial_unexpected_list", []
+        )
+        for value in unexpected_values:
+            extracted_data.append({
+                "dqRegelId": dq_regel_id,
+                "afwijkendeAttribuutWaarde": value,
+                "dqDatum": run_time,
+            })
+    # Explicit columns keep the schema stable when nothing deviates.
+    return pd.DataFrame(extracted_data, columns=columns)