
Commit 8d51514

Feature dagster check results (#271)
* Added dagster helper functions
* Bump version for dagster checks utilities
canimus authored Jul 6, 2024
1 parent d9ff574 commit 8d51514
Showing 7 changed files with 79 additions and 20 deletions.
4 changes: 3 additions & 1 deletion cuallee/__init__.py
@@ -55,9 +55,11 @@
except (ModuleNotFoundError, ImportError):
    logger.debug("KO: BigQuery")


class CustomComputeException(Exception):
    pass


class CheckLevel(enum.Enum):
    """Level of verifications in cuallee"""

@@ -1179,7 +1181,7 @@ def is_custom(
            fn (Callable): A function that receives a dataframe as input and returns a dataframe with at least 1 column as result
            pct (float): The threshold percentage required to pass
        """

        (Rule("is_custom", column, fn, CheckDataType.AGNOSTIC, pct) >> self._rule)
        return self

43 changes: 40 additions & 3 deletions cuallee/dagster/__init__.py
@@ -1,7 +1,7 @@
-from dagster import asset_check, AssetCheckResult
-from cuallee import Check
+from dagster import asset_check, AssetCheckResult, AssetCheckSpec, AssetCheckSeverity
+from cuallee import Check, CheckLevel
import pandas as pd
-from typing import List
+from typing import List, Iterator


def make_dagster_checks(
@@ -27,3 +27,40 @@ def _check():

        checks.append(_check)
    return checks


def get_severity(check: Check):
    if check.level == CheckLevel.ERROR:
        return AssetCheckSeverity.ERROR
    else:
        return AssetCheckSeverity.WARN


def make_check_specs(check: Check, asset_name: str) -> List[AssetCheckSpec]:
    """To be used in the @asset decorator as input for check_specs"""
    return [
        AssetCheckSpec(name=f"{rule.method}.{rule.column}", asset=asset_name)
        for rule in check.rules
    ]


def yield_check_results(
    check: Check, dataframe: pd.DataFrame
) -> Iterator[AssetCheckResult]:
    """Used in checks inside an asset, to yield all cuallee validations"""
    results = check.validate(dataframe)

    for item in results.itertuples():
        yield AssetCheckResult(
            name=f"{item.method}.{item.column}",
            passed=(item.status == "PASS"),
            metadata={
                "level": item.level,
                "rows": int(item.rows),
                "column": item.column,
                "value": str(item.value),
                "violations": int(item.violations),
                "pass_rate": item.pass_rate,
            },
            severity=get_severity(check),
        )
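
Taken together, these helpers wire a cuallee Check into Dagster's asset-check machinery: make_check_specs declares one AssetCheckSpec per rule for the @asset decorator, and yield_check_results emits one AssetCheckResult per validated rule from inside the asset body. A minimal sketch of that wiring, assuming a pandas-backed asset; the asset name "orders" and the data are hypothetical:

from dagster import asset, Output
import pandas as pd

from cuallee import Check, CheckLevel
from cuallee.dagster import make_check_specs, yield_check_results

check = Check(CheckLevel.WARNING, "Dagster")
check.is_complete("id")

# one AssetCheckSpec per cuallee rule, attached to the asset
@asset(check_specs=make_check_specs(check, "orders"))
def orders():
    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
    yield Output(df)  # the asset's own materialization
    yield from yield_check_results(check, df)  # one AssetCheckResult per rule

The standalone route is make_dagster_checks(check, "orders", df), which builds @asset_check functions that would then be registered with Definitions(asset_checks=...).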
23 changes: 14 additions & 9 deletions cuallee/pyspark_validation.py
@@ -594,26 +594,31 @@ def is_custom(self, rule: Rule):

        def _execute(dataframe: DataFrame, key: str):
            try:
-               assert isinstance(rule.value, Callable), "Please provide a Callable/Function for validation"
+               assert isinstance(
+                   rule.value, Callable
+               ), "Please provide a Callable/Function for validation"
                computed_frame = rule.value(dataframe)
-               assert isinstance(computed_frame, DataFrame), "Custom function does not return a PySpark DataFrame"
-               assert len(computed_frame.columns) >= 1, "Custom function should retun at least one column"
+               assert isinstance(
+                   computed_frame, DataFrame
+               ), "Custom function does not return a PySpark DataFrame"
+               assert (
+                   len(computed_frame.columns) >= 1
+               ), "Custom function should retun at least one column"
                computed_column = last(computed_frame.columns)
                return computed_frame.select(
                    F.sum(F.col(f"`{computed_column}`").cast("integer")).alias(key)
                )
-
-           except Exception as err:
-               raise CustomComputeException(str(err))
+           except Exception as err:
+               raise CustomComputeException(str(err))

        self.compute_instruction = ComputeInstruction(
            predicate, _execute, ComputeMethod.TRANSFORM
        )

        return self.compute_instruction


def _field_type_filter(
    dataframe: DataFrame,
    field_type: Union[
@@ -796,12 +801,12 @@ def summary(check: Check, dataframe: DataFrame) -> DataFrame:
    spark = SparkSession.builder.getOrCreate()

    def _value(x):
-       """ Removes verbosity for Callable values"""
+       """Removes verbosity for Callable values"""
        if isinstance(x, Callable):
            return "f(x)"
        else:
            return str(x)

    # Compute the expression
    computed_expressions = compute(check._rule)
    if (int(spark.version.replace(".", "")[:3]) < 330) or (
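
The assertions above define the is_custom contract for PySpark: the rule value must be a callable that takes the DataFrame and returns a DataFrame with at least one column, and the last returned column is cast to integer and summed to count passing rows, so it should hold a per-row boolean predicate. A minimal sketch under those assumptions; the function and column names are illustrative:

from pyspark.sql import SparkSession, functions as F
from cuallee import Check, CheckLevel

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)  # one column "id" with values 0..9

def non_negative(frame):
    # the last column is the predicate cuallee casts to int and sums
    return frame.withColumn("predicate", F.col("id") >= 0)

check = Check(CheckLevel.WARNING, "custom")
check.is_custom("id", non_negative)
check.validate(df).show()  # one summary row with status PASS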
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "cuallee"
version = "0.11.1"
version = "0.12.0"
authors = [
{ name="Herminio Vazquez", email="canimus@gmail.com"},
{ name="Virginie Grosboillot", email="vestalisvirginis@gmail.com" }
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[metadata]
name = cuallee
-version = 0.11.1
+version = 0.12.0
[options]
packages = find:
20 changes: 18 additions & 2 deletions test/unit/dagster_checks/test_methods.py
@@ -1,11 +1,27 @@
-from cuallee.dagster import make_dagster_checks
+from cuallee.dagster import make_dagster_checks, make_check_specs, yield_check_results
from cuallee import Check, CheckLevel
import pandas as pd

+from typing import Iterator

def test_make_checks():
    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
    check = Check(CheckLevel.WARNING, "Dagster")
    check.is_complete("id")
    result = make_dagster_checks(check, "AssetName", df)
    assert isinstance(result, list)


def test_make_check_specs():
    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
    check = Check(CheckLevel.WARNING, "Dagster")
    check.is_complete("id")
    specs = make_check_specs(check, "test_asset")
    assert isinstance(specs, list)


def test_yield_check_specs():
    df = pd.DataFrame({"id": [1, 2, 3, 4, 5]})
    check = Check(CheckLevel.WARNING, "Dagster")
    check.is_complete("id")
    results = yield_check_results(check, df)
    assert isinstance(results, Iterator)
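
One caveat on the last test: yield_check_results is a generator, so the isinstance assertion passes before check.validate ever runs. A stricter variant (hypothetical, not part of this commit) would drain the iterator so that result construction is exercised too:

results = list(yield_check_results(check, df))
assert len(results) == 1  # one result for the single is_complete rule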
5 changes: 2 additions & 3 deletions test/unit/pyspark_dataframe/test_is_custom.py
@@ -27,13 +27,12 @@ def test_negative(spark):
def test_parameters(spark):
    df = spark.range(10)
    with pytest.raises(
-       CustomComputeException, match="Please provide a Callable/Function for validation"
+       CustomComputeException,
+       match="Please provide a Callable/Function for validation",
    ):
        check = Check(CheckLevel.WARNING, "pytest")
        check.is_custom("id", "wrong value")
        check.validate(df)
-
-


def test_coverage(spark):
