
Start EIA-176 pipelines: company data #2949

Closed · wants to merge 20 commits

Changes from 10 commits (of 20)

Commits
826a77a  Start of reusable CSV extractor, incorporating preexisting patterns a…  (Oct 6, 2023)
bc6eddf  Table schema and archive objects for CSV extraction, pipeline-/form-s…  davidmudrauskas (Oct 30, 2023)
ff6b5bf  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Oct 30, 2023)
9903674  Unit tests for CsvTableSchema  davidmudrauskas (Nov 3, 2023)
3a0bfe2  Full unit test coverage for CSV extractor  davidmudrauskas (Nov 4, 2023)
1fb52e9  Follow patterns for clobber and test file names, implement delete_sch…  davidmudrauskas (Nov 6, 2023)
8dbd975  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Nov 6, 2023)
1531313  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Nov 16, 2023)
9aff0a8  Update CSV extractor to just return dataframes, integrate with Dagster  davidmudrauskas (Nov 16, 2023)
caaa212  Combine thin CSV extraction-related class, update tests  davidmudrauskas (Nov 16, 2023)
fe3fbb7  Remove extraneous files, undo find-replace error  davidmudrauskas (Nov 16, 2023)
0b703c6  Extract one table using CSV extractor  davidmudrauskas (Nov 17, 2023)
cb8e7e1  Move managing zipfile and table-file map to CSV extractor client, sim…  davidmudrauskas (Nov 17, 2023)
4dc4ad9  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Nov 21, 2023)
87b7c51  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Nov 22, 2023)
82504f0  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Nov 27, 2023)
8f4d93e  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Dec 1, 2023)
35de6a8  [pre-commit.ci] auto fixes from pre-commit.com hooks  pre-commit-ci[bot] (Dec 1, 2023)
35fabe6  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Dec 6, 2023)
07b48f3  Merge branch 'dev' into eia176_start_implementation  davidmudrauskas (Jan 3, 2024)
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
@@ -32,6 +32,7 @@
default_assets = (
    *load_assets_from_modules([eia_bulk_elec_assets], group_name="eia_bulk_elec"),
    *load_assets_from_modules([epacems_assets], group_name="epacems"),
    *load_assets_from_modules([pudl.extract.eia176], group_name="raw_eia176"),
    *load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"),
    *load_assets_from_modules([pudl.transform.eia860], group_name="_core_eia860"),
    *load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"),
1 change: 1 addition & 0 deletions src/pudl/extract/__init__.py
@@ -8,6 +8,7 @@
:mod:`pudl.transform` subpackage.
"""
from . import (
    eia176,
    eia860,
    eia860m,
    eia861,
106 changes: 106 additions & 0 deletions src/pudl/extract/csv.py
@@ -0,0 +1,106 @@
"""Extractor for CSV data."""
from csv import DictReader
from importlib import resources

import pandas as pd
from dagster import AssetsDefinition, OpDefinition, graph_asset, op

import pudl.logging_helpers
from pudl.workspace.datastore import Datastore

logger = pudl.logging_helpers.get_logger(__name__)


class CsvExtractor:
    """Generalized class for extracting dataframes from CSV files.
Member
So this CsvExtractor class requires subclassing to set the DATASET - that seems a little clunky. If we can pass the dataset name to __init__, we'll have a CsvExtractor class which exposes:

extractor = CsvExtractor(datastore, dataset_name)
extractor.read_source(filename) # get one file
extractor.extract() # get all files

Which seems like a nice generic CSV extraction interface that doesn't require subclassing to read a variety of different collections of CSV files.

I think we might want to tweak that API a bit to expose table-level operations:

extractor.extract_one(table_name) # pd.DataFrame
extractor.extract_all() # dict[str, pd.DataFrame]

Because then we can read multiple tables' files in parallel - each table could have its own asset, like:

@asset 
def raw_eia176__table_name(context):
    ...
    extractor.extract_one(table_name)

Which is pretty straightforward to factory-ize if you want to make a bunch of these assets.

In the event we need to read & combine multiple files for a single table (like we see in EIA 860), we can turn that simple asset above into a graph-backed asset. But for EIA176 that seems like overkill.
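For concreteness, here's a minimal sketch of that factory-ization, assuming the `CsvExtractor(datastore, dataset_name)` constructor and `extract_one` method proposed above (`raw_asset_factory` is a hypothetical name, not something in this PR):

```python
import pandas as pd
from dagster import AssetsDefinition, asset

from pudl.extract.csv import CsvExtractor  # assumed parameterized API


def raw_asset_factory(dataset: str, table_name: str) -> AssetsDefinition:
    """Build one asset that extracts a single table from a CSV dataset."""

    @asset(
        name=f"raw_{dataset}__{table_name}",
        required_resource_keys={"datastore"},
    )
    def _extract(context) -> pd.DataFrame:
        # One extractor per asset; each table materializes independently.
        extractor = CsvExtractor(context.resources.datastore, dataset)
        return extractor.extract_one(table_name)

    return _extract


raw_eia176_assets = [raw_asset_factory("eia176", table) for table in ("company",)]
```

Each table then gets its own asset, so Dagster can extract them in parallel.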

Member

I think there's also an argument for passing in the table -> file(s) map and the path to the file as init params, and then using the datastore at the call site - this lets people use the extractor to explore data sets that we haven't actually integrated into our upstream work yet, and lets you pass stuff in for testing without extensive patching. Could be something like:

class CsvExtractor:
    def __init__(self, path: pathlib.Path | zipfile.Path, table_files_map: dict[str, list[str]]):
        ...
    @classmethod
    def from_resource(cls, datastore: Datastore, resource_id: str):
        # existing logic to get zipfile + table/file map
        return cls(...)
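Usage under that assumed API might look like this (the local path and table map are illustrative):

```python
import zipfile

# Explore an archive that isn't integrated/published yet -- no Datastore needed.
extractor = CsvExtractor(
    zipfile.Path("local/eia176.zip"),
    table_files_map={"company": ["all_company_176.csv"]},
)

# Production call sites keep resolving the archive through the datastore.
extractor = CsvExtractor.from_resource(datastore, resource_id="eia176")
```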

What do you think?

Collaborator Author

> So this CsvExtractor class requires subclassing to set the DATASET - that seems a little clunky. If we can pass the dataset name to __init__

Meant to comment on this. I actually started using dataset as a construction parameter but stopped because I was introducing a competing pattern, e.g., vs FercDbfExtractor's inheritance tree. I'll proceed with parameterizing over inheritance now that someone else is also inclined.

> I think there's also an argument for passing in the table -> file(s) map and the path to the file as init params, and then using the datastore at the call site

I don't know what the datastore would be used for in that case, since right now it's only used to get the zipfile path. But generally you're talking about providing a class that lets a user/client point at any zip file and get dataframes out of it based on a table_files_map? Right now the zipfile path and the table-file(s) map are coupled via the dataset name, e.g., eia176. Decoupling them opens a wider window for invalid combos, i.e., table filenames that don't exist in the zip archive, but yeah, it would be nice to be able to develop against data without needing the source published on Zenodo. I'll take a pass in that direction.
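One way to keep that window of invalid combos small would be a construction-time check along these lines (hypothetical, not part of this PR; assumes the extractor holds an open zipfile.ZipFile handle and the multi-file map discussed above):

```python
def _validate_table_files_map(self) -> None:
    """Fail fast if any mapped filename is missing from the zip archive."""
    archive_names = set(self._zipfile.namelist())
    missing = {
        filename
        for filenames in self._table_files_map.values()
        for filename in filenames
        if filename not in archive_names
    }
    if missing:
        raise ValueError(f"Files not found in zip archive: {sorted(missing)}")
```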


    When subclassing from this generic extractor, one should implement
    dataset-specific logic in the following manner:

    1. Set the DATASET class attribute. This is used to load metadata from the
       package_data/{dataset} subdirectory.

    The extraction logic is invoked by calling the extract() method of this class.
    """

    DATASET = None

    def __init__(self, datastore: Datastore):
        """Create a new instance of CsvExtractor.

        This can be used for retrieving data from CSV files.

        Args:
            datastore: provides access to raw files on disk.
        """
        self._zipfile = datastore.get_zipfile_resource(self.DATASET)
        self._table_file_map = {
            row["table"]: row["filename"]
            for row in self._open_csv_resource("table_file_map.csv")
        }

    def _open_csv_resource(self, base_filename: str) -> DictReader:
        """Open the given resource file as :class:`csv.DictReader`."""
        csv_path = resources.files(f"pudl.package_data.{self.DATASET}") / base_filename
        return DictReader(csv_path.open())

    def read_source(self, filename: str) -> pd.DataFrame:
        """Read the data from the CSV source file and return as a dataframe."""
        logger.info(f"Extracting {filename} from CSV into pandas DataFrame.")
        with self._zipfile.open(filename) as f:
            df = pd.read_csv(f)
        return df

    def extract(self) -> dict[str, pd.DataFrame]:
        """Extract a dictionary of dataframes from CSV source files, keyed by table name."""
        data = {}
        for table, filename in self._table_file_map.items():
            data[table] = self.read_source(filename)
        return data


def extractor_factory(extractor_cls: type[CsvExtractor], name: str) -> OpDefinition:
    """Construct a Dagster op that extracts data given an extractor class.

    Args:
        extractor_cls: Class of type :class:`CsvExtractor` used to extract the data.
        name: Name of a CSV-based dataset (e.g. "eia176").
    """

    def extract(context) -> dict[str, pd.DataFrame]:
        """A function that extracts data from a CSV file.

        This function will be decorated with a Dagster op and returned.

        Args:
            context: Dagster keyword that provides access to resources and config.

        Returns:
            A dictionary of DataFrames extracted from CSV, keyed by table name.
        """
        ds = context.resources.datastore
        return extractor_cls(ds).extract()

    return op(
        required_resource_keys={"datastore", "dataset_settings"},
        name=f"extract_single_{name}_year",
    )(extract)


def raw_df_factory(extractor_cls: type[CsvExtractor], name: str) -> AssetsDefinition:
Collaborator Author

This follows the pattern established in excel.raw_df_factory, i.e., what we do for extracting EIA-860 in Dagster. Looks like the preexisting logic is covered by the Dagster nightly tests and I'm inclined to rely on the same here.

Member

EIA 860 is a little more complicated than EIA 176, since each table corresponds to a number of files spread across a number of different zip files (see src/pudl/package_data/eia860/file_map.csv for a look into the mess...)

The upshot is, I think this raw_df_factory and extractor_factory can be combined into one layer of abstraction. See above for more details.

"""Return a dagster graph asset to extract a set of raw DataFrames from CSV files.

Args:
extractor_cls: The dataset-specific CSV extractor used to extract the data.
Needs to correspond to the dataset identified by ``name``.
name: Name of a CSV-based dataset (e.g. "eia176"). Currently this must be
one of the attributes of :class:`pudl.settings.EiaSettings`
"""
extractor = extractor_factory(extractor_cls, name)

def raw_dfs() -> dict[str, pd.DataFrame]:
"""Produce a dictionary of extracted EIA dataframes."""
return extractor()

return graph_asset(name=f"{name}_raw_dfs")(raw_dfs)
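For reference, using this extractor directly (outside of Dagster) looks roughly like the sketch below; the Datastore construction is illustrative and assumes locally configured PUDL inputs:

```python
from pathlib import Path

from pudl.extract.eia176 import Eia176CsvExtractor
from pudl.workspace.datastore import Datastore

# Illustrative: point the datastore at a local cache of raw PUDL inputs.
datastore = Datastore(local_cache_path=Path("~/pudl-work/data").expanduser())
extractor = Eia176CsvExtractor(datastore)

raw_dfs = extractor.extract()  # {"company": <pd.DataFrame>}
company = raw_dfs["company"]
```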
2 changes: 1 addition & 1 deletion src/pudl/extract/dbf.py
@@ -159,7 +159,7 @@ def get_table_schema(self, table_name: str) -> DbfTableSchema:
        table_columns = self.get_db_schema()[table_name]
        dbf = self.get_table_dbf(table_name)
        dbf_fields = [field for field in dbf.fields if field.name != "_NullFlags"]
-        if len(table_columns) != len(table_columns):
+        if len(dbf_fields) != len(table_columns):
            raise ValueError(
                f"Number of DBF fields in {table_name} does not match what was "
                f"found in the DBC index file for {self.partition}."
46 changes: 46 additions & 0 deletions src/pudl/extract/eia176.py
@@ -0,0 +1,46 @@
"""Extract EIA Form 176 data from CSVs.

The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757.
davidmudrauskas marked this conversation as resolved.
Show resolved Hide resolved
"""

from dagster import AssetOut, Output, multi_asset

from pudl.extract.csv import CsvExtractor, raw_df_factory

DATASET = "eia176"


class Eia176CsvExtractor(CsvExtractor):
    """Extractor for EIA Form 176 data."""

    DATASET = DATASET


# TODO (davidmudrauskas): Add this information to the metadata
raw_table_names = (f"raw_{DATASET}__company",)

eia176_raw_dfs = raw_df_factory(Eia176CsvExtractor, name=DATASET)


@multi_asset(
    outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
    required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia176(context, eia176_raw_dfs):
    """Extract EIA-176 data from CSV source and return dataframes.

    Args:
        context: dagster keyword that provides access to resources and config.
        eia176_raw_dfs: raw EIA-176 dataframes, keyed by table name.

    Returns:
        An :class:`Output` for each extracted EIA-176 dataframe.
    """
    eia176_raw_dfs = {
        f"raw_{DATASET}__" + table_name: df for table_name, df in eia176_raw_dfs.items()
    }
    eia176_raw_dfs = dict(sorted(eia176_raw_dfs.items()))

    return (
        Output(output_name=table_name, value=df)
        for table_name, df in eia176_raw_dfs.items()
    )
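The eia191 and eia757 stubs below could eventually follow the same recipe. A hypothetical sketch for EIA-191, assuming it gets its own table_file_map.csv under pudl.package_data.eia191 and its own datastore resource (none of this is in the PR yet):

```python
"""Extract EIA Form 191 data from CSVs."""
from pudl.extract.csv import CsvExtractor, raw_df_factory

DATASET = "eia191"


class Eia191CsvExtractor(CsvExtractor):
    """Extractor for EIA Form 191 data."""

    DATASET = DATASET


eia191_raw_dfs = raw_df_factory(Eia191CsvExtractor, name=DATASET)
```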
1 change: 1 addition & 0 deletions src/pudl/extract/eia191.py
@@ -0,0 +1 @@
"""Extract EIA Form 191 data from CSVs."""
1 change: 1 addition & 0 deletions src/pudl/extract/eia757.py
@@ -0,0 +1 @@
"""Extract EIA Form 757 data from CSVs."""
2 changes: 1 addition & 1 deletion src/pudl/extract/ferc1.py
@@ -301,7 +301,7 @@ def add_missing_respondents(self):

        # Write missing respondents back into SQLite.
        with self.sqlite_engine.begin() as conn:
-            conn.execute(
+            conn.extract(
                self.sqlite_meta.tables["f1_respondent_id"].insert().values(records)
            )

2 changes: 2 additions & 0 deletions src/pudl/package_data/eia176/table_file_map.csv
@@ -0,0 +1,2 @@
table,filename
company,all_company_176.csv
2 changes: 2 additions & 0 deletions src/pudl/workspace/datastore.py
@@ -169,6 +169,8 @@ class ZenodoDoiSettings(BaseSettings):
    # Sandbox DOIs are provided for reference
    censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049"
    # censusdp1tract: ZenodoDoi = "10.5072/zenodo.674992"
    eia176: ZenodoDoi = "10.5281/zenodo.7682358"
    # eia176: ZenodoDoi = "10.5072/zenodo.1166385"
    eia860: ZenodoDoi = "10.5281/zenodo.10067566"
    # eia860: ZenodoDoi = "10.5072/zenodo.1222854"
    eia860m: ZenodoDoi = "10.5281/zenodo.8188017"
11 changes: 11 additions & 0 deletions test/integration/etl_test.py
@@ -68,6 +68,17 @@ def test_ferc1_xbrl2sqlite(ferc1_engine_xbrl, ferc1_xbrl_taxonomy_metadata):
    )


class TestCsvExtractor:
    """Verify that we can load CSV files as provided via the datastore."""

    def test_extract_eia176(self, pudl_datastore_fixture):
        """Spot check extraction of EIA-176 CSV files."""
        extractor = pudl.extract.eia176.Eia176CsvExtractor(pudl_datastore_fixture)
        table = "company"
        if table not in extractor.extract():
            raise AssertionError(f"table {table} not found in datastore")


class TestExcelExtractor:
    """Verify that we can load Excel files as provided via the datastore."""

41 changes: 41 additions & 0 deletions test/unit/extract/csv_test.py
@@ -0,0 +1,41 @@
"""Unit tests for pudl.extract.csv module."""
from unittest.mock import MagicMock, patch

from pudl.extract.csv import CsvExtractor

TABLE_NAME = "company"

FILENAME = "all_company_176.csv"
TABLE_FILE_MAP = {TABLE_NAME: FILENAME}

DATASET = "eia176"


class FakeCsvExtractor(CsvExtractor):
    DATASET = DATASET


def get_csv_extractor():
    datastore = MagicMock()
    return FakeCsvExtractor(datastore)


@patch("pudl.extract.csv.pd")
def test_csv_extractor_read_source(mock_pd):
    extractor = get_csv_extractor()
    res = extractor.read_source(FILENAME)
    mock_zipfile = extractor._zipfile
    mock_zipfile.open.assert_called_once_with(FILENAME)
    f = mock_zipfile.open.return_value.__enter__.return_value
    mock_pd.read_csv.assert_called_once_with(f)
    df = mock_pd.read_csv()
    assert df == res


def test_csv_extractor_extract():
    extractor = get_csv_extractor()
    df = MagicMock()
    with patch.object(CsvExtractor, "read_source", return_value=df) as mock_read_source:
        raw_dfs = extractor.extract()
    mock_read_source.assert_called_once_with(FILENAME)
    assert {TABLE_NAME: df} == raw_dfs
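
If the constructor grows to accept an archive and table map directly (as discussed in the review thread), these tests could drop the MagicMock patching in favor of a real in-memory archive. A sketch under that assumed API:

```python
import io
import zipfile

import pandas as pd


def test_csv_extractor_with_real_archive():
    # Build a tiny zip archive in memory; no datastore or patching needed.
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr(FILENAME, "id,name\n1,Acme Gas\n")

    # Assumes the discussed constructor: CsvExtractor(archive, table_files_map).
    extractor = CsvExtractor(zipfile.ZipFile(buffer), {TABLE_NAME: FILENAME})
    raw_dfs = extractor.extract()

    expected = pd.DataFrame({"id": [1], "name": ["Acme Gas"]})
    pd.testing.assert_frame_equal(raw_dfs[TABLE_NAME], expected)
```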