Skip to content

Commit

Permalink
Move pudl_etl_job_factory back to pudl.etl.__init__.py (#3711)
Browse files Browse the repository at this point in the history
  • Loading branch information
bendnorman authored Jul 3, 2024
1 parent 548401f commit f977c74
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
40 changes: 40 additions & 0 deletions src/pudl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

import pathlib
import sys
from collections.abc import Callable

import click
import fsspec
from dagster import (
DagsterInstance,
Definitions,
JobDefinition,
build_reconstructable_job,
define_asset_job,
execute_job,
)

Expand All @@ -19,6 +23,42 @@
logger = pudl.logging_helpers.get_logger(__name__)


def pudl_etl_job_factory(
logfile: str | None = None, loglevel: str = "INFO", process_epacems: bool = True
) -> Callable[[], JobDefinition]:
"""Factory for parameterizing a reconstructable pudl_etl job.
Args:
loglevel: The log level for the job's execution.
logfile: Path to a log file for the job's execution.
process_epacems: Include EPA CEMS assets in the job execution.
Returns:
The job definition to be executed.
"""

def get_pudl_etl_job():
"""Create an pudl_etl_job wrapped by to be wrapped by reconstructable."""
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
jobs = [define_asset_job("etl_job")]
if not process_epacems:
jobs = [
define_asset_job(
"etl_job",
selection=pudl.etl.create_non_cems_selection(
pudl.etl.default_assets
),
)
]
return Definitions(
assets=pudl.etl.default_assets,
resources=pudl.etl.default_resources,
jobs=jobs,
).get_job_def("etl_job")

return get_pudl_etl_job


@click.command(
context_settings={"help_option_names": ["-h", "--help"]},
)
Expand Down
30 changes: 3 additions & 27 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
import pytest
import sqlalchemy as sa
from dagster import (
JobDefinition,
build_init_resource_context,
graph,
materialize_to_memory,
)

import pudl
from pudl import resources
from pudl.etl import defs
from pudl.etl.cli import pudl_etl_job_factory
from pudl.extract.ferc1 import Ferc1DbfExtractor, raw_ferc1_xbrl__metadata_json
from pudl.extract.xbrl import xbrl2sqlite_op_factory
from pudl.io_managers import (
Expand Down Expand Up @@ -279,29 +278,6 @@ def ferc1_xbrl_taxonomy_metadata(ferc1_engine_xbrl: sa.Engine):
return result.output_for_node("raw_ferc1_xbrl__metadata_json")


def _pudl_etl_job_factory(
logfile: str | None = None, loglevel: str = "INFO", process_epacems: bool = True
) -> JobDefinition:
"""Function that lets us pass jobs between processes.
If we are using e.g. a multi-process executor, we can't pass the whole
JobDefinition to the child processes. Instead we pass this function around
that makes JobDefinitions.
Args:
loglevel: The log level for the job's execution.
logfile: Path to a log file for the job's execution.
process_epacems: Include EPA CEMS assets in the job execution.
Returns:
The job definition to be executed.
"""
pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)
if not process_epacems:
return defs.get_job_def("etl_full_no_cems")
return defs.get_job_def("etl_full")


@pytest.fixture(scope="session")
def pudl_io_manager(
ferc1_engine_dbf: sa.Engine, # Implicit dependency
Expand All @@ -323,7 +299,7 @@ def pudl_io_manager(
md = PUDL_PACKAGE.to_sql()
md.create_all(engine)
# Run the ETL and generate a new PUDL SQLite DB for testing:
execute_result = _pudl_etl_job_factory().execute_in_process(
execute_result = pudl_etl_job_factory()().execute_in_process(
run_config={
"resources": {
"dataset_settings": {
Expand Down Expand Up @@ -364,7 +340,7 @@ def configure_paths_for_tests(tmp_path_factory, request):
Set ``--live-dbs`` to force PUDL_OUTPUT to *NOT* be a temporary directory
and instead inherit from environment.
``--live--dbs`` flag is ignored in unit tests, see pudl/test/unit/conftest.py.
``--live-dbs`` flag is ignored in unit tests, see pudl/test/unit/conftest.py.
"""
# Just in case we need this later...
pudl_tmpdir = tmp_path_factory.mktemp("pudl")
Expand Down

0 comments on commit f977c74

Please sign in to comment.