Upgrade GX to version 1.0 (#53)
* Change dependencies GX version to 1.0.0

* Add note to README.md

* Downgrade python-version to 3.11 in github-ci.yml

* Bump up version

* Formatting

* Modify use of GX methods to follow 1.0.0 standards

* Add use of ExpectationSuite

* Rework the use of data assets and batches

* Correctly handle adding of expectation suites

* Correctly handle adding of expectations

* Minor comment

* Modify get_or_add_validation_definition

* Modify get_or_add_checkpoint

* Modify create_and_configure_expectations

* Modify validate function

* Modify run function

* Add pyhumps to pyproject.toml

* Formatting

* Add get_data_context

* Undo add get_data_context

* Pass suite object to ValidationDefinition

* Change type hint for actions

* Add some debug info

* Add some debug info

* Increase gx version

* Temporarily comment out __init__.py contents

* Modify creation of actions

* Modify import of Checkpoint

* Force name to be string-typed

* Change way of calling Checkpoint

* Change parameter name

* Re-activate __init__.py

* Re-activate __init__.py

* Fix use of checkpoint results

* Revert __init__.py

* Fix getting validation_results field

* Again __init__.py

* Simple fix for getting validation_results field

* Simple fix for getting validation_results field

* Attempt at fixing write to catalog

* Blehhh

* Fix iteration over dq_results

* Fix run_time type

* Cleanup

* Formatting

* Upgrade GX to 1.0.3

* GX1.0 fixes

* indent

---------

Co-authored-by: bas <b.schotten@amsterdam.nl>
Co-authored-by: ArthurKordes <ArthurKordes>
Co-authored-by: ArthurKordes <75675106+ArthurKordes@users.noreply.github.com>
3 people authored Sep 20, 2024
1 parent 68e24dc commit 71527b3
Showing 6 changed files with 240 additions and 174 deletions.
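The diffs below migrate this suite from the GX 0.18 validator/batch-request API to the GX 1.0 object model. As a reading aid, here is a minimal sketch of that 1.0 flow, built only from the calls introduced in this commit; the `demo_*` names and the tiny DataFrame are illustrative, not part of the change.

```python
# Minimal sketch of the GX 1.0 flow adopted in this commit.
# All API calls mirror those in the diffs below; names are illustrative.
import great_expectations as gx
from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (None,)], "the_column: int")

# Ephemeral, in-memory data context - nothing is written to disk.
context = gx.get_context(
    project_config=DataContextConfig(
        store_backend_defaults=InMemoryStoreBackendDefaults(),
        analytics_enabled=False,
    )
)

# 1) A suite holds the expectations (rules) to check.
suite = context.suites.add(suite=ExpectationSuite(name="demo_suite"))
suite.add_expectation(
    gx.expectations.core.ExpectColumnValuesToNotBeNull(column="the_column")
)

# 2) A batch definition describes which data gets checked.
datasource = context.data_sources.add_or_update_spark(name="demo_datasource")
asset = datasource.add_dataframe_asset(name="demo_asset")
batch_definition = asset.add_batch_definition_whole_dataframe(name="demo_batch")

# 3) A validation definition combines data with a suite of expectations.
validation_definition = ValidationDefinition(
    name="demo_validation", data=batch_definition, suite=suite
)
validation_definition = context.validation_definitions.add(
    validation=validation_definition
)

# 4) A checkpoint combines validation definitions with (optional) actions;
#    the DataFrame itself is only supplied at run time.
checkpoint = Checkpoint(
    name="demo_checkpoint",
    validation_definitions=[validation_definition],
    actions=[],
)
checkpoint = context.checkpoints.add(checkpoint=checkpoint)

result = checkpoint.run(batch_parameters={"dataframe": df})
print(result.describe_dict())
```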
2 changes: 1 addition & 1 deletion .github/workflows/github-ci.yml
@@ -25,7 +25,7 @@ jobs:
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
-          python-version: "3.12"
+          python-version: "3.11"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
1 change: 1 addition & 0 deletions README.md
@@ -66,6 +66,7 @@ Using a Shared Compute Cluster will result in an error, as it does not have the
 ([click](https://docs.databricks.com/en/release-notes/runtime/13.3lts.html#system-environment)).
 Older versions of DBR will result in errors upon install of the `dq-suite-amsterdam` library.
 
+- At time of writing (late Aug 2024), Great Expectations v1.0.0 has just been released, and is not (yet) compatible with Python 3.12. Hence, make sure you are using the correct version of Python as interpreter for your project.
 
 # Contributing to this library
 See the separate [developers' readme](src/Readme-dev.md).
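Since the interpreter note above is easy to miss on Databricks, a fail-fast guard can make the constraint explicit. A minimal sketch; the exact version cutoff is an assumption based on the README note:

```python
# Sketch: fail fast when the interpreter is too new for GX 1.0.x
# (per the README note above; the cutoff is an assumption).
import sys

if sys.version_info >= (3, 12):
    raise RuntimeError(
        "great_expectations 1.0.x does not yet support Python 3.12; "
        "use Python 3.10 or 3.11"
    )
```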
8 changes: 5 additions & 3 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dq-suite-amsterdam"
-version = "0.9.1"
+version = "0.10.0"
 authors = [
     { name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
     { name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
@@ -15,9 +15,11 @@ description = "Wrapper for Great Expectations to fit the requirements of the Gem
 readme = "README.md"
 requires-python = ">=3.10"
 dependencies = [
-    "great_expectations==0.18.19",
-    "pandas==2.2.2",
+    "great_expectations==1.0.3",
+    "pandas==2.1.4",
     "pyspark==3.5.2",
+    "pyhumps==3.8.0",
+    "pyyaml==6.0.2",
 ]
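The new `pyhumps` pin exists because the rules JSON keeps GX's snake_case expectation names, while GX 1.0 exposes expectations as PascalCase classes; `create_and_configure_expectations` in `src/dq_suite/df_checker.py` below bridges the two with `humps.pascalize`. A small illustration, with an example rule name:

```python
# How the pyhumps dependency is used further down in df_checker.py:
# snake_case rule names from the JSON rules file are mapped onto the
# PascalCase expectation classes of GX 1.0. The rule name is illustrative.
import great_expectations
import humps

rule_name = "expect_column_values_to_not_be_null"
class_name = humps.pascalize(rule_name)  # "ExpectColumnValuesToNotBeNull"

# Look up the expectation class in GX's core module, then instantiate it
# with the parameters from the rule definition.
expectation_class = getattr(great_expectations.expectations.core, class_name)
expectation = expectation_class(column="the_column")
```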
44 changes: 29 additions & 15 deletions src/dq_suite/common.py
@@ -1,8 +1,17 @@
 from dataclasses import dataclass
 from typing import Any, Dict, List, Literal
 
-from great_expectations import get_context
-from great_expectations.data_context import AbstractDataContext
+import yaml
+
+from great_expectations import ExpectationSuite, get_context
+from great_expectations.data_context import (
+    AbstractDataContext,
+    EphemeralDataContext,
+)
+from great_expectations.data_context.types.base import (
+    DataContextConfig,
+    InMemoryStoreBackendDefaults,
+)
+from great_expectations.exceptions import DataContextError
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.functions import col
 from pyspark.sql.types import StructType
@@ -206,10 +215,13 @@ def merge_df_with_unity_table(
         .execute()
 
 
-def get_data_context(
-    data_context_root_dir: str = "/dbfs/great_expectations/",
-) -> AbstractDataContext:  # pragma: no cover - part of GX
-    return get_context(context_root_dir=data_context_root_dir)
+def get_data_context() -> AbstractDataContext:  # pragma: no cover - part of GX
+    return get_context(
+        project_config=DataContextConfig(
+            store_backend_defaults=InMemoryStoreBackendDefaults(),
+            analytics_enabled=False
+        )
+    )
 
 
 @dataclass()
@@ -234,6 +246,7 @@ class ValidationSettings:
     notify_on: when to send notifications, can be equal to "all",
         "success" or "failure"
     """
+
     spark_session: SparkSession
     catalog_name: str
     table_name: str
@@ -269,20 +282,21 @@ def initialise_or_update_attributes(self):  # pragma: no cover - complex
         # function
         self._set_data_context()
 
-        # TODO/check: do we want to allow for custom names?
+        # TODO/check: do we want to allow for custom names via parameters?
         self._set_expectation_suite_name()
         self._set_checkpoint_name()
         self._set_run_name()
 
-        # Finally, apply the (new) suite name to the data context
-        self.data_context.add_or_update_expectation_suite(
-            expectation_suite_name=self.expectation_suite_name
-        )
+        # Finally, add/retrieve the suite to/from the data context
+        try:
+            self.data_context.suites.get(name=self.expectation_suite_name)
+        except DataContextError:
+            self.data_context.suites.add(
+                suite=ExpectationSuite(name=self.expectation_suite_name)
+            )
 
     def _set_data_context(self):  # pragma: no cover - uses part of GX
-        self.data_context = get_data_context(
-            data_context_root_dir=self.data_context_root_dir
-        )
+        self.data_context = get_data_context()
 
     def _set_expectation_suite_name(self):
         self.expectation_suite_name = f"{self.check_name}_expectation_suite"
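The pattern above recurs throughout this PR: with `InMemoryStoreBackendDefaults` the context is ephemeral, so nothing persists between runs, and every `get` has to be guarded by a `DataContextError` fallback that creates the object. A condensed sketch of that get-or-add pattern, with an illustrative suite name:

```python
# Condensed get-or-add pattern used above: on an ephemeral in-memory
# data context, look the suite up first and create it on DataContextError.
from great_expectations import ExpectationSuite, get_context
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)
from great_expectations.exceptions import DataContextError

context = get_context(
    project_config=DataContextConfig(
        store_backend_defaults=InMemoryStoreBackendDefaults(),
        analytics_enabled=False,
    )
)

suite_name = "demo_check_expectation_suite"  # illustrative
try:
    suite = context.suites.get(name=suite_name)
except DataContextError:
    suite = context.suites.add(suite=ExpectationSuite(name=suite_name))
```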
195 changes: 114 additions & 81 deletions src/dq_suite/df_checker.py
@@ -1,7 +1,11 @@
-from typing import Any, List, Tuple
-
-from great_expectations.checkpoint import Checkpoint
-from great_expectations.validator.validator import Validator
+from typing import List
+
+import great_expectations
+import humps
+from great_expectations import Checkpoint, ValidationDefinition
+from great_expectations.checkpoint.actions import CheckpointAction
+from great_expectations.checkpoint.checkpoint import CheckpointResult
+from great_expectations.exceptions import DataContextError
 from pyspark.sql import DataFrame
 
 from .common import DataQualityRulesDict, Rule, RulesDict, ValidationSettings
@@ -22,126 +26,152 @@ def filter_validation_dict_by_table_name(
     return None
 
 
-def get_batch_request_and_validator(
-    df: DataFrame,
+def get_or_add_validation_definition(
     validation_settings_obj: ValidationSettings,
-) -> Tuple[Any, Validator]:
+) -> ValidationDefinition:
     dataframe_datasource = (
-        validation_settings_obj.data_context.sources.add_or_update_spark(
-            name="my_spark_in_memory_datasource_"
-            + validation_settings_obj.check_name
+        validation_settings_obj.data_context.data_sources.add_or_update_spark(
+            name=f"spark_datasource_" f"{validation_settings_obj.check_name}"
         )
     )
 
     df_asset = dataframe_datasource.add_dataframe_asset(
-        name=validation_settings_obj.check_name, dataframe=df
+        name=validation_settings_obj.check_name
     )
-    batch_request = df_asset.build_batch_request()
+    batch_definition = df_asset.add_batch_definition_whole_dataframe(
+        name=f"{validation_settings_obj.check_name}_batch_definition"
+    )
 
-    validator = validation_settings_obj.data_context.get_validator(
-        batch_request=batch_request,
-        expectation_suite_name=validation_settings_obj.expectation_suite_name,
+    validation_definition_name = (
+        f"{validation_settings_obj.check_name}" f"_validation_definition"
     )
+    try:
+        validation_definition = (
+            validation_settings_obj.data_context.validation_definitions.get(
+                name=validation_definition_name
+            )
+        )
+    except DataContextError:
+        validation_definition = ValidationDefinition(
+            name=validation_definition_name,
+            data=batch_definition,
+            suite=validation_settings_obj.data_context.suites.get(
+                validation_settings_obj.expectation_suite_name
+            ),
+        )  # Note: a validation definition combines data with a suite of
+        # expectations
+        validation_definition = (
+            validation_settings_obj.data_context.validation_definitions.add(
+                validation=validation_definition
+            )
+        )
 
-    return batch_request, validator
+    return validation_definition


 def create_action_list(
     validation_settings_obj: ValidationSettings,
-) -> List[dict[str, Any]]:
+) -> List[CheckpointAction]:
     action_list = list()
 
-    action_list.append(
-        {  # TODO/check: do we really have to store the validation results?
-            "name": "store_validation_result",
-            "action": {"class_name": "StoreValidationResultAction"},
-        }
-    )
-
     if validation_settings_obj.send_slack_notification & (
         validation_settings_obj.slack_webhook is not None
     ):
         action_list.append(
-            {
-                "name": "send_slack_notification",
-                "action": {
-                    "class_name": "SlackNotificationAction",
-                    "slack_webhook": validation_settings_obj.slack_webhook,
-                    "notify_on": validation_settings_obj.notify_on,
-                    "renderer": {
-                        "module_name": "great_expectations.render.renderer.slack_renderer",
-                        "class_name": "SlackRenderer",
-                    },
-                },
-            }
+            great_expectations.checkpoint.SlackNotificationAction(
+                name="send_slack_notification",
+                slack_webhook=validation_settings_obj.slack_webhook,
+                notify_on=validation_settings_obj.notify_on,
+                renderer={
+                    "module_name": "great_expectations.render.renderer.slack_renderer",
+                    "class_name": "SlackRenderer",
+                },
+            )
         )
 
     if validation_settings_obj.send_ms_teams_notification & (
         validation_settings_obj.ms_teams_webhook is not None
     ):
         action_list.append(
-            {
-                "name": "send_ms_teams_notification",
-                "action": {
-                    "class_name": "MicrosoftTeamsNotificationAction",
-                    "microsoft_teams_webhook": validation_settings_obj.ms_teams_webhook,
-                    "notify_on": validation_settings_obj.notify_on,
-                    "renderer": {
-                        "module_name": "great_expectations.render.renderer.microsoft_teams_renderer",
-                        "class_name": "MicrosoftTeamsRenderer",
-                    },
-                },
-            }
+            great_expectations.checkpoint.MicrosoftTeamsNotificationAction(
+                name="send_ms_teams_notification",
+                microsoft_teams_webhook=validation_settings_obj.ms_teams_webhook,
+                notify_on=validation_settings_obj.notify_on,
+                renderer={
+                    "module_name": "great_expectations.render.renderer.microsoft_teams_renderer",
+                    "class_name": "MicrosoftTeamsRenderer",
+                },
+            )
         )
 
     return action_list


-def create_and_run_checkpoint(
-    validation_settings_obj: ValidationSettings, batch_request: Any
-) -> Any:
-    action_list = create_action_list(
-        validation_settings_obj=validation_settings_obj
-    )
-    checkpoint = Checkpoint(
-        name=validation_settings_obj.checkpoint_name,
-        run_name_template=validation_settings_obj.run_name,
-        data_context=validation_settings_obj.data_context,
-        batch_request=batch_request,
-        expectation_suite_name=validation_settings_obj.expectation_suite_name,
-        action_list=action_list,
-    )
-
-    validation_settings_obj.data_context.add_or_update_checkpoint(
-        checkpoint=checkpoint
-    )
-    checkpoint_result = checkpoint.run()
-    return checkpoint_result["run_results"]
+def get_or_add_checkpoint(
+    validation_settings_obj: ValidationSettings,
+    validation_definition: ValidationDefinition,
+) -> Checkpoint:
+    try:
+        checkpoint = validation_settings_obj.data_context.checkpoints.get(
+            name=validation_settings_obj.checkpoint_name
+        )
+    except DataContextError:
+        action_list = create_action_list(
+            validation_settings_obj=validation_settings_obj
+        )
+        checkpoint = Checkpoint(
+            name=validation_settings_obj.checkpoint_name,
+            validation_definitions=[validation_definition],
+            actions=action_list,
+        )  # Note: a checkpoint combines validations with actions
+
+        # Add checkpoint to data context for future use
+        (
+            validation_settings_obj.data_context.checkpoints.add(
+                checkpoint=checkpoint
+            )
+        )
+    return checkpoint


 def create_and_configure_expectations(
-    validation_rules_list: List[Rule], validator: Validator
+    validation_rules_list: List[Rule],
+    validation_settings_obj: ValidationSettings,
 ) -> None:
+    # The suite should exist by now
+    suite = validation_settings_obj.data_context.suites.get(
+        name=validation_settings_obj.expectation_suite_name
+    )
+
     for validation_rule in validation_rules_list:
         # Get the name of expectation as defined by GX
         gx_expectation_name = validation_rule["rule_name"]
 
         # Get the actual expectation as defined by GX
-        gx_expectation = getattr(validator, gx_expectation_name)
+        gx_expectation = getattr(
+            great_expectations.expectations.core,
+            humps.pascalize(gx_expectation_name),
+        )
+        # Issue 50
+        # TODO: drop pascalization, and require this as input check
+        # when ingesting JSON? Could be done via humps.is_pascalcase()
 
         for validation_parameter_dict in validation_rule["parameters"]:
             kwargs = {}
+            # Issue 51
+            # TODO/check: is this loop really necessary? Intuitively, I added
+            # the same expectation for each column - I didn't consider using
+            # the same expectation with different parameters
             for par_name, par_value in validation_parameter_dict.items():
                 kwargs[par_name] = par_value
-            gx_expectation(**kwargs)
-
-    validator.save_expectation_suite(discard_failed_expectations=False)
+            suite.add_expectation(gx_expectation(**kwargs))


 def validate(
     df: DataFrame,
     rules_dict: RulesDict,
     validation_settings_obj: ValidationSettings,
-) -> Any:
+) -> CheckpointResult:
     """
     [explanation goes here]
@@ -153,21 +183,23 @@ def validate(
     # Make sure all attributes are aligned before validating
     validation_settings_obj.initialise_or_update_attributes()
 
-    batch_request, validator = get_batch_request_and_validator(
-        df=df,
+    create_and_configure_expectations(
+        validation_rules_list=rules_dict["rules"],
         validation_settings_obj=validation_settings_obj,
     )
 
-    create_and_configure_expectations(
-        validation_rules_list=rules_dict["rules"], validator=validator
+    validation_definition = get_or_add_validation_definition(
+        validation_settings_obj=validation_settings_obj,
     )
 
-    checkpoint_output = create_and_run_checkpoint(
+    print("***Starting validation definition run***")
+    print(validation_definition.run(batch_parameters={"dataframe": df}))
+    checkpoint = get_or_add_checkpoint(
         validation_settings_obj=validation_settings_obj,
-        batch_request=batch_request,
+        validation_definition=validation_definition,
     )
 
-    return checkpoint_output
+    batch_params = {"dataframe": df}
+    return checkpoint.run(batch_parameters=batch_params)
 
 
 def run(
@@ -191,11 +223,12 @@ def run(
         return
 
     # 2) perform the validation on the dataframe
-    validation_output = validate(
+    checkpoint_result = validate(
         df=df,
         rules_dict=rules_dict,
         validation_settings_obj=validation_settings_obj,
     )
+    validation_output = checkpoint_result.describe_dict()
 
     # 3) write results to unity catalog
     write_non_validation_tables(
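For downstream users, the visible contract change is that `validate` now returns a GX `CheckpointResult`, which `run` flattens with `describe_dict()` before writing to Unity Catalog. A hedged sketch of the caller side; the DataFrame, the rules-dict fields beyond `rules`/`rule_name`/`parameters`, and the `ValidationSettings` arguments are illustrative and assume the constructor fields shown in `common.py` above:

```python
# Sketch of the new calling convention after this commit. The rules dict
# shape follows the rule_name/parameters usage visible in the diff; the
# catalog, table and check names are placeholders.
from pyspark.sql import SparkSession

from dq_suite.common import ValidationSettings
from dq_suite.df_checker import validate

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "name"])

rules_dict = {
    "table_name": "demo_table",
    "rules": [
        {
            "rule_name": "expect_column_values_to_not_be_null",
            "parameters": [{"column": "id"}],
        }
    ],
}

settings = ValidationSettings(
    spark_session=spark,
    catalog_name="demo_catalog",
    table_name="demo_table",
    check_name="demo_check",
)

checkpoint_result = validate(
    df=df, rules_dict=rules_dict, validation_settings_obj=settings
)  # now a CheckpointResult, not raw run_results
validation_output = checkpoint_result.describe_dict()
print(validation_output)
```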