diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index 029600cbe9..d6fb0035c2 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -34,6 +34,7 @@ default_assets = ( *load_assets_from_modules([eia_bulk_elec_assets], group_name="core_eia_bulk_elec"), *load_assets_from_modules([epacems_assets], group_name="core_epacems"), + *load_assets_from_modules([pudl.extract.eia176], group_name="raw_eia176"), *load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"), *load_assets_from_modules([pudl.transform.eia860], group_name="_core_eia860"), *load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"), diff --git a/src/pudl/extract/__init__.py b/src/pudl/extract/__init__.py index a25e4b6fd2..7e889c6d0c 100644 --- a/src/pudl/extract/__init__.py +++ b/src/pudl/extract/__init__.py @@ -8,6 +8,7 @@ :mod:`pudl.transform` subpackage. """ from . import ( + eia176, eia860, eia860m, eia861, diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py new file mode 100644 index 0000000000..af7718e8c0 --- /dev/null +++ b/src/pudl/extract/csv.py @@ -0,0 +1,72 @@ +"""Extractor for CSV data.""" +from csv import DictReader +from importlib import resources +from zipfile import ZipFile + +import pandas as pd + +import pudl.logging_helpers + +logger = pudl.logging_helpers.get_logger(__name__) + + +def open_csv_resource(dataset: str, base_filename: str) -> DictReader: + """Open the given resource file as :class:`csv.DictReader`. + + Args: + dataset: used to load metadata from package_data/{dataset} subdirectory. + base_filename: the name of the file in the subdirectory to open. + """ + csv_path = resources.files(f"pudl.package_data.{dataset}") / base_filename + return DictReader(csv_path.open()) + + +def get_table_file_map(dataset: str) -> dict[str, str]: + """Return a dictionary of table names and filenames for the dataset. + + Args: + dataset: used to load metadata from package_data/{dataset} subdirectory. 
+ """ + return { + row["table"]: row["filename"] + for row in open_csv_resource(dataset, "table_file_map.csv") + } + + +class CsvExtractor: + """Generalized class for extracting dataframes from CSV files. + + The extraction logic is invoked by calling extract() method of this class. + """ + + def __init__(self, zipfile: ZipFile, table_file_map: dict[str, str]): + """Create a new instance of CsvExtractor. + + This can be used for retrieving data from CSV files. + + Args: + zipfile: zipfile object containing source files + table_file_map: map of table name to source file in zipfile archive + """ + self._zipfile = zipfile + self._table_file_map = table_file_map + + def get_table_names(self) -> list[str]: + """Returns list of tables that this extractor provides access to.""" + return list(self._table_file_map) + + def extract_one(self, table_name: str) -> pd.DataFrame: + """Read the data from the CSV source file and return as a dataframe.""" + logger.info(f"Extracting {table_name} from CSV into pandas DataFrame.") + filename = self._table_file_map[table_name] + with self._zipfile.open(filename) as f: + df = pd.read_csv(f) + return df + + def extract_all(self) -> dict[str, pd.DataFrame]: + """Extracts a dictionary of table names and dataframes from CSV source files.""" + data = {} + for table_name in self.get_table_names(): + df = self.extract_one(table_name) + data[table_name] = df + return data diff --git a/src/pudl/extract/dbf.py b/src/pudl/extract/dbf.py index 1c296aaea6..e34602e19a 100644 --- a/src/pudl/extract/dbf.py +++ b/src/pudl/extract/dbf.py @@ -160,7 +160,7 @@ def get_table_schema(self, table_name: str) -> DbfTableSchema: table_columns = self.get_db_schema()[table_name] dbf = self.get_table_dbf(table_name) dbf_fields = [field for field in dbf.fields if field.name != "_NullFlags"] - if len(table_columns) != len(table_columns): + if len(dbf_fields) != len(table_columns): return ValueError( f"Number of DBF fields in {table_name} does not match what was " 
f"found in the DBC index file for {self.partition}." diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py new file mode 100644 index 0000000000..aeab25026f --- /dev/null +++ b/src/pudl/extract/eia176.py @@ -0,0 +1,26 @@ +"""Extract EIA Form 176 data from CSVs. + +The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757. +""" + +from dagster import asset + +from pudl.extract.csv import CsvExtractor, get_table_file_map + +DATASET = "eia176" + + +@asset(required_resource_keys={"datastore"}) +def raw_eia176__company(context): + """Extract raw EIA company data from CSV sheets into dataframes. + + Args: + context: dagster keyword that provides access to resources and config. + + Returns: + An extracted EIA dataframe with company data. + """ + zipfile = context.resources.datastore.get_zipfile_resource(DATASET) + table_file_map = get_table_file_map(DATASET) + extractor = CsvExtractor(zipfile, table_file_map) + extractor.extract_one("company") diff --git a/src/pudl/package_data/eia176/table_file_map.csv b/src/pudl/package_data/eia176/table_file_map.csv new file mode 100644 index 0000000000..343bdb33c0 --- /dev/null +++ b/src/pudl/package_data/eia176/table_file_map.csv @@ -0,0 +1,2 @@ +table,filename +company,all_company_176.csv diff --git a/src/pudl/workspace/datastore.py b/src/pudl/workspace/datastore.py index 144c187aeb..2064edf879 100644 --- a/src/pudl/workspace/datastore.py +++ b/src/pudl/workspace/datastore.py @@ -171,6 +171,7 @@ class ZenodoDoiSettings(BaseSettings): """Digital Object Identifiers pointing to currently used Zenodo archives.""" censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049" + eia176: ZenodoDoi = "10.5281/zenodo.7682358" eia860: ZenodoDoi = "10.5281/zenodo.10067566" eia860m: ZenodoDoi = "10.5281/zenodo.10204686" eia861: ZenodoDoi = "10.5281/zenodo.10204708" diff --git a/test/integration/etl_test.py b/test/integration/etl_test.py index affa1d813b..4ab7887409 100644 --- a/test/integration/etl_test.py +++ 
b/test/integration/etl_test.py @@ -75,6 +75,20 @@ def test_ferc1_xbrl2sqlite(ferc1_engine_xbrl: sa.Engine, ferc1_xbrl_taxonomy_met ) +class TestCsvExtractor: + """Verify that we can load CSV files as provided via the datastore.""" + + def test_extract_eia176(self, pudl_datastore_fixture): + """Spot check extraction of eia176 csv files.""" + dataset = "eia176" + zipfile = pudl_datastore_fixture.get_zipfile_resource(dataset) + table_file_map = pudl.extract.csv.get_table_file_map(dataset) + extractor = pudl.extract.csv.CsvExtractor(zipfile, table_file_map) + table = "company" + if table not in extractor.extract_all(): + raise AssertionError(f"table {table} not found in datastore") + + class TestExcelExtractor: """Verify that we can lead excel files as provided via the datastore.""" diff --git a/test/unit/extract/csv_test.py b/test/unit/extract/csv_test.py new file mode 100644 index 0000000000..2b07b0bf30 --- /dev/null +++ b/test/unit/extract/csv_test.py @@ -0,0 +1,52 @@ +"""Unit tests for pudl.extract.csv module.""" +from unittest.mock import MagicMock, patch + +from pudl.extract.csv import CsvExtractor, get_table_file_map, open_csv_resource + +DATASET = "eia176" +BASE_FILENAME = "table_file_map.csv" +TABLE_NAME = "company" +FILENAME = "all_company_176.csv" +TABLE_FILE_MAP = {TABLE_NAME: FILENAME} + + +def get_csv_extractor(): + zipfile = MagicMock() + return CsvExtractor(zipfile, TABLE_FILE_MAP) + + +def test_open_csv_resource(): + csv_resource = open_csv_resource(DATASET, BASE_FILENAME) + assert ["table", "filename"] == csv_resource.fieldnames + + +def test_get_table_file_map(): + table_file_map = get_table_file_map(DATASET) + assert table_file_map == TABLE_FILE_MAP + + +def test_get_table_names(): + extractor = get_csv_extractor() + table_names = extractor.get_table_names() + assert [TABLE_NAME] == table_names + + +@patch("pudl.extract.csv.pd") +def test_csv_extractor_read_source(mock_pd): + extractor = get_csv_extractor() + res = extractor.extract_one(TABLE_NAME) 
+ mock_zipfile = extractor._zipfile + mock_zipfile.open.assert_called_once_with(FILENAME) + f = mock_zipfile.open.return_value.__enter__.return_value + mock_pd.read_csv.assert_called_once_with(f) + df = mock_pd.read_csv() + assert df == res + + +def test_csv_extractor_extract(): + extractor = get_csv_extractor() + df = MagicMock() + with patch.object(CsvExtractor, "extract_one", return_value=df) as mock_read_source: + raw_dfs = extractor.extract_all() + mock_read_source.assert_called_once_with(TABLE_NAME) + assert {TABLE_NAME: df} == raw_dfs