Skip to content

Commit

Permalink
Added output transformations to return dataframes
Browse files Browse the repository at this point in the history
  • Loading branch information
ArthurKordes committed Nov 21, 2023
1 parent f0b9fe2 commit 67fd861
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.0.9"
version = "0.0.10"
authors = [
{ name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
]
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
great_expectations
great_expectations
pandas
4 changes: 4 additions & 0 deletions src/dq_suite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from dq_suite.input_validator import validate_dqrules
from dq_suite.df_checker import df_check
from dq_suite.output_transformations import (
extract_dq_validatie_data,
extract_dq_afwijking_data
)

# Use __all__ to let developers know what is part of the public API.
__all__ = [
Expand Down
11 changes: 8 additions & 3 deletions src/dq_suite/df_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from great_expectations.checkpoint import Checkpoint

from dq_suite.input_validator import validate_dqrules
from dq_suite.output_transformations import extract_dq_validatie_data
from dq_suite.output_transformations import extract_dq_afwijking_data


def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
Expand All @@ -21,8 +23,8 @@ def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:
:type dq_rules: str
:param check_name: Name of the run for reference purposes
:type check_name: str
:return: A JSON string with the DQ results, parsed from the GX output
:rtype: str.
:return: Two tables df result_dqValidatie - result_dqAfwijking with the DQ results, parsed from the GX output
:rtype: df.
"""
name = check_name
validate_dqrules(dq_rules)
Expand Down Expand Up @@ -83,7 +85,10 @@ def df_check(df: DataFrame, dq_rules: str, check_name: str) -> str:

# Parse output
output = checkpoint_result["run_results"]

for key in output.keys():
result = output[key]["validation_result"]
result_dqValidatie = extract_dq_validatie_data(name,result)
result_dqAfwijking = extract_dq_afwijking_data(name,result)

return result
return result_dqValidatie, result_dqAfwijking
70 changes: 70 additions & 0 deletions src/dq_suite/output_transformations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pandas as pd


def extract_dq_validatie_data(check_name, dq_result):
    """Build the per-rule validation summary table from a GX validation result.

    One row is produced for every entry in ``dq_result["results"]``.

    :param check_name: Name of the run; used as the prefix of each ``dqRegelId``
    :type check_name: str
    :param dq_result: Validation result that supports the lookups
        ``["meta"]["run_id"].run_time`` and ``["results"]``
    :return: A DataFrame with the columns ``dqRegelId``,
        ``aantalValideRecords``, ``aantalReferentieRecords`` and ``dqDatum``.
        The columns are present even when there are no results.
    :rtype: pandas.DataFrame
    """
    # Fixed schema so that an empty result set still yields the expected
    # columns instead of a column-less DataFrame.
    columns = [
        "dqRegelId",
        "aantalValideRecords",
        "aantalReferentieRecords",
        "dqDatum",
    ]
    run_time = dq_result["meta"]["run_id"].run_time

    rows = []
    for result in dq_result["results"]:
        outcome = result["result"]
        # A missing or null count is treated as zero rather than crashing
        # on int(None).
        element_count = int(outcome.get("element_count") or 0)
        unexpected_count = int(outcome.get("unexpected_count") or 0)

        config = result["expectation_config"]
        expectation_type = config["expectation_type"]
        attribute = config["kwargs"].get("column")

        rows.append({
            "dqRegelId": f"{check_name}_{expectation_type}_{attribute}",
            "aantalValideRecords": element_count - unexpected_count,
            "aantalReferentieRecords": element_count,
            "dqDatum": run_time,
        })

    return pd.DataFrame(rows, columns=columns)


def extract_dq_afwijking_data(check_name, dq_result):
    """Build the table of deviating values from a GX validation result.

    One row is produced for every value found in the
    ``partial_unexpected_list`` of each entry in ``dq_result["results"]``.

    :param check_name: Name of the run; used as the prefix of each ``dqRegelId``
    :type check_name: str
    :param dq_result: Validation result that supports the lookups
        ``["meta"]["run_id"].run_time`` and ``["results"]``
    :return: A DataFrame with the columns ``dqRegelId``,
        ``afwijkendeAttribuutWaarde`` and ``dqDatum``. The columns are
        present even when no deviating values were reported.
    :rtype: pandas.DataFrame
    """
    # Fixed schema: when every rule passes there are no rows, and callers
    # still need the expected columns to exist.
    columns = ["dqRegelId", "afwijkendeAttribuutWaarde", "dqDatum"]
    run_time = dq_result["meta"]["run_id"].run_time

    rows = []
    for result in dq_result["results"]:
        config = result["expectation_config"]
        expectation_type = config["expectation_type"]
        attribute = config["kwargs"].get("column")
        dq_regel_id = f"{check_name}_{expectation_type}_{attribute}"

        # A missing or null result / unexpected list means "no deviations".
        outcome = result.get("result") or {}
        unexpected_values = outcome.get("partial_unexpected_list") or []
        rows.extend(
            {
                "dqRegelId": dq_regel_id,
                "afwijkendeAttribuutWaarde": value,
                "dqDatum": run_time,
            }
            for value in unexpected_values
        )

    return pd.DataFrame(rows, columns=columns)

0 comments on commit 67fd861

Please sign in to comment.