Skip to content

Commit

Permalink
Merge pull request #55 from Amsterdam/add_run_time
Browse files Browse the repository at this point in the history
Add run_time parameter
  • Loading branch information
SSchotten authored Oct 14, 2024
2 parents e630d1b + bb5968a commit 424546b
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 12 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ Using a Shared Compute Cluster will result in an error, as it does not have the
([click](https://docs.databricks.com/en/release-notes/runtime/13.3lts.html#system-environment)).
Older versions of DBR will result in errors upon install of the `dq-suite-amsterdam` library.

- At time of writing (late Aug 2024), Great Expectations v1.0.0 has just been released, and is not (yet) compatible with Python 3.12. Hence, make sure you are using the correct version of Python as interpreter for your project.
- At time of writing (late Aug 2024), Great Expectations v1.0.0 has just been released, and is not (yet) compatible with Python 3.12. Hence, make sure you are using the correct version of Python as interpreter for your project.

- The run_time is currently defined in df_checker, separately from Great Expectations. We plan to fix this once Great Expectations documents how to access the run time from the RunIdentifier object.

# Contributing to this library
See the separate [developers' readme](src/Readme-dev.md).
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dq-suite-amsterdam"
version = "0.10.3"
version = "0.10.4"
authors = [
{ name="Arthur Kordes", email="a.kordes@amsterdam.nl" },
{ name="Aysegul Cayir Aydar", email="a.cayiraydar@amsterdam.nl" },
Expand Down
3 changes: 3 additions & 0 deletions src/dq_suite/df_checker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List

import great_expectations
import datetime
from great_expectations import Checkpoint, ValidationDefinition
from great_expectations.checkpoint.actions import CheckpointAction
from great_expectations.checkpoint.checkpoint import CheckpointResult
Expand Down Expand Up @@ -216,6 +217,7 @@ def run(
validation_settings_obj=validation_settings_obj,
)
validation_output = checkpoint_result.describe_dict()
run_time = datetime.datetime.now() #TODO: get from RunIdentifier object

# 3) write results to unity catalog
write_non_validation_tables(
Expand All @@ -228,4 +230,5 @@ def run(
df=df,
dataset_name=validation_dict["dataset"]["name"],
unique_identifier=rules_dict["unique_identifier"],
run_time = run_time,
)
19 changes: 9 additions & 10 deletions src/dq_suite/output_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def get_grouped_ids_per_deviating_value(
def extract_dq_validatie_data(
table_name: str,
dataset_name: str,
run_time: datetime,
dq_result: CheckpointDescriptionDict,
catalog_name: str,
spark_session: SparkSession,
Expand All @@ -123,7 +124,8 @@ def extract_dq_validatie_data(
:param table_name: Name of the tables
:param dataset_name:
:param dq_result:
:param run_time:
:param dq_result: # TODO: add dataclass?
:param catalog_name:
:param spark_session:
"""
Expand All @@ -132,10 +134,6 @@ def extract_dq_validatie_data(
# "validation_results" is typed List[Dict[str, Any]] in GX
dq_result = dq_result["validation_results"]

# run_time = dq_result["meta"]["run_id"].run_time
run_time = datetime.datetime(1900, 1, 1)
# TODO: fix, find run_time in new GX API

extracted_data = []
for validation_result in dq_result:
for expectation_result in validation_result["expectations"]:
Expand Down Expand Up @@ -199,6 +197,7 @@ def extract_dq_afwijking_data(
dq_result: CheckpointDescriptionDict,
df: DataFrame,
unique_identifier: str,
run_time: datetime,
catalog_name: str,
spark_session: SparkSession,
) -> None:
Expand All @@ -210,18 +209,15 @@ def extract_dq_afwijking_data(
:param dq_result:
:param df: A DataFrame containing the invalid (deviated) result
:param unique_identifier:
:param run_time:
:param catalog_name:
:param spark_session:
"""
tabel_id = f"{dataset_name}_{table_name}"

# "validation_results" is typed List[Dict[str, Any]] in GX
dq_result = dq_result["validation_results"]

# run_time = dq_result["meta"]["run_id"].run_time
run_time = datetime.datetime(1900, 1, 1)
# TODO: fix, find run_time in new GX API


extracted_data = []
if not isinstance(unique_identifier, list):
unique_identifier = [unique_identifier]
Expand Down Expand Up @@ -526,11 +522,13 @@ def write_validation_table(
df: DataFrame,
dataset_name: str,
unique_identifier: str,
run_time: datetime,
):
extract_dq_validatie_data(
validation_settings_obj.table_name,
dataset_name,
validation_output,
run_time,
validation_settings_obj.catalog_name,
validation_settings_obj.spark_session,
)
Expand All @@ -540,6 +538,7 @@ def write_validation_table(
validation_output,
df,
unique_identifier,
run_time,
validation_settings_obj.catalog_name,
validation_settings_obj.spark_session,
)

0 comments on commit 424546b

Please sign in to comment.