diff --git a/.github/workflows/github-ci.yml b/.github/workflows/github-ci.yml
index ba1d57e..4f31967 100644
--- a/.github/workflows/github-ci.yml
+++ b/.github/workflows/github-ci.yml
@@ -25,7 +25,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: "3.12"
+          python-version: "3.11"
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
diff --git a/README.md b/README.md
index eb88fbf..2d335ba 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,7 @@ Using a Shared Compute Cluster will result in an error, as it does not have the
 ([click](https://docs.databricks.com/en/release-notes/runtime/13.3lts.html#system-environment)).
 Older versions of DBR will result in errors upon install of the `dq-suite-amsterdam` library.
+- At the time of writing (late August 2024), Great Expectations v1.0.0 has just been released and is not (yet) compatible with Python 3.12. Hence, make sure your project uses a compatible Python version as its interpreter.
 
 # Contributing to this library
 See the separate [developers' readme](src/Readme-dev.md).
diff --git a/pyproject.toml b/pyproject.toml
index 0e9a22c..d7e4a4e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dq-suite-amsterdam"
-version = "0.9.1"
+version = "0.10.0"
 authors = [
     { name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
     { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
@@ -15,9 +15,11 @@ description = "Wrapper for Great Expectations to fit the requirements of the Gem
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
-    "great_expectations==0.18.19",
-    "pandas==2.2.2",
+    "great_expectations==1.0.3",
+    "pandas==2.1.4",
     "pyspark==3.5.2",
+    "pyhumps==3.8.0",
+    "pyyaml==6.0.2",
 ]
diff --git a/src/dq_suite/common.py b/src/dq_suite/common.py
index 5d61e2c..012319d 100644
--- a/src/dq_suite/common.py
+++ b/src/dq_suite/common.py
@@ -1,8 +1,17 @@
 from dataclasses import dataclass
 from typing import Any, Dict, List, Literal
-
-from great_expectations import get_context
-from great_expectations.data_context import AbstractDataContext
+import yaml
+
+from great_expectations import ExpectationSuite, get_context
+from great_expectations.data_context import (
+    AbstractDataContext,
+    EphemeralDataContext,
+)
+from great_expectations.data_context.types.base import (
+    DataContextConfig,
+    InMemoryStoreBackendDefaults,
+)
+from great_expectations.exceptions import DataContextError
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import col
 from pyspark.sql.types import StructType
@@ -206,10 +215,13 @@ def merge_df_with_unity_table(
         .execute()
     )
 
 
-def get_data_context(
-    data_context_root_dir: str = "/dbfs/great_expectations/",
-) -> AbstractDataContext:  # pragma: no cover - part of GX
-    return get_context(context_root_dir=data_context_root_dir)
+def get_data_context() -> AbstractDataContext:  # pragma: no cover - part of GX
+    return get_context(
+        project_config=DataContextConfig(
+            store_backend_defaults=InMemoryStoreBackendDefaults(),
+            analytics_enabled=False,
+        )
+    )
 
 
 @dataclass()
@@ -234,6 +246,7 @@ class ValidationSettings:
     notify_on: when to send notifications, can be equal to "all", "success"
     or "failure"
     """
+
     spark_session: SparkSession
     catalog_name: str
     table_name: str
@@ -269,20 +282,21 @@ def initialise_or_update_attributes(self):  # pragma: no cover - complex
         # function
         self._set_data_context()
 
-        # TODO/check: do we want to allow for custom names?
+        # TODO/check: do we want to allow for custom names via parameters?
         self._set_expectation_suite_name()
         self._set_checkpoint_name()
         self._set_run_name()
 
-        # Finally, apply the (new) suite name to the data context
-        self.data_context.add_or_update_expectation_suite(
-            expectation_suite_name=self.expectation_suite_name
-        )
+        # Finally, add/retrieve the suite to/from the data context
+        try:
+            self.data_context.suites.get(name=self.expectation_suite_name)
+        except DataContextError:
+            self.data_context.suites.add(
+                suite=ExpectationSuite(name=self.expectation_suite_name)
+            )
 
     def _set_data_context(self):  # pragma: no cover - uses part of GX
-        self.data_context = get_data_context(
-            data_context_root_dir=self.data_context_root_dir
-        )
+        self.data_context = get_data_context()
 
     def _set_expectation_suite_name(self):
         self.expectation_suite_name = f"{self.check_name}_expectation_suite"
diff --git a/src/dq_suite/df_checker.py b/src/dq_suite/df_checker.py
index dc3d635..d6c9fdc 100644
--- a/src/dq_suite/df_checker.py
+++ b/src/dq_suite/df_checker.py
@@ -1,7 +1,11 @@
-from typing import Any, List, Tuple
-
-from great_expectations.checkpoint import Checkpoint
-from great_expectations.validator.validator import Validator
+from typing import List
+
+import great_expectations
+import humps
+from great_expectations import Checkpoint, ValidationDefinition
+from great_expectations.checkpoint.actions import CheckpointAction
+from great_expectations.checkpoint.checkpoint import CheckpointResult
+from great_expectations.exceptions import DataContextError
 from pyspark.sql import DataFrame
 
 from .common import DataQualityRulesDict, Rule, RulesDict, ValidationSettings
@@ -22,126 +26,152 @@ def filter_validation_dict_by_table_name(
     return None
 
 
-def get_batch_request_and_validator(
-    df: DataFrame,
+def get_or_add_validation_definition(
     validation_settings_obj: ValidationSettings,
-) -> Tuple[Any, Validator]:
+) -> ValidationDefinition:
     dataframe_datasource = (
-        validation_settings_obj.data_context.sources.add_or_update_spark(
-            name="my_spark_in_memory_datasource_"
-            + validation_settings_obj.check_name
+        validation_settings_obj.data_context.data_sources.add_or_update_spark(
+            name=f"spark_datasource_{validation_settings_obj.check_name}"
         )
     )
     df_asset = dataframe_datasource.add_dataframe_asset(
-        name=validation_settings_obj.check_name, dataframe=df
+        name=validation_settings_obj.check_name
+    )
+    batch_definition = df_asset.add_batch_definition_whole_dataframe(
+        name=f"{validation_settings_obj.check_name}_batch_definition"
     )
-    batch_request = df_asset.build_batch_request()
-    validator = validation_settings_obj.data_context.get_validator(
-        batch_request=batch_request,
-        expectation_suite_name=validation_settings_obj.expectation_suite_name,
+    validation_definition_name = (
+        f"{validation_settings_obj.check_name}_validation_definition"
     )
+    try:
+        validation_definition = (
+            validation_settings_obj.data_context.validation_definitions.get(
+                name=validation_definition_name
+            )
+        )
+    except DataContextError:
+        # Note: a validation definition combines data with a suite of
+        # expectations
+        validation_definition = ValidationDefinition(
+            name=validation_definition_name,
+            data=batch_definition,
+            suite=validation_settings_obj.data_context.suites.get(
+                validation_settings_obj.expectation_suite_name
+            ),
+        )
+        validation_definition = (
+            validation_settings_obj.data_context.validation_definitions.add(
+                validation=validation_definition
+            )
+        )
 
-    return batch_request, validator
+    return validation_definition
 
 
 def create_action_list(
     validation_settings_obj: ValidationSettings,
-) -> List[dict[str, Any]]:
+) -> List[CheckpointAction]:
     action_list = list()
 
-    action_list.append(
-        {  # TODO/check: do we really have to store the validation results?
-            "name": "store_validation_result",
-            "action": {"class_name": "StoreValidationResultAction"},
-        }
-    )
-
     if validation_settings_obj.send_slack_notification & (
         validation_settings_obj.slack_webhook is not None
     ):
         action_list.append(
-            {
-                "name": "send_slack_notification",
-                "action": {
-                    "class_name": "SlackNotificationAction",
-                    "slack_webhook": validation_settings_obj.slack_webhook,
-                    "notify_on": validation_settings_obj.notify_on,
-                    "renderer": {
-                        "module_name": "great_expectations.render.renderer.slack_renderer",
-                        "class_name": "SlackRenderer",
-                    },
+            great_expectations.checkpoint.SlackNotificationAction(
+                name="send_slack_notification",
+                slack_webhook=validation_settings_obj.slack_webhook,
+                notify_on=validation_settings_obj.notify_on,
+                renderer={
+                    "module_name": "great_expectations.render.renderer.slack_renderer",
+                    "class_name": "SlackRenderer",
                 },
-            }
+            )
         )
 
     if validation_settings_obj.send_ms_teams_notification & (
         validation_settings_obj.ms_teams_webhook is not None
     ):
         action_list.append(
-            {
-                "name": "send_ms_teams_notification",
-                "action": {
-                    "class_name": "MicrosoftTeamsNotificationAction",
-                    "microsoft_teams_webhook": validation_settings_obj.ms_teams_webhook,
-                    "notify_on": validation_settings_obj.notify_on,
-                    "renderer": {
-                        "module_name": "great_expectations.render.renderer.microsoft_teams_renderer",
-                        "class_name": "MicrosoftTeamsRenderer",
-                    },
+            great_expectations.checkpoint.MicrosoftTeamsNotificationAction(
+                name="send_ms_teams_notification",
+                microsoft_teams_webhook=validation_settings_obj.ms_teams_webhook,
+                notify_on=validation_settings_obj.notify_on,
+                renderer={
+                    "module_name": "great_expectations.render.renderer.microsoft_teams_renderer",
+                    "class_name": "MicrosoftTeamsRenderer",
                 },
-            }
+            )
         )
 
     return action_list
 
 
-def create_and_run_checkpoint(
-    validation_settings_obj: ValidationSettings, batch_request: Any
-) -> Any:
-    action_list = create_action_list(
-        validation_settings_obj=validation_settings_obj
-    )
-    checkpoint = Checkpoint(
-        name=validation_settings_obj.checkpoint_name,
-        run_name_template=validation_settings_obj.run_name,
-        data_context=validation_settings_obj.data_context,
-        batch_request=batch_request,
-        expectation_suite_name=validation_settings_obj.expectation_suite_name,
-        action_list=action_list,
-    )
-
-    validation_settings_obj.data_context.add_or_update_checkpoint(
-        checkpoint=checkpoint
-    )
-    checkpoint_result = checkpoint.run()
-    return checkpoint_result["run_results"]
+def get_or_add_checkpoint(
+    validation_settings_obj: ValidationSettings,
+    validation_definition: ValidationDefinition,
+) -> Checkpoint:
+    try:
+        checkpoint = validation_settings_obj.data_context.checkpoints.get(
+            name=validation_settings_obj.checkpoint_name
+        )
+    except DataContextError:
+        action_list = create_action_list(
+            validation_settings_obj=validation_settings_obj
+        )
+        # Note: a checkpoint combines validations with actions
+        checkpoint = Checkpoint(
+            name=validation_settings_obj.checkpoint_name,
+            validation_definitions=[validation_definition],
+            actions=action_list,
+        )
+
+        # Add checkpoint to data context for future use
+        validation_settings_obj.data_context.checkpoints.add(
+            checkpoint=checkpoint
+        )
+    return checkpoint
 
 
 def create_and_configure_expectations(
-    validation_rules_list: List[Rule], validator: Validator
+    validation_rules_list: List[Rule],
+    validation_settings_obj: ValidationSettings,
 ) -> None:
+    # The suite should exist by now
+    suite = validation_settings_obj.data_context.suites.get(
+        name=validation_settings_obj.expectation_suite_name
+    )
+
     for validation_rule in validation_rules_list:
         # Get the name of expectation as defined by GX
         gx_expectation_name = validation_rule["rule_name"]
         # Get the actual expectation as defined by GX
-        gx_expectation = getattr(validator, gx_expectation_name)
+        gx_expectation = getattr(
+            great_expectations.expectations.core,
+            humps.pascalize(gx_expectation_name),
+        )
+        # Issue 50
+        # TODO: drop pascalization, and require this as an input check
+        # when ingesting the JSON? Could be done via humps.is_pascalcase()
+
         for validation_parameter_dict in validation_rule["parameters"]:
             kwargs = {}
+            # Issue 51
+            # TODO/check: is this loop really necessary? Intuitively, I added
+            # the same expectation for each column - I didn't consider using
+            # the same expectation with different parameters
            for par_name, par_value in validation_parameter_dict.items():
                 kwargs[par_name] = par_value
-            gx_expectation(**kwargs)
-
-    validator.save_expectation_suite(discard_failed_expectations=False)
+            suite.add_expectation(gx_expectation(**kwargs))
 
 
 def validate(
     df: DataFrame,
     rules_dict: RulesDict,
     validation_settings_obj: ValidationSettings,
-) -> Any:
+) -> CheckpointResult:
     """
     [explanation goes here]
 
@@ -153,21 +183,23 @@ def validate(
     # Make sure all attributes are aligned before validating
     validation_settings_obj.initialise_or_update_attributes()
 
-    batch_request, validator = get_batch_request_and_validator(
-        df=df,
+    create_and_configure_expectations(
+        validation_rules_list=rules_dict["rules"],
         validation_settings_obj=validation_settings_obj,
     )
-    create_and_configure_expectations(
-        validation_rules_list=rules_dict["rules"], validator=validator
+    validation_definition = get_or_add_validation_definition(
+        validation_settings_obj=validation_settings_obj,
     )
-
-    checkpoint_output = create_and_run_checkpoint(
+    print("***Starting validation definition run***")
+    print(validation_definition.run(batch_parameters={"dataframe": df}))
+    checkpoint = get_or_add_checkpoint(
         validation_settings_obj=validation_settings_obj,
-        batch_request=batch_request,
+        validation_definition=validation_definition,
     )
 
-    return checkpoint_output
+    batch_params = {"dataframe": df}
+    return checkpoint.run(batch_parameters=batch_params)
 
 
 def run(
@@ -191,11 +223,12 @@ def run(
         return
 
     # 2) perform the validation on the dataframe
-    validation_output = validate(
+    checkpoint_result = validate(
         df=df,
         rules_dict=rules_dict,
         validation_settings_obj=validation_settings_obj,
     )
+    validation_output = checkpoint_result.describe_dict()
 
     # 3) write results to unity catalog
     write_non_validation_tables(
diff --git a/src/dq_suite/output_transformations.py b/src/dq_suite/output_transformations.py
index d1fd251..cec9abb 100644
--- a/src/dq_suite/output_transformations.py
+++ b/src/dq_suite/output_transformations.py
@@ -1,5 +1,7 @@
-from typing import Any, List
+import datetime
+from typing import List
 
+from great_expectations.checkpoint.checkpoint import CheckpointDescriptionDict
 from pyspark.sql import DataFrame, Row, SparkSession
 from pyspark.sql.functions import col, xxhash64
 from pyspark.sql.types import StructType
@@ -34,7 +36,9 @@ def list_of_dicts_to_df(
         return create_empty_dataframe(
             spark_session=spark_session, schema=schema
         )
-    return spark_session.createDataFrame((Row(**x) for x in list_of_dicts), schema=schema)
+    return spark_session.createDataFrame(
+        (Row(**x) for x in list_of_dicts), schema=schema
+    )
 
 
 def construct_regel_id(
@@ -46,7 +50,7 @@
 def create_parameter_list_from_results(result: dict) -> list[dict]:
-    parameters = result["expectation_config"]["kwargs"]
+    parameters = result["kwargs"]
     parameters.pop("batch_id", None)
     return [parameters]
 
 
@@ -114,30 +118,40 @@ def extract_dq_validatie_data(
     :param spark_session:
     """
     tabel_id = f"{dataset_name}_{table_name}"
-    run_time = dq_result["meta"]["run_id"].run_time
-    extracted_data = []
-    for result in dq_result["results"]:
-        element_count = int(result["result"].get("element_count", 0))
-        unexpected_count = int(result["result"].get("unexpected_count", 0))
-        aantal_valide_records = element_count - unexpected_count
-        expectation_type = result["expectation_config"]["expectation_type"]
-        parameter_list = create_parameter_list_from_results(result=result)
-        attribute = result["expectation_config"]["kwargs"].get("column")
-
-        output = result["success"]
-        output_text = "success" if output else "failure"
-        extracted_data.append(
-            {
-                "aantalValideRecords": aantal_valide_records,
-                "aantalReferentieRecords": element_count,
-                "dqDatum": run_time,
-                "dqResultaat": output_text,
-                "regelNaam": expectation_type,
-                "regelParameters": parameter_list,
-                "bronTabelId": tabel_id,
-            }
-        )
+    dq_result = dq_result["validation_results"]
+
+    # run_time = dq_result["meta"]["run_id"].run_time
+    run_time = datetime.datetime(1900, 1, 1)
+    # TODO: fix, find run_time in new GX API
+
+    extracted_data = []
+    for validation_result in dq_result:
+        for expectation_result in validation_result["expectations"]:
+            element_count = int(
+                expectation_result["result"].get("element_count", 0)
+            )
+            unexpected_count = int(
+                expectation_result["result"].get("unexpected_count", 0)
+            )
+            aantal_valide_records = element_count - unexpected_count
+            expectation_type = expectation_result["expectation_type"]
+            parameter_list = create_parameter_list_from_results(
+                result=expectation_result
+            )
+            attribute = expectation_result["kwargs"].get("column")
+
+            output = expectation_result["success"]
+            output_text = "success" if output else "failure"
+            extracted_data.append(
+                {
+                    "aantalValideRecords": aantal_valide_records,
+                    "aantalReferentieRecords": element_count,
+                    "dqDatum": run_time,
+                    "dqResultaat": output_text,
+                    "regelNaam": expectation_type,
+                    "regelParameters": parameter_list,
+                    "bronTabelId": tabel_id,
+                }
+            )
+
     df_validatie = list_of_dicts_to_df(
         list_of_dicts=extracted_data,
@@ -179,41 +193,47 @@ def extract_dq_afwijking_data(
     :param spark_session:
     """
     tabel_id = f"{dataset_name}_{table_name}"
-    run_time = dq_result["meta"]["run_id"].run_time  # Get the run timestamp
+    dq_result = dq_result["validation_results"]
+
+    # run_time = dq_result["meta"]["run_id"].run_time
+    run_time = datetime.datetime(1900, 1, 1)
+    # TODO: fix, find run_time in new GX API
+
     extracted_data = []
 
     if not isinstance(unique_identifier, list):
         unique_identifier = [unique_identifier]
-    for result in dq_result["results"]:
-        expectation_type = result["expectation_config"]["expectation_type"]
-        parameter_list = create_parameter_list_from_results(result=result)
-        attribute = get_target_attr_for_rule(result=result)
-        deviating_attribute_value = result["result"].get(
-            "partial_unexpected_list", []
-        )
-        unique_deviating_values = get_unique_deviating_values(
-            deviating_attribute_value
-        )
-        for value in unique_deviating_values:
-            filtered_df = filter_df_based_on_deviating_values(
-                value=value,
-                attribute=attribute,
-                df=df
-            )
-            grouped_ids = get_grouped_ids_per_deviating_value(
-                filtered_df=filtered_df,
-                unique_identifier=unique_identifier
-            )
-            if isinstance(attribute, list): value = str(value)
-            extracted_data.append(
-                {
-                    "identifierVeldWaarde": grouped_ids,
-                    "afwijkendeAttribuutWaarde": value,
-                    "dqDatum": run_time,
-                    "regelNaam": expectation_type,
-                    "regelParameters": parameter_list,
-                    "bronTabelId": tabel_id,
-                }
-            )
+    for validation_result in dq_result:
+        for expectation_result in validation_result["expectations"]:
+            expectation_type = expectation_result["expectation_type"]
+            parameter_list = create_parameter_list_from_results(
+                result=expectation_result
+            )
+            attribute = get_target_attr_for_rule(result=expectation_result)
+            deviating_attribute_value = expectation_result["result"].get(
+                "partial_unexpected_list", []
+            )
+            unique_deviating_values = get_unique_deviating_values(
+                deviating_attribute_value
+            )
+            for value in unique_deviating_values:
+                filtered_df = filter_df_based_on_deviating_values(
+                    value=value,
+                    attribute=attribute,
+                    df=df
+                )
+                grouped_ids = get_grouped_ids_per_deviating_value(
+                    filtered_df=filtered_df,
+                    unique_identifier=unique_identifier
+                )
+                if isinstance(attribute, list): value = str(value)
+                extracted_data.append(
+                    {
+                        "identifierVeldWaarde": grouped_ids,
+                        "afwijkendeAttribuutWaarde": value,
+                        "dqDatum": run_time,
+                        "regelNaam": expectation_type,
+                        "regelParameters": parameter_list,
+                        "bronTabelId": tabel_id,
+                    }
+                )
 
     df_afwijking = list_of_dicts_to_df(
         list_of_dicts=extracted_data,
@@ -464,27 +484,23 @@ def write_non_validation_tables(
 
 
 def write_validation_table(
-    validation_output: Any,
+    validation_output: CheckpointDescriptionDict,
     validation_settings_obj: ValidationSettings,
     df: DataFrame,
     dataset_name: str,
     unique_identifier: str,
 ):
-    for results in validation_output.values():
-        result = results["validation_result"]
-        extract_dq_validatie_data(
-            validation_settings_obj.table_name,
-            dataset_name,
-            result,
-            validation_settings_obj.catalog_name,
-            validation_settings_obj.spark_session,
-        )
-        extract_dq_afwijking_data(
-            validation_settings_obj.table_name,
-            dataset_name,
-            result,
-            df,
-            unique_identifier,
-            validation_settings_obj.catalog_name,
-            validation_settings_obj.spark_session,
-        )
+    extract_dq_validatie_data(
+        validation_settings_obj.table_name,
+        dataset_name,
+        validation_output,
+        validation_settings_obj.catalog_name,
+        validation_settings_obj.spark_session,
+    )
+    extract_dq_afwijking_data(
+        validation_settings_obj.table_name,
+        dataset_name,
+        validation_output,
+        df,
+        unique_identifier,
+        validation_settings_obj.catalog_name,
+        validation_settings_obj.spark_session,
+    )
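
Below is a minimal, self-contained sketch (not part of the diff above) of the GX 1.0-style flow these changes move to: an ephemeral in-memory data context, a suite whose expectation classes are looked up by pascalizing the JSON rule name, a Spark dataframe asset with a whole-dataframe batch definition, and a validation definition wrapped in a checkpoint. It is assembled only from calls that appear in the diff; the `demo_*` names and the tiny example DataFrame are illustrative, not part of the library or this PR.

```python
import great_expectations.expectations.core as gx_expectations_core
import humps
from great_expectations import (
    Checkpoint,
    ExpectationSuite,
    ValidationDefinition,
    get_context,
)
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
from pyspark.sql import SparkSession

# Ephemeral, in-memory data context (replaces the old context_root_dir on DBFS)
context = get_context(
    project_config=DataContextConfig(
        store_backend_defaults=InMemoryStoreBackendDefaults(),
        analytics_enabled=False,
    )
)

# Suite: expectation classes are resolved by pascalizing the rule name,
# e.g. "expect_column_values_to_not_be_null" -> ExpectColumnValuesToNotBeNull
context.suites.add(suite=ExpectationSuite(name="demo_suite"))
suite = context.suites.get(name="demo_suite")
expectation_cls = getattr(
    gx_expectations_core,
    humps.pascalize("expect_column_values_to_not_be_null"),
)
suite.add_expectation(expectation_cls(column="id"))

# Spark datasource, dataframe asset and whole-dataframe batch definition
datasource = context.data_sources.add_or_update_spark(name="demo_spark_datasource")
asset = datasource.add_dataframe_asset(name="demo_asset")
batch_definition = asset.add_batch_definition_whole_dataframe(name="demo_batch")

# A validation definition combines the batch with the suite;
# a checkpoint combines validation definitions with (notification) actions
validation_definition = context.validation_definitions.add(
    validation=ValidationDefinition(
        name="demo_validation", data=batch_definition, suite=suite
    )
)
checkpoint = Checkpoint(
    name="demo_checkpoint",
    validation_definitions=[validation_definition],
    actions=[],
)
context.checkpoints.add(checkpoint=checkpoint)

# The dataframe is only supplied at run time, as in df_checker.validate()
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (None,)], ["id"])
result = checkpoint.run(batch_parameters={"dataframe": df})
print(result.describe_dict())  # same dict shape consumed by write_validation_table
```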