Start EIA-176 pipelines: company data #2949

Closed
wants to merge 20 commits
Commits
826a77a
Start of reusable CSV extractor, incorporating preexisting patterns a…
Oct 6, 2023
bc6eddf
Table schema and archive objects for CSV extraction, pipeline-/form-s…
davidmudrauskas Oct 30, 2023
ff6b5bf
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Oct 30, 2023
9903674
Unit tests for CsvTableSchema
davidmudrauskas Nov 3, 2023
3a0bfe2
Full unit test coverage for CSV extractor
davidmudrauskas Nov 4, 2023
1fb52e9
Follow patterns for clobber and test file names, implement delete_sch…
davidmudrauskas Nov 6, 2023
8dbd975
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 6, 2023
1531313
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 16, 2023
9aff0a8
Update CSV extractor to just return dataframes, integrate with Dagster
davidmudrauskas Nov 16, 2023
caaa212
Combine thin CSV extraction-related class, update tests
davidmudrauskas Nov 16, 2023
fe3fbb7
Remove extraneous files, undo find-replace error
davidmudrauskas Nov 16, 2023
0b703c6
Extract one table using CSV extractor
davidmudrauskas Nov 17, 2023
cb8e7e1
Move managing zipfile and table-file map to CSV extractor client, sim…
davidmudrauskas Nov 17, 2023
4dc4ad9
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 21, 2023
87b7c51
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 22, 2023
82504f0
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 27, 2023
8f4d93e
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 1, 2023
35de6a8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 1, 2023
35fabe6
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 6, 2023
07b48f3
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Jan 3, 2024
153 changes: 153 additions & 0 deletions src/pudl/extract/csv.py
@@ -0,0 +1,153 @@
"""Extractor for CSV data."""
from csv import DictReader
from functools import lru_cache
from importlib import resources
from pathlib import Path
from zipfile import ZipFile

import pandas as pd
import sqlalchemy as sa

from pudl import logging_helpers
from pudl.workspace.datastore import Datastore

logger = logging_helpers.get_logger(__name__)


class CsvReader:
"""Wrapper to provide standardized access to CSV files."""

def __init__(self, datastore: Datastore, dataset: str):
"""Create a new instance of CsvReader.

This can be used for retrieving data from CSV files.

Args:
datastore: provides access to raw files on disk.
            dataset: name of the dataset (e.g. eia176); this is used to load metadata
                from the package_data/{dataset} subdirectory.
"""
self.datastore = datastore
self.dataset = dataset
self._table_file_map = {}
for row in self._open_csv_resource("table_file_map.csv"):
self._table_file_map[row["table"]] = row["filename"]

def _open_csv_resource(self, base_filename: str) -> DictReader:
"""Open the given resource file as :class:`csv.DictReader`."""
csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename
return DictReader(csv_path.open())

def get_table_names(self) -> list[str]:
"""Returns list of tables that this datastore provides access to."""
return list(self._table_file_map)

@lru_cache
def _cache_zipfile(self) -> ZipFile:
"""Returns a ZipFile instance corresponding to the dataset."""
return self.datastore.get_zipfile_resource(self.dataset)

    def read(self, table_name: str) -> pd.DataFrame:
        """Read the data for the given table from its CSV source into a dataframe."""
        filename = self._table_file_map[table_name]
        logger.info(f"Extracting {filename} from CSV into pandas DataFrame.")
        zipfile = self._cache_zipfile()
        with zipfile.open(filename) as f:
            # TODO: Define encoding
            df = pd.read_csv(f)
        return df
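
    # Usage sketch (illustrative, not part of the diff): given a configured
    # Datastore, a CsvReader lists its tables and reads one into a dataframe.
    #
    #   reader = CsvReader(datastore, dataset="eia176")
    #   reader.get_table_names()    # ["e176", "e176_company", ...]
    #   df = reader.read("e176_company")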


class CsvExtractor:
"""Generalized class for loading data from CSV files into tables into SQLAlchemy.

When subclassing from this generic extractor, one should implement dataset specific
logic in the following manner:

1. Set DATABASE_NAME class attribute. This controls what filename is used for the output
sqlite database.
2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory.

Dataset specific logic and transformations can be injected by overriding:

    1. delete_schema(), which drops any existing tables before the new schema is
       written; this is called first when execute() runs.
    2. create_sqlite_tables(), which defines the database schema based on the
       input tables; this is a good place to add primary and/or foreign key
       constraints.
    3. postprocess(), which is called after all data is written to sqlite; this
       can be used for database level cleanup and transformations (e.g.
       injecting missing respondent_ids).

    The extraction logic is invoked by calling the execute() method of this class.
"""

    # Note: This represents an ETL pipeline (as opposed to an ELT pipeline),
    # since we're more interested in the transformed output and don't want to
    # commit a lot of space to persisting raw data. Transformation mainly occurs
    # just before loading, aside from any more circumstantial pre- or
    # postprocessing that's introduced.

DATABASE_NAME = None
DATASET = None

def __init__(self, datastore: Datastore, output_path: Path):
"""Constructs new instance of CsvExtractor.

Args:
datastore: top-level datastore instance for accessing raw data files.
output_path: directory where the output databases should be stored.
# TODO: Consider including this for consistency
clobber: if True, existing databases should be replaced.
"""
        self.output_path = output_path
        # output_path must be set before get_db_path() is called below.
        self.sqlite_engine = sa.create_engine(self.get_db_path())
        self.sqlite_meta = sa.MetaData()
        self.csv_reader = self.get_csv_reader(datastore)

def get_db_path(self) -> str:
"""Returns the connection string for the sqlite database."""
db_path = str(Path(self.output_path) / self.DATABASE_NAME)
return f"sqlite:///{db_path}"

def get_csv_reader(self, datastore: Datastore):
"""Returns instance of CsvReader to access the data."""
return CsvReader(datastore, dataset=self.DATASET)

def execute(self):
"""Runs the extraction of the data from csv to sqlite."""
self.delete_schema()
self.create_sqlite_tables()
self.load_table_data()
self.postprocess()

    def delete_schema(self):
        """Drops all tables from the existing sqlite database."""
        # TODO: Implement or extract from dbf.py into a more general space.
        # Take a pass at reconciling with excel.py.
        pass

    def create_sqlite_tables(self):
        """Creates the database schema based on the input tables."""
        # TODO: Implement or extract from dbf.py into a more general space.
        # Take a pass at reconciling with excel.py.
        pass

def load_table_data(self) -> None:
"""Extracts and loads csv data into sqlite."""
for table in self.csv_reader.get_table_names():
df = self.csv_reader.read(table)
coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c}
logger.info(f"SQLite: loading {len(df)} rows into {table}.")
df.to_sql(
table,
self.sqlite_engine,
if_exists="append",
chunksize=100000,
dtype=coltypes,
index=False,
)

def postprocess(self):
"""This method is called after all the data is loaded into sqlite to transform raw data to targets."""
pass
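
For orientation, here is a minimal sketch of how this extractor is intended to be driven once the schema TODOs above are implemented. MyDatasetCsvExtractor is a hypothetical subclass, and the bare Datastore() construction is an assumption standing in for a properly configured instance.

from pathlib import Path

from pudl.extract.csv import CsvExtractor
from pudl.workspace.datastore import Datastore


class MyDatasetCsvExtractor(CsvExtractor):
    """Hypothetical subclass: set two class attributes, inherit everything else."""

    DATASET = "mydataset"  # reads package_data/mydataset/table_file_map.csv
    DATABASE_NAME = "mydataset.sqlite"


# Assumption: a real pipeline would obtain a configured Datastore from its
# environment rather than constructing one with defaults.
datastore = Datastore()
extractor = MyDatasetCsvExtractor(datastore, output_path=Path("./pudl_output"))
extractor.execute()  # delete_schema -> create_sqlite_tables -> load_table_data -> postprocess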
47 changes: 47 additions & 0 deletions src/pudl/extract/eia176.py
@@ -0,0 +1,47 @@
"""Extract EIA Form 176 data from CSVs.

The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757.
"""

from pudl.extract.csv import CsvExtractor


class Eia176CsvExtractor(CsvExtractor):
"""Extractor for EIA Form 176 data."""

DATASET = "eia176"
DATABASE_NAME = "eia176.sqlite"


# TODO: This was an alternative avenue of exploration; reconcile and clean up.
# def extract_eia176(context):
# """Extract raw EIA data from excel sheets into dataframes.
#
# Args:
# context: dagster keyword that provides access to resources and config.
#
# Returns:
# A tuple of extracted EIA dataframes.
# """
# ds = context.resources.datastore
# # TODO: Should I use this?
# eia_settings = context.resources.dataset_settings.eia
#
# for filename in EIA176_FILES:
# raw_df = pudl.extract.csv.Extractor(ds).extract(
# year_month=eia860m_date
# )
# eia860_raw_dfs = pudl.extract.eia860m.append_eia860m(
# eia860_raw_dfs=eia860_raw_dfs, eia860m_raw_dfs=eia860m_raw_dfs
# )
#
# # create descriptive table_names
# eia860_raw_dfs = {
# "raw_eia860__" + table_name: df for table_name, df in eia860_raw_dfs.items()
# }
# eia860_raw_dfs = dict(sorted(eia860_raw_dfs.items()))
#
# return (
# Output(output_name=table_name, value=df)
# for table_name, df in eia860_raw_dfs.items()
# )
1 change: 1 addition & 0 deletions src/pudl/extract/eia191.py
@@ -0,0 +1 @@
"""Extract EIA Form 191 data from CSVs."""
1 change: 1 addition & 0 deletions src/pudl/extract/eia757.py
@@ -0,0 +1 @@
"""Extract EIA Form 757 data from CSVs."""
6 changes: 6 additions & 0 deletions src/pudl/package_data/eia176/table_file_map.csv
@@ -0,0 +1,6 @@
table,filename
e176,all_data_176.csv
e176_company,all_company_176.csv
e176_other,all_other_176.csv
e191,all_data_191.csv
e757,all_data_757.csv
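
To illustrate how this mapping is consumed, the following sketch mirrors the lookup that CsvReader performs in its constructor, using only names that appear in this diff:

from csv import DictReader
from importlib import resources

# Mirrors CsvReader._open_csv_resource() plus the constructor loop.
path = resources.files("pudl.package_data.eia176") / "table_file_map.csv"
with path.open() as f:
    table_file_map = {row["table"]: row["filename"] for row in DictReader(f)}

assert table_file_map["e176_company"] == "all_company_176.csv"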
2 changes: 2 additions & 0 deletions src/pudl/workspace/datastore.py
@@ -158,6 +158,8 @@ class ZenodoDoiSettings(BaseSettings):
# Sandbox DOIs are provided for reference
censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049"
# censusdp1tract: ZenodoDoi = "10.5072/zenodo.674992"
eia176: ZenodoDoi = "10.5281/zenodo.7682358"
# eia176: ZenodoDoi = "10.5072/zenodo.1166385"
eia860: ZenodoDoi = "10.5281/zenodo.8164776"
# eia860: ZenodoDoi = "10.5072/zenodo.1222854"
eia860m: ZenodoDoi = "10.5281/zenodo.8188017"
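
For reference, a small sketch of how the newly registered DOI surfaces downstream, assuming ZenodoDoiSettings instantiates with its defaults as pydantic BaseSettings subclasses typically do:

from pudl.workspace.datastore import ZenodoDoiSettings

print(ZenodoDoiSettings().eia176)  # 10.5281/zenodo.7682358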