Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start EIA-176 pipelines: company data #2949

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
826a77a
Start of reusable CSV extractor, incorporating preexisting patterns a…
Oct 6, 2023
bc6eddf
Table schema and archive objects for CSV extraction, pipeline-/form-s…
davidmudrauskas Oct 30, 2023
ff6b5bf
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Oct 30, 2023
9903674
Unit tests for CsvTableSchema
davidmudrauskas Nov 3, 2023
3a0bfe2
Full unit test coverage for CSV extractor
davidmudrauskas Nov 4, 2023
1fb52e9
Follow patterns for clobber and test file names, implement delete_sch…
davidmudrauskas Nov 6, 2023
8dbd975
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 6, 2023
1531313
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 16, 2023
9aff0a8
Update CSV extractor to just return dataframes, integrate with Dagster
davidmudrauskas Nov 16, 2023
caaa212
Combine thin CSV extraction-related class, update tests
davidmudrauskas Nov 16, 2023
fe3fbb7
Remove extraneous files, undo find-replace error
davidmudrauskas Nov 16, 2023
0b703c6
Extract one table using CSV extractor
davidmudrauskas Nov 17, 2023
cb8e7e1
Move managing zipfile and table-file map to CSV extractor client, sim…
davidmudrauskas Nov 17, 2023
4dc4ad9
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 21, 2023
87b7c51
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 22, 2023
82504f0
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 27, 2023
8f4d93e
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 1, 2023
35de6a8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 1, 2023
35fabe6
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 6, 2023
07b48f3
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
default_assets = (
*load_assets_from_modules([eia_bulk_elec_assets], group_name="eia_bulk_elec"),
*load_assets_from_modules([epacems_assets], group_name="epacems"),
*load_assets_from_modules([pudl.extract.eia176], group_name="raw_eia176"),
*load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"),
*load_assets_from_modules([pudl.transform.eia860], group_name="_core_eia860"),
*load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"),
Expand Down
1 change: 1 addition & 0 deletions src/pudl/extract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:mod:`pudl.transform` subpackage.
"""
from . import (
eia176,
eia860,
eia860m,
eia861,
Expand Down
72 changes: 72 additions & 0 deletions src/pudl/extract/csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Extractor for CSV data."""
from csv import DictReader
from importlib import resources
from zipfile import ZipFile

import pandas as pd

import pudl.logging_helpers

logger = pudl.logging_helpers.get_logger(__name__)


def open_csv_resource(dataset: str, base_filename: str) -> DictReader:
    """Open a packaged CSV resource file as a :class:`csv.DictReader`.

    Args:
        dataset: name of the package_data/{dataset} subdirectory that holds
            the metadata file.
        base_filename: name of the file within that subdirectory to open.
    """
    package = resources.files(f"pudl.package_data.{dataset}")
    return DictReader((package / base_filename).open())


def get_table_file_map(dataset: str) -> dict[str, str]:
    """Return a mapping of table names to source filenames for the dataset.

    Args:
        dataset: name of the package_data/{dataset} subdirectory that holds
            the ``table_file_map.csv`` metadata file.
    """
    table_file_map = {}
    for row in open_csv_resource(dataset, "table_file_map.csv"):
        table_file_map[row["table"]] = row["filename"]
    return table_file_map


class CsvExtractor:
    """Generalized class for extracting dataframes from CSV files.

    The extraction logic is invoked by calling extract() method of this class.
    """

    def __init__(self, zipfile: ZipFile, table_file_map: dict[str, str]):
        """Create a new instance of CsvExtractor.

        This can be used for retrieving data from CSV files.

        Args:
            zipfile: zipfile object containing source files
            table_file_map: map of table name to source file in zipfile archive
        """
        self._zipfile = zipfile
        self._table_file_map = table_file_map

    def get_table_names(self) -> list[str]:
        """Returns list of tables that this extractor provides access to."""
        return [*self._table_file_map]

    def extract_one(self, table_name: str) -> pd.DataFrame:
        """Read the data from the CSV source file and return as a dataframe."""
        logger.info(f"Extracting {table_name} from CSV into pandas DataFrame.")
        source_file = self._table_file_map[table_name]
        with self._zipfile.open(source_file) as csv_file:
            return pd.read_csv(csv_file)

    def extract_all(self) -> dict[str, pd.DataFrame]:
        """Extracts a dictionary of table names and dataframes from CSV source files."""
        return {name: self.extract_one(name) for name in self.get_table_names()}
2 changes: 1 addition & 1 deletion src/pudl/extract/dbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def get_table_schema(self, table_name: str) -> DbfTableSchema:
table_columns = self.get_db_schema()[table_name]
dbf = self.get_table_dbf(table_name)
dbf_fields = [field for field in dbf.fields if field.name != "_NullFlags"]
if len(table_columns) != len(table_columns):
if len(dbf_fields) != len(table_columns):
davidmudrauskas marked this conversation as resolved.
Show resolved Hide resolved
return ValueError(
f"Number of DBF fields in {table_name} does not match what was "
f"found in the DBC index file for {self.partition}."
Expand Down
26 changes: 26 additions & 0 deletions src/pudl/extract/eia176.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Extract EIA Form 176 data from CSVs.

The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757.
"""

from dagster import asset

from pudl.extract.csv import CsvExtractor, get_table_file_map

DATASET = "eia176"


@asset(required_resource_keys={"datastore"})
def raw_eia176__company(context):
    """Extract raw EIA company data from CSV sheets into dataframes.

    Args:
        context: dagster keyword that provides access to resources and config.

    Returns:
        An extracted EIA dataframe with company data.
    """
    zipfile = context.resources.datastore.get_zipfile_resource(DATASET)
    table_file_map = get_table_file_map(DATASET)
    extractor = CsvExtractor(zipfile, table_file_map)
    # The dataframe must be returned for Dagster to materialize the asset;
    # without the explicit return this asset would silently produce None.
    return extractor.extract_one("company")
2 changes: 2 additions & 0 deletions src/pudl/package_data/eia176/table_file_map.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
table,filename
company,all_company_176.csv
davidmudrauskas marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions src/pudl/workspace/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class ZenodoDoiSettings(BaseSettings):
# Sandbox DOIs are provided for reference
censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049"
# censusdp1tract: ZenodoDoi = "10.5072/zenodo.674992"
eia176: ZenodoDoi = "10.5281/zenodo.7682358"
# eia176: ZenodoDoi = "10.5072/zenodo.1166385"
eia860: ZenodoDoi = "10.5281/zenodo.10067566"
# eia860: ZenodoDoi = "10.5072/zenodo.1222854"
eia860m: ZenodoDoi = "10.5281/zenodo.8188017"
Expand Down
14 changes: 14 additions & 0 deletions test/integration/etl_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ def test_ferc1_xbrl2sqlite(ferc1_engine_xbrl, ferc1_xbrl_taxonomy_metadata):
)


class TestCsvExtractor:
    """Verify that we can load CSV files as provided via the datastore."""

    def test_extract_eia176(self, pudl_datastore_fixture):
        """Spot check extraction of eia176 csv files."""
        dataset = "eia176"
        zipfile = pudl_datastore_fixture.get_zipfile_resource(dataset)
        table_file_map = pudl.extract.csv.get_table_file_map(dataset)
        extractor = pudl.extract.csv.CsvExtractor(zipfile, table_file_map)
        table = "company"
        # Idiomatic pytest assertion (with introspection on failure) instead
        # of a manually raised AssertionError.
        assert table in extractor.extract_all(), f"table {table} not found in datastore"


class TestExcelExtractor:
"""Verify that we can lead excel files as provided via the datastore."""

Expand Down
52 changes: 52 additions & 0 deletions test/unit/extract/csv_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Unit tests for pudl.extract.csv module."""
from unittest.mock import MagicMock, patch

from pudl.extract.csv import CsvExtractor, get_table_file_map, open_csv_resource

# Dataset whose packaged metadata these tests exercise.
DATASET = "eia176"
# Filename of the packaged table-to-file map resource.
BASE_FILENAME = "table_file_map.csv"
# Single table expected in the eia176 map, and its source CSV inside the archive.
TABLE_NAME = "company"
FILENAME = "all_company_176.csv"
TABLE_FILE_MAP = {TABLE_NAME: FILENAME}


def get_csv_extractor():
    """Build a CsvExtractor wired to a mock zipfile for unit testing."""
    return CsvExtractor(MagicMock(), TABLE_FILE_MAP)


def test_open_csv_resource():
    """The packaged table-file map exposes the expected CSV header."""
    reader = open_csv_resource(DATASET, BASE_FILENAME)
    assert reader.fieldnames == ["table", "filename"]


def test_get_table_file_map():
    """The eia176 table-file map resolves to the expected dictionary."""
    assert get_table_file_map(DATASET) == TABLE_FILE_MAP


def test_get_table_names():
    """The extractor reports exactly the tables in its file map."""
    assert get_csv_extractor().get_table_names() == [TABLE_NAME]


@patch("pudl.extract.csv.pd")
def test_csv_extractor_read_source(mock_pd):
    """extract_one opens the mapped file from the zipfile and parses it with pandas."""
    extractor = get_csv_extractor()
    res = extractor.extract_one(TABLE_NAME)
    mock_zipfile = extractor._zipfile
    mock_zipfile.open.assert_called_once_with(FILENAME)
    f = mock_zipfile.open.return_value.__enter__.return_value
    mock_pd.read_csv.assert_called_once_with(f)
    # Compare against return_value rather than calling mock_pd.read_csv()
    # again, which would bump the mock's call count after the
    # assert_called_once_with check above.
    assert res == mock_pd.read_csv.return_value


def test_csv_extractor_extract():
    """extract_all collects one dataframe per table via extract_one."""
    extractor = get_csv_extractor()
    mock_df = MagicMock()
    with patch.object(CsvExtractor, "extract_one", return_value=mock_df) as mock_one:
        raw_dfs = extractor.extract_all()
        mock_one.assert_called_once_with(TABLE_NAME)
        assert raw_dfs == {TABLE_NAME: mock_df}