Skip to content

Commit

Permalink
Make asset checks run in integration test (#3990)
Browse files Browse the repository at this point in the history
* WIP: Use same dagster Definitions for test and prod.

* only check row nums for years that are in the dataset_settings

* make the warning only warn once

* Since some asset checks use job name to switch what checks they run, allow test to specify job name.

---------

Co-authored-by: Christina Gosnell <cgosnell@catalyst.coop>
  • Loading branch information
jdangerx and cmgosnell authored Dec 22, 2024
1 parent d6e385c commit ba58b94
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 28 deletions.
2 changes: 0 additions & 2 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
from pudl.settings import EtlSettings

from . import (
check_foreign_keys,
cli,
eia_bulk_elec_assets,
epacems_assets,
glue_assets,
Expand Down
23 changes: 5 additions & 18 deletions src/pudl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import fsspec
from dagster import (
DagsterInstance,
Definitions,
JobDefinition,
build_reconstructable_job,
define_asset_job,
execute_job,
)

import pudl
from pudl.etl import defs
from pudl.helpers import get_dagster_execution_config
from pudl.settings import EpaCemsSettings, EtlSettings
from pudl.workspace.setup import PudlPaths
Expand All @@ -37,24 +36,12 @@ def pudl_etl_job_factory(
The job definition to be executed.
"""

def get_pudl_etl_job():
def get_pudl_etl_job(job_name: str | None = None):
"""Create an pudl_etl_job wrapped by to be wrapped by reconstructable."""
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
jobs = [define_asset_job("etl_job")]
if not process_epacems:
jobs = [
define_asset_job(
"etl_job",
selection=pudl.etl.create_non_cems_selection(
pudl.etl.default_assets
),
)
]
return Definitions(
assets=pudl.etl.default_assets,
resources=pudl.etl.default_resources,
jobs=jobs,
).get_job_def("etl_job")
if job_name is None:
job_name = "etl_full_no_cems" if not process_epacems else "etl_full"
return defs.get_job_def(job_name)

return get_pudl_etl_job

Expand Down
17 changes: 10 additions & 7 deletions src/pudl/transform/ferc714.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,25 +1276,28 @@ class Ferc714CheckSpec:
]


def make_check(spec: Ferc714CheckSpec) -> AssetChecksDefinition:
def make_row_num_check(spec: Ferc714CheckSpec) -> AssetChecksDefinition:
"""Turn the Ferc714CheckSpec into an actual Dagster asset check."""

@asset_check(asset=spec.asset, blocking=True)
def _check(df):
@asset_check(
asset=spec.asset, required_resource_keys={"dataset_settings"}, blocking=True
)
def _row_num_check(context, df):
errors = []
for year, expected_rows in spec.num_rows_by_report_year.items():
for year in context.resources.dataset_settings.ferc714.years:
expected_rows = spec.num_rows_by_report_year[year]
if (num_rows := len(df.loc[df.report_year == year])) != expected_rows:
errors.append(
f"Expected {expected_rows} for report year {year}, found {num_rows}"
)
logger.info(errors)
logger.warning(errors)

if errors:
return AssetCheckResult(passed=False, metadata={"errors": errors})

return AssetCheckResult(passed=True)

return _check
return _row_num_check


_checks = [make_check(spec) for spec in check_specs]
_checks = [make_row_num_check(spec) for spec in check_specs]
2 changes: 1 addition & 1 deletion test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def pudl_io_manager(
md = PUDL_PACKAGE.to_sql()
md.create_all(engine)
# Run the ETL and generate a new PUDL SQLite DB for testing:
execute_result = pudl_etl_job_factory()().execute_in_process(
execute_result = pudl_etl_job_factory()("etl_fast").execute_in_process(
run_config={
"resources": {
"dataset_settings": {
Expand Down

0 comments on commit ba58b94

Please sign in to comment.