From 826a77ad1e01fb64075227999baf9607685a3a90 Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Thu, 5 Oct 2023 23:28:59 -0400 Subject: [PATCH 01/11] Start of reusable CSV extractor, incorporating preexisting patterns and options --- src/pudl/extract/csv.py | 153 ++++++++++++++++++ src/pudl/extract/eia176.py | 47 ++++++ src/pudl/extract/eia191.py | 1 + src/pudl/extract/eia757.py | 1 + .../package_data/eia176/table_file_map.csv | 6 + src/pudl/workspace/datastore.py | 2 + 6 files changed, 210 insertions(+) create mode 100644 src/pudl/extract/csv.py create mode 100644 src/pudl/extract/eia176.py create mode 100644 src/pudl/extract/eia191.py create mode 100644 src/pudl/extract/eia757.py create mode 100644 src/pudl/package_data/eia176/table_file_map.csv diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py new file mode 100644 index 0000000000..98cd98845d --- /dev/null +++ b/src/pudl/extract/csv.py @@ -0,0 +1,153 @@ +"""Extractor for CSV data.""" +from csv import DictReader +from functools import lru_cache +from importlib import resources +from pathlib import Path +from zipfile import ZipFile + +import pandas as pd +import sqlalchemy as sa + +from pudl import logging_helpers +from pudl.workspace.datastore import Datastore + +logger = logging_helpers.get_logger(__name__) + + +class CsvReader: + """Wrapper to provide standardized access to CSV files.""" + + def __init__(self, datastore: Datastore, dataset: str): + """Create a new instance of CsvReader. + + This can be used for retrieving data from CSV files. + + Args: + datastore: provides access to raw files on disk. + dataset: name of the dataset (e.g. eia176), this is used to load metadata + from package_data/{dataset} subdirectory. + """ + self.datastore = datastore + self.dataset = dataset + self._table_file_map = {} + for row in self._open_csv_resource("table_file_map.csv"): + self._table_file_map[row["table"]] = row["filename"] + + def _open_csv_resource(self, base_filename: str) -> DictReader: + """Open the given resource file as :class:`csv.DictReader`.""" + csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename + return DictReader(csv_path.open()) + + def get_table_names(self) -> list[str]: + """Returns list of tables that this datastore provides access to.""" + return list(self._table_file_map) + + @lru_cache + def _cache_zipfile(self) -> ZipFile: + """Returns a ZipFile instance corresponding to the dataset.""" + return self.datastore.get_zipfile_resource(self.dataset) + + def read(self, filename: str) -> pd.DataFrame: + """Read the data from the CSV source and return as dataframes.""" + logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") + zipfile = self._cache_zipfile() + with zipfile.open(filename) as f: + # TODO: Define encoding + df = pd.read_csv(f) + return df + + +class CsvExtractor: + """Generalized class for loading data from CSV files into tables into SQLAlchemy. + + When subclassing from this generic extractor, one should implement dataset specific + logic in the following manner: + + 1. Set DATABASE_NAME class attribute. This controls what filename is used for the output + sqlite database. + 2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory. + + Dataset specific logic and transformations can be injected by overriding: + + # TODO: Update the details here to align with functions in this class + 1. finalize_schema() in order to modify sqlite schema. 
This is called just before + the schema is written into the sqlite database. This is good place for adding + primary and/or foreign key constraints to tables. + 2. aggregate_table_frames() is responsible for concatenating individual data frames + (one par input partition) into single one. This is where deduplication can take place. + 3. transform_table(table_name, df) will be invoked after dataframe is loaded from + the foxpro database and before it's written to sqlite. This is good place for + table-specific preprocessing and/or cleanup. + 4. postprocess() is called after data is written to sqlite. This can be used for + database level final cleanup and transformations (e.g. injecting missing + respondent_ids). + + The extraction logic is invoked by calling execute() method of this class. + """ + + # TODO: Reconcile this with the above + """This represents an ETL pipeling (as opposed to an ELT pipeline), since we're more interested in transformed output and don't want to commit a lot of space to persisting raw data + Transformation mainly occurs just before loading (aside from more circumstantial pre- or postprocessing that's introduced). + """ + + DATABASE_NAME = None + DATASET = None + + def __init__(self, datastore: Datastore, output_path: Path): + """Constructs new instance of CsvExtractor. + + Args: + datastore: top-level datastore instance for accessing raw data files. + output_path: directory where the output databases should be stored. + # TODO: Consider including this for consistency + clobber: if True, existing databases should be replaced. + """ + self.sqlite_engine = sa.create_engine(self.get_db_path()) + self.sqlite_meta = sa.MetaData() + self.output_path = output_path + self.csv_reader = self.get_csv_reader(datastore) + + def get_db_path(self) -> str: + """Returns the connection string for the sqlite database.""" + db_path = str(Path(self.output_path) / self.DATABASE_NAME) + return f"sqlite:///{db_path}" + + def get_csv_reader(self, datastore: Datastore): + """Returns instance of CsvReader to access the data.""" + return CsvReader(datastore, dataset=self.DATASET) + + def execute(self): + """Runs the extraction of the data from csv to sqlite.""" + self.delete_schema() + self.create_sqlite_tables() + self.load_table_data() + self.postprocess() + + def delete_schema(self): + # TODO: Implement or extract from dbf.py to more general space. Take a pass at reconciling with excel.py + """Drops all tables from the existing sqlite database.""" + pass + + def create_sqlite_tables(self): + # TODO: Implement or extract from dbf.py to more general space. Take a pass at reconciling with excel.py + """Creates database schema based on the input tables.""" + pass + + def load_table_data(self) -> None: + """Extracts and loads csv data into sqlite.""" + for table in self.csv_reader.get_table_names(): + df = self.csv_reader.read(table) + coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c} + logger.info(f"SQLite: loading {len(df)} rows into {table}.") + df.to_sql( + table, + self.sqlite_engine, + if_exists="append", + chunksize=100000, + dtype=coltypes, + index=False, + ) + + def postprocess(self): + """This method is called after all the data is loaded into sqlite to transform raw data to targets.""" + pass diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py new file mode 100644 index 0000000000..d36540d380 --- /dev/null +++ b/src/pudl/extract/eia176.py @@ -0,0 +1,47 @@ +"""Extract EIA Form 176 data from CSVs. 
+ +The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757. +""" + +from pudl.extract.csv import CsvExtractor + + +class Eia176CsvExtractor(CsvExtractor): + """Extractor for EIA Form 176 data.""" + + DATASET = "eia176" + DATABASE_NAME = "eia176.sqlite" + + +# TODO: This was an alternative avenue of exploration; reconcile, clean-up +# def extract_eia176(context): +# """Extract raw EIA data from excel sheets into dataframes. +# +# Args: +# context: dagster keyword that provides access to resources and config. +# +# Returns: +# A tuple of extracted EIA dataframes. +# """ +# ds = context.resources.datastore +# # TODO: Should I use this? +# eia_settings = context.resources.dataset_settings.eia +# +# for filename in EIA176_FILES: +# raw_df = pudl.extract.csv.Extractor(ds).extract( +# year_month=eia860m_date +# ) +# eia860_raw_dfs = pudl.extract.eia860m.append_eia860m( +# eia860_raw_dfs=eia860_raw_dfs, eia860m_raw_dfs=eia860m_raw_dfs +# ) +# +# # create descriptive table_names +# eia860_raw_dfs = { +# "raw_eia860__" + table_name: df for table_name, df in eia860_raw_dfs.items() +# } +# eia860_raw_dfs = dict(sorted(eia860_raw_dfs.items())) +# +# return ( +# Output(output_name=table_name, value=df) +# for table_name, df in eia860_raw_dfs.items() +# ) diff --git a/src/pudl/extract/eia191.py b/src/pudl/extract/eia191.py new file mode 100644 index 0000000000..7d1b977a97 --- /dev/null +++ b/src/pudl/extract/eia191.py @@ -0,0 +1 @@ +"""Extract EIA Form 191 data from CSVs.""" diff --git a/src/pudl/extract/eia757.py b/src/pudl/extract/eia757.py new file mode 100644 index 0000000000..e5d1fc8f92 --- /dev/null +++ b/src/pudl/extract/eia757.py @@ -0,0 +1 @@ +"""Extract EIA Form 757 data from CSVs.""" diff --git a/src/pudl/package_data/eia176/table_file_map.csv b/src/pudl/package_data/eia176/table_file_map.csv new file mode 100644 index 0000000000..984b7a8b8e --- /dev/null +++ b/src/pudl/package_data/eia176/table_file_map.csv @@ -0,0 +1,6 @@ +table,filename +e176,all_data_176.csv +e176_company,all_company_176.csv +e176_other,all_other_176.csv +e191,all_data_191.csv +e757,all_data_757.csv \ No newline at end of file diff --git a/src/pudl/workspace/datastore.py b/src/pudl/workspace/datastore.py index 50acc8729c..ca9bb8793d 100644 --- a/src/pudl/workspace/datastore.py +++ b/src/pudl/workspace/datastore.py @@ -158,6 +158,8 @@ class ZenodoDoiSettings(BaseSettings): # Sandbox DOIs are provided for reference censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049" # censusdp1tract: ZenodoDoi = "10.5072/zenodo.674992" + eia176: ZenodoDoi = "10.5281/zenodo.7682358" + # eia176: ZenodoDoi - "10.5072/zenodo.1166385" eia860: ZenodoDoi = "10.5281/zenodo.8164776" # eia860: ZenodoDoi = "10.5072/zenodo.1222854" eia860m: ZenodoDoi = "10.5281/zenodo.8188017" From bc6eddf0ee7bd68f57baac0356c70951873c6cc1 Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Sun, 29 Oct 2023 20:12:45 -0400 Subject: [PATCH 02/11] Table schema and archive objects for CSV extraction, pipeline-/form-specific column types --- src/pudl/extract/csv.py | 140 +++++++++++++++++- src/pudl/extract/dbf.py | 2 +- src/pudl/extract/eia176.py | 55 +++---- .../package_data/eia176/table_file_map.csv | 6 +- 4 files changed, 157 insertions(+), 46 deletions(-) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index 98cd98845d..d29bccee1e 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -2,22 +2,123 @@ from csv import DictReader from functools import lru_cache from importlib import resources +from io import 
TextIOWrapper from pathlib import Path from zipfile import ZipFile import pandas as pd import sqlalchemy as sa -from pudl import logging_helpers +import pudl.logging_helpers from pudl.workspace.datastore import Datastore -logger = logging_helpers.get_logger(__name__) +logger = pudl.logging_helpers.get_logger(__name__) + + +class CsvTableSchema: + """Simple data-wrapper for the fox-pro table schema.""" + + def __init__(self, table_name: str): + """Creates new instance of the table schema setting. + + The table name will be set as table_name and table will have no columns. + """ + self.name = table_name + self._columns = [] + self._column_types = {} + self._short_name_map = {} # short_name_map[short_name] -> long_name + + def add_column( + self, + col_name: str, + col_type: sa.types.TypeEngine, + short_name: str | None = None, + ): + """Adds a new column to this table schema.""" + assert col_name not in self._columns + self._columns.append(col_name) + self._column_types[col_name] = col_type + if short_name is not None: + self._short_name_map[short_name] = col_name + + def get_columns(self) -> list[tuple[str, sa.types.TypeEngine]]: + """Itereates over the (column_name, column_type) pairs.""" + for col_name in self._columns: + yield (col_name, self._column_types[col_name]) + + def get_column_names(self) -> set[str]: + """Returns set of long column names.""" + return set(self._columns) + + def get_column_rename_map(self) -> dict[str, str]: + """Returns dictionary that maps from short to long column names.""" + return dict(self._short_name_map) + + def create_sa_table(self, sa_meta: sa.MetaData) -> sa.Table: + """Creates SQLAlchemy table described by this instance. + + Args: + sa_meta: new table will be written to this MetaData object. + """ + table = sa.Table(self.name, sa_meta) + for col_name, col_type in self.get_columns(): + table.append_column(sa.Column(col_name, col_type)) + return table + + +class CsvArchive: + """Represents API for accessing files within a single CSV archive.""" + + def __init__( + self, + zipfile: ZipFile, + table_file_map: dict[str, str], + column_types: dict[str, dict[str, sa.types.TypeEngine]], + ): + """Constructs new instance of CsvArchive.""" + self.zipfile = zipfile + self._table_file_map = table_file_map + self._column_types = column_types + self._table_schemas: dict[str, list[str]] = {} + + @lru_cache + def get_table_schema(self, table_name: str) -> CsvTableSchema: + """Returns TableSchema for a given table and a given year.""" + with self.zipfile.open(self._table_file_map[table_name]) as f: + text_f = TextIOWrapper(f) + table_columns = DictReader(text_f).fieldnames + + # TODO: Introduce some validations here so we can know if source structure changed + schema = CsvTableSchema(table_name) + for column_name in table_columns: + # TODO: length for string type, if default is inappropriate + col_type = self._column_types[table_name][column_name] + schema.add_column(column_name, col_type) + return schema + + def load_table(self, table_name: str) -> pd.DataFrame: + """Returns dataframe that holds data for a table contained within this archive. + + Args: + table_name: name of the table. 
+ """ + sch = self.get_table_schema(table_name) + df = pd.DataFrame(iter(self.get_table_dbf(table_name))) + df = df.drop("_NullFlags", axis=1, errors="ignore").rename( + sch.get_column_rename_map(), axis=1 + ) + return df class CsvReader: """Wrapper to provide standardized access to CSV files.""" - def __init__(self, datastore: Datastore, dataset: str): + def __init__( + self, + datastore: Datastore, + dataset: str, + column_types: dict[str, dict[str, sa.types.TypeEngine]], + ): """Create a new instance of CsvReader. This can be used for retrieving data from CSV files. @@ -30,6 +131,7 @@ def __init__(self, datastore: Datastore, dataset: str): self.datastore = datastore self.dataset = dataset self._table_file_map = {} + self._column_types = column_types for row in self._open_csv_resource("table_file_map.csv"): self._table_file_map[row["table"]] = row["filename"] @@ -42,6 +144,15 @@ def get_table_names(self) -> list[str]: """Returns list of tables that this datastore provides access to.""" return list(self._table_file_map) + @lru_cache + def get_archive(self) -> CsvArchive: + """Returns a ZipFile instance corresponding to the dataset.""" + return CsvArchive( + self.datastore.get_zipfile_resource(self.dataset), + table_file_map=self._table_file_map, + column_types=self._column_types, + ) + @lru_cache def _cache_zipfile(self) -> ZipFile: """Returns a ZipFile instance corresponding to the dataset.""" @@ -66,6 +177,7 @@ class CsvExtractor: 1. Set DATABASE_NAME class attribute. This controls what filename is used for the output sqlite database. 2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory. + 3. Set COLUMN_TYPES to a map of tables to column names and their sqlalchemy types. This is used to generate DDL. Dataset specific logic and transformations can be injected by overriding: @@ -92,6 +204,7 @@ class CsvExtractor: DATABASE_NAME = None DATASET = None + COLUMN_TYPES = {} def __init__(self, datastore: Datastore, output_path: Path): """Constructs new instance of CsvExtractor. @@ -102,10 +215,10 @@ def __init__(self, datastore: Datastore, output_path: Path): # TODO: Consider including this for consistency clobber: if True, existing databases should be replaced. """ - self.sqlite_engine = sa.create_engine(self.get_db_path()) - self.sqlite_meta = sa.MetaData() self.output_path = output_path self.csv_reader = self.get_csv_reader(datastore) + self.sqlite_engine = sa.create_engine(self.get_db_path()) + self.sqlite_meta = sa.MetaData() def get_db_path(self) -> str: """Returns the connection string for the sqlite database.""" @@ -114,7 +227,7 @@ def get_db_path(self) -> str: def get_csv_reader(self, datastore: Datastore): """Returns instance of CsvReader to access the data.""" - return CsvReader(datastore, dataset=self.DATASET) + return CsvReader(datastore, self.DATASET, self.COLUMN_TYPES) def execute(self): """Runs the extraction of the data from csv to sqlite.""" @@ -129,9 +242,12 @@ def delete_schema(self): pass def create_sqlite_tables(self): - # TODO: Implement or extract from dbf.py to more general space. 
Take a pass at reconciling with excel.py """Creates database schema based on the input tables.""" - pass + csv_archive = self.csv_reader.get_archive() + for tn in self.csv_reader.get_table_names(): + csv_archive.get_table_schema(tn).create_sa_table(self.sqlite_meta) + self.finalize_schema(self.sqlite_meta) + self.sqlite_meta.create_all(self.sqlite_engine) def load_table_data(self) -> None: """Extracts and loads csv data into sqlite.""" @@ -148,6 +264,14 @@ def load_table_data(self) -> None: index=False, ) + def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData: + """This method is called just before the schema is written to sqlite. + + You can use this method to apply dataset specific alterations to the schema, + such as adding primary and foreign key constraints. + """ + return meta + def postprocess(self): """This method is called after all the data is loaded into sqlite to transform raw data to targets.""" pass diff --git a/src/pudl/extract/dbf.py b/src/pudl/extract/dbf.py index e48b9c3f25..baa52fcfde 100644 --- a/src/pudl/extract/dbf.py +++ b/src/pudl/extract/dbf.py @@ -159,7 +159,7 @@ def get_table_schema(self, table_name: str) -> DbfTableSchema: table_columns = self.get_db_schema()[table_name] dbf = self.get_table_dbf(table_name) dbf_fields = [field for field in dbf.fields if field.name != "_NullFlags"] - if len(table_columns) != len(table_columns): + if len(dbf_fields) != len(table_columns): return ValueError( f"Number of DBF fields in {table_name} does not match what was " f"found in the DBC index file for {self.partition}." diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index d36540d380..8ef24cbced 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -3,7 +3,10 @@ The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757. """ +import sqlalchemy as sa # noqa: I001 + from pudl.extract.csv import CsvExtractor +from pudl.workspace.setup import PudlPaths class Eia176CsvExtractor(CsvExtractor): @@ -11,37 +14,25 @@ class Eia176CsvExtractor(CsvExtractor): DATASET = "eia176" DATABASE_NAME = "eia176.sqlite" + COLUMN_TYPES = { + "e176_company": { + "COMPANY_ID": sa.String, + "ACTIVITY_STATUS": sa.String, + "NAME1": sa.String, + }, + } + + +def extract_eia176(context): + """Extract raw EIA data from excel sheets into dataframes. + Args: + # TODO: Add this context for dagster once you're past simple local execution + context: dagster keyword that provides access to resources and config. + """ + ds = context.resources.datastore + # TODO: Should I use this? + # eia_settings = context.resources.dataset_settings.eia -# TODO: This was an alternative avenue of exploration; reconcile, clean-up -# def extract_eia176(context): -# """Extract raw EIA data from excel sheets into dataframes. -# -# Args: -# context: dagster keyword that provides access to resources and config. -# -# Returns: -# A tuple of extracted EIA dataframes. -# """ -# ds = context.resources.datastore -# # TODO: Should I use this? 
-# eia_settings = context.resources.dataset_settings.eia -# -# for filename in EIA176_FILES: -# raw_df = pudl.extract.csv.Extractor(ds).extract( -# year_month=eia860m_date -# ) -# eia860_raw_dfs = pudl.extract.eia860m.append_eia860m( -# eia860_raw_dfs=eia860_raw_dfs, eia860m_raw_dfs=eia860m_raw_dfs -# ) -# -# # create descriptive table_names -# eia860_raw_dfs = { -# "raw_eia860__" + table_name: df for table_name, df in eia860_raw_dfs.items() -# } -# eia860_raw_dfs = dict(sorted(eia860_raw_dfs.items())) -# -# return ( -# Output(output_name=table_name, value=df) -# for table_name, df in eia860_raw_dfs.items() -# ) + csv_extractor = Eia176CsvExtractor(ds, PudlPaths().output_dir) + csv_extractor.execute() diff --git a/src/pudl/package_data/eia176/table_file_map.csv b/src/pudl/package_data/eia176/table_file_map.csv index 984b7a8b8e..58b57dc3fa 100644 --- a/src/pudl/package_data/eia176/table_file_map.csv +++ b/src/pudl/package_data/eia176/table_file_map.csv @@ -1,6 +1,2 @@ table,filename -e176,all_data_176.csv -e176_company,all_company_176.csv -e176_other,all_other_176.csv -e191,all_data_191.csv -e757,all_data_757.csv \ No newline at end of file +e176_company,all_company_176.csv \ No newline at end of file From 9903674a2b4ce96d033348613e8494155cd0c93c Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Thu, 2 Nov 2023 20:40:35 -0400 Subject: [PATCH 03/11] Unit tests for CsvTableSchema --- src/pudl/extract/csv.py | 5 +++- test/unit/extract/csv.py | 57 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 test/unit/extract/csv.py diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index d29bccee1e..30e5a10049 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -154,6 +154,7 @@ def get_archive(self) -> CsvArchive: ) @lru_cache + # TODO: We shouldn't need to call get_zipfile_resource multiple times def _cache_zipfile(self) -> ZipFile: """Returns a ZipFile instance corresponding to the dataset.""" return self.datastore.get_zipfile_resource(self.dataset) @@ -252,7 +253,9 @@ def create_sqlite_tables(self): def load_table_data(self) -> None: """Extracts and loads csv data into sqlite.""" for table in self.csv_reader.get_table_names(): - df = self.csv_reader.read(table) + # TODO: Make a method instead of using this private attribute + filename = self.csv_reader._table_file_map[table] + df = self.csv_reader.read(filename) coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c} logger.info(f"SQLite: loading {len(df)} rows into {table}.") df.to_sql( diff --git a/test/unit/extract/csv.py b/test/unit/extract/csv.py new file mode 100644 index 0000000000..1c3a2f4e11 --- /dev/null +++ b/test/unit/extract/csv.py @@ -0,0 +1,57 @@ +from unittest import mock +from unittest.mock import MagicMock, patch + +import sqlalchemy as sa + +from pudl.extract.csv import CsvTableSchema + +TABLE_NAME = "e176_company" + +COL1_NAME = "COMPANY_ID" +COL1_TYPE = sa.String + +COL2_NAME = "ACTIVITY_STATUS" +COL2_TYPE = sa.String +COL2_SHORT_NAME = "status" + + +def get_csv_table_schema(): + table_schema = CsvTableSchema(TABLE_NAME) + table_schema.add_column(COL1_NAME, COL1_TYPE) + table_schema.add_column(COL2_NAME, COL2_TYPE, COL2_SHORT_NAME) + return table_schema + + +def test_csv_table_schema_get_columns(): + table_schema = get_csv_table_schema() + assert table_schema.name == TABLE_NAME + assert [(COL1_NAME, COL1_TYPE), (COL2_NAME, COL2_TYPE)] == list( + table_schema.get_columns() + ) + + +def 
test_csv_table_schema_get_column_names(): + table_schema = get_csv_table_schema() + assert {COL1_NAME, COL2_NAME} == table_schema.get_column_names() + + +def test_csv_table_schema_get_column_rename_map(): + table_schema = get_csv_table_schema() + assert {COL2_SHORT_NAME: COL2_NAME} == table_schema.get_column_rename_map() + + +@patch("pudl.extract.csv.sa.Table") +@patch("pudl.extract.csv.sa.Column") +def test_csv_table_schema_create_sa_table(mock_column, mock_table): + sa_meta = MagicMock(sa.MetaData) + table_schema = get_csv_table_schema() + table = table_schema.create_sa_table(sa_meta) + assert table == mock_table.return_value + mock_table.assert_called_once_with(TABLE_NAME, sa_meta) + + expected_calls = [ + mock.call(mock_column(COL1_NAME, COL1_TYPE)), + mock.call(mock_column(COL2_NAME, COL2_TYPE)), + ] + + table.append_column.assert_has_calls(expected_calls) From 3a0bfe2bfd8c69622d2ab2ce177038611188e44c Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Sat, 4 Nov 2023 15:34:13 -0400 Subject: [PATCH 04/11] Full unit test coverage for CSV extractor --- src/pudl/extract/csv.py | 36 +++------- test/unit/extract/csv.py | 145 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 153 insertions(+), 28 deletions(-) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index 30e5a10049..d9f5011d9b 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -96,17 +96,12 @@ def get_table_schema(self, table_name: str) -> CsvTableSchema: schema.add_column(column_name, col_type) return schema - def load_table(self, table_name: str) -> pd.DataFrame: - """Returns dataframe that holds data for a table contained within this archive. - - Args: - table_name: name of the table. - """ - sch = self.get_table_schema(table_name) - df = pd.DataFrame(iter(self.get_table_dbf(table_name))) - df = df.drop("_NullFlags", axis=1, errors="ignore").rename( - sch.get_column_rename_map(), axis=1 - ) + def load_table(self, filename: str) -> pd.DataFrame: + """Read the data from the CSV source and return as dataframes.""" + logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") + with self.zipfile.open(filename) as f: + # TODO: Define encoding + df = pd.read_csv(f) return df @@ -153,21 +148,6 @@ def get_archive(self) -> CsvArchive: column_types=self._column_types, ) - @lru_cache - # TODO: We shouldn't need to call get_zipfile_resource multiple times - def _cache_zipfile(self) -> ZipFile: - """Returns a ZipFile instance corresponding to the dataset.""" - return self.datastore.get_zipfile_resource(self.dataset) - - def read(self, filename: str) -> pd.DataFrame: - """Read the data from the CSV source and return as dataframes.""" - logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") - zipfile = self._cache_zipfile() - with zipfile.open(filename) as f: - # TODO: Define encoding - df = pd.read_csv(f) - return df - class CsvExtractor: """Generalized class for loading data from CSV files into tables into SQLAlchemy. 
@@ -245,6 +225,8 @@ def delete_schema(self): def create_sqlite_tables(self): """Creates database schema based on the input tables.""" csv_archive = self.csv_reader.get_archive() + print("TABLE NAMESSS") + print(self.csv_reader.get_table_names()) for tn in self.csv_reader.get_table_names(): csv_archive.get_table_schema(tn).create_sa_table(self.sqlite_meta) self.finalize_schema(self.sqlite_meta) @@ -255,7 +237,7 @@ def load_table_data(self) -> None: for table in self.csv_reader.get_table_names(): # TODO: Make a method instead of using this private attribute filename = self.csv_reader._table_file_map[table] - df = self.csv_reader.read(filename) + df = self.csv_reader.get_archive().load_table(filename) coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c} logger.info(f"SQLite: loading {len(df)} rows into {table}.") df.to_sql( diff --git a/test/unit/extract/csv.py b/test/unit/extract/csv.py index 1c3a2f4e11..6a9a85b542 100644 --- a/test/unit/extract/csv.py +++ b/test/unit/extract/csv.py @@ -1,9 +1,11 @@ +from pathlib import Path from unittest import mock from unittest.mock import MagicMock, patch +from zipfile import ZipFile import sqlalchemy as sa -from pudl.extract.csv import CsvTableSchema +from pudl.extract.csv import CsvArchive, CsvExtractor, CsvReader, CsvTableSchema TABLE_NAME = "e176_company" @@ -14,6 +16,26 @@ COL2_TYPE = sa.String COL2_SHORT_NAME = "status" +FILENAME = "all_company_176.csv" +TABLE_FILE_MAP = {TABLE_NAME: FILENAME} + +COLUMN_TYPES = { + TABLE_NAME: { + COL1_NAME: COL1_TYPE, + COL2_NAME: COL2_TYPE, + }, +} + +DATASET = "eia176" +DATABASE_NAME = f"{DATASET}.sqlite" +PATHNAME = "fakepath" + + +class FakeCsvExtractor(CsvExtractor): + DATASET = DATASET + DATABASE_NAME = DATABASE_NAME + COLUMN_TYPES = COLUMN_TYPES + def get_csv_table_schema(): table_schema = CsvTableSchema(TABLE_NAME) @@ -22,6 +44,12 @@ def get_csv_table_schema(): return table_schema +def get_csv_extractor(): + datastore = MagicMock() + path = Path(PATHNAME) + return FakeCsvExtractor(datastore, path) + + def test_csv_table_schema_get_columns(): table_schema = get_csv_table_schema() assert table_schema.name == TABLE_NAME @@ -55,3 +83,118 @@ def test_csv_table_schema_create_sa_table(mock_column, mock_table): ] table.append_column.assert_has_calls(expected_calls) + + +@patch("pudl.extract.csv.DictReader") +@patch("pudl.extract.csv.TextIOWrapper") +@patch("pudl.extract.csv.ZipFile") +def test_csv_archive_get_table_schema( + mock_zipfile, mock_text_io_wrapper, mock_dict_reader +): + mock_dict_reader.return_value.fieldnames = [COL1_NAME, COL2_NAME] + zipfile = MagicMock(ZipFile) + archive = CsvArchive(zipfile, TABLE_FILE_MAP, COLUMN_TYPES) + schema = archive.get_table_schema(TABLE_NAME) + assert [(COL1_NAME, COL1_TYPE), (COL2_NAME, COL2_TYPE)] == list( + schema.get_columns() + ) + + +@patch("pudl.extract.csv.pd") +@patch("pudl.extract.csv.ZipFile") +def test_csv_archive_load_table(mock_zipfile, mock_pd): + archive = CsvArchive(mock_zipfile, TABLE_FILE_MAP, COLUMN_TYPES) + res = archive.load_table(FILENAME) + mock_zipfile.open.assert_called_once_with(FILENAME) + f = mock_zipfile.open.return_value.__enter__.return_value + mock_pd.read_csv.assert_called_once_with(f) + df = mock_pd.read_csv() + assert df == res + + +def test_csv_reader_get_table_names(): + datastore = MagicMock() + reader = CsvReader(datastore, DATASET, COLUMN_TYPES) + assert reader._table_file_map == TABLE_FILE_MAP + assert [TABLE_NAME] == reader.get_table_names() + + +def test_csv_reader_get_archive(): + datastore = MagicMock() 
+ reader = CsvReader(datastore, DATASET, COLUMN_TYPES) + archive = reader.get_archive() + zipfile = datastore.get_zipfile_resource(DATASET) + assert zipfile == archive.zipfile + assert archive._table_file_map == TABLE_FILE_MAP + assert archive._column_types == COLUMN_TYPES + + +def test_csv_extractor_get_db_path(): + extractor = get_csv_extractor() + assert f"sqlite:///{PATHNAME}/{DATABASE_NAME}" == extractor.get_db_path() + + +def test_csv_extractor_get_csv_reader(): + extractor = get_csv_extractor() + datastore = MagicMock() + reader = extractor.get_csv_reader(datastore) + assert datastore == reader.datastore + assert reader.dataset == DATASET + assert reader._column_types == COLUMN_TYPES + + +@patch("pudl.extract.csv.sa.create_engine") +@patch("pudl.extract.csv.CsvExtractor.finalize_schema") +@patch("pudl.extract.csv.sa.MetaData") +@patch("pudl.extract.csv.CsvArchive") +def test_csv_extractor_create_sqlite_tables( + mock_archive, mock_metadata, mock_finalize_schema, mock_create_engine +): + extractor = get_csv_extractor() + extractor.create_sqlite_tables() + sqlite_meta = mock_metadata.return_value + get_table_schema = mock_archive.return_value.get_table_schema + get_table_schema.assert_called_once_with(TABLE_NAME) + get_table_schema.return_value.create_sa_table.assert_called_once_with(sqlite_meta) + mock_finalize_schema.assert_called_once_with(sqlite_meta) + sqlite_engine = mock_create_engine(extractor.get_db_path()) + mock_metadata.return_value.create_all.assert_called_once_with(sqlite_engine) + + +@patch("pudl.extract.csv.sa.create_engine") +@patch("pudl.extract.csv.sa.MetaData") +@patch("pudl.extract.csv.CsvArchive") +def test_csv_extractor_load_table_data(mock_archive, mock_metadata, mock_create_engine): + extractor = get_csv_extractor() + extractor.load_table_data() + mock_archive.return_value.load_table.assert_called_once_with(FILENAME) + df = extractor.csv_reader.get_archive().load_table() + sqlite_meta = mock_metadata.return_value + sqlite_engine = mock_create_engine.return_value + coltypes = {col.name: col.type for col in sqlite_meta.tables[TABLE_NAME].c} + df.to_sql.assert_called_once_with( + TABLE_NAME, + sqlite_engine, + if_exists="append", + chunksize=100000, + dtype=coltypes, + index=False, + ) + + +@patch("pudl.extract.csv.CsvExtractor.postprocess") +@patch("pudl.extract.csv.CsvExtractor.load_table_data") +@patch("pudl.extract.csv.CsvExtractor.create_sqlite_tables") +@patch("pudl.extract.csv.CsvExtractor.delete_schema") +def test_csv_extractor_execute( + mock_delete_schema, + mock_create_sqlite_tables, + mock_load_table_data, + mock_postprocess, +): + extractor = get_csv_extractor() + extractor.execute() + mock_delete_schema.assert_called_once() + mock_create_sqlite_tables.assert_called_once() + mock_load_table_data.assert_called_once() + mock_postprocess.assert_called_once() From 1fb52e98d15eca64a168e84e6c0aae51367c78d4 Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Sun, 5 Nov 2023 19:04:10 -0500 Subject: [PATCH 05/11] Follow patterns for clobber and test file names, implement delete_schema for CSV extractor --- src/pudl/extract/csv.py | 55 +++++++++++------------ src/pudl/extract/eia176.py | 18 +------- test/unit/extract/{csv.py => csv_test.py} | 17 ++++++- 3 files changed, 42 insertions(+), 48 deletions(-) rename test/unit/extract/{csv.py => csv_test.py} (91%) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index d9f5011d9b..0f86aec497 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -1,4 +1,5 @@ """Extractor for CSV data.""" 
+import contextlib from csv import DictReader from functools import lru_cache from importlib import resources @@ -16,7 +17,7 @@ class CsvTableSchema: - """Simple data-wrapper for the fox-pro table schema.""" + """Provides the data definition of a table.""" def __init__(self, table_name: str): """Creates new instance of the table schema setting. @@ -42,7 +43,7 @@ def add_column( self._short_name_map[short_name] = col_name def get_columns(self) -> list[tuple[str, sa.types.TypeEngine]]: - """Itereates over the (column_name, column_type) pairs.""" + """Iterates over the (column_name, column_type) pairs.""" for col_name in self._columns: yield (col_name, self._column_types[col_name]) @@ -83,24 +84,26 @@ def __init__( @lru_cache def get_table_schema(self, table_name: str) -> CsvTableSchema: - """Returns TableSchema for a given table and a given year.""" + """Returns TableSchema for a given table.""" with self.zipfile.open(self._table_file_map[table_name]) as f: text_f = TextIOWrapper(f) table_columns = DictReader(text_f).fieldnames - # TODO: Introduce some validations here so we can know if source structure changed + if sorted(table_columns) != sorted(self._column_types[table_name].keys()): + raise ValueError( + f"Columns extracted from CSV for {table_name} do not match expected columns" + ) + schema = CsvTableSchema(table_name) for column_name in table_columns: - # TODO: length for string type, if default is inappropriate col_type = self._column_types[table_name][column_name] schema.add_column(column_name, col_type) return schema def load_table(self, filename: str) -> pd.DataFrame: - """Read the data from the CSV source and return as dataframes.""" + """Read the data from the CSV source and return as a dataframe.""" logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") with self.zipfile.open(filename) as f: - # TODO: Define encoding df = pd.read_csv(f) return df @@ -141,7 +144,7 @@ def get_table_names(self) -> list[str]: @lru_cache def get_archive(self) -> CsvArchive: - """Returns a ZipFile instance corresponding to the dataset.""" + """Returns a CsvArchive instance corresponding to the dataset.""" return CsvArchive( self.datastore.get_zipfile_resource(self.dataset), table_file_map=self._table_file_map, @@ -150,7 +153,7 @@ def get_archive(self) -> CsvArchive: class CsvExtractor: - """Generalized class for loading data from CSV files into tables into SQLAlchemy. + """Generalized class for extracting and loading data from CSV files into SQL database. When subclassing from this generic extractor, one should implement dataset specific logic in the following manner: @@ -162,40 +165,28 @@ class CsvExtractor: Dataset specific logic and transformations can be injected by overriding: - # TODO: Update the details here to align with functions in this class 1. finalize_schema() in order to modify sqlite schema. This is called just before the schema is written into the sqlite database. This is good place for adding primary and/or foreign key constraints to tables. - 2. aggregate_table_frames() is responsible for concatenating individual data frames - (one par input partition) into single one. This is where deduplication can take place. - 3. transform_table(table_name, df) will be invoked after dataframe is loaded from - the foxpro database and before it's written to sqlite. This is good place for - table-specific preprocessing and/or cleanup. - 4. postprocess() is called after data is written to sqlite. This can be used for - database level final cleanup and transformations (e.g. 
injecting missing - respondent_ids). + 2. postprocess() is called after data is written to sqlite. This can be used for + database level final cleanup and transformations (e.g. injecting missing IDs). The extraction logic is invoked by calling execute() method of this class. """ - # TODO: Reconcile this with the above - """This represents an ETL pipeling (as opposed to an ELT pipeline), since we're more interested in transformed output and don't want to commit a lot of space to persisting raw data - Transformation mainly occurs just before loading (aside from more circumstantial pre- or postprocessing that's introduced). - """ - DATABASE_NAME = None DATASET = None COLUMN_TYPES = {} - def __init__(self, datastore: Datastore, output_path: Path): + def __init__(self, datastore: Datastore, output_path: Path, clobber: bool = False): """Constructs new instance of CsvExtractor. Args: datastore: top-level datastore instance for accessing raw data files. output_path: directory where the output databases should be stored. - # TODO: Consider including this for consistency clobber: if True, existing databases should be replaced. """ + self.clobber = clobber self.output_path = output_path self.csv_reader = self.get_csv_reader(datastore) self.sqlite_engine = sa.create_engine(self.get_db_path()) @@ -218,15 +209,20 @@ def execute(self): self.postprocess() def delete_schema(self): - # TODO: Implement or extract from dbf.py to more general space. Take a pass at reconciling with excel.py """Drops all tables from the existing sqlite database.""" - pass + with contextlib.suppress(sa.exc.OperationalError): + pudl.helpers.drop_tables( + self.sqlite_engine, + clobber=self.clobber, + ) + + self.sqlite_engine = sa.create_engine(self.get_db_path()) + self.sqlite_meta = sa.MetaData() + self.sqlite_meta.reflect(self.sqlite_engine) def create_sqlite_tables(self): """Creates database schema based on the input tables.""" csv_archive = self.csv_reader.get_archive() - print("TABLE NAMESSS") - print(self.csv_reader.get_table_names()) for tn in self.csv_reader.get_table_names(): csv_archive.get_table_schema(tn).create_sa_table(self.sqlite_meta) self.finalize_schema(self.sqlite_meta) @@ -235,7 +231,6 @@ def create_sqlite_tables(self): def load_table_data(self) -> None: """Extracts and loads csv data into sqlite.""" for table in self.csv_reader.get_table_names(): - # TODO: Make a method instead of using this private attribute filename = self.csv_reader._table_file_map[table] df = self.csv_reader.get_archive().load_table(filename) coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c} diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index 8ef24cbced..d4fbdb8b92 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -6,14 +6,13 @@ import sqlalchemy as sa # noqa: I001 from pudl.extract.csv import CsvExtractor -from pudl.workspace.setup import PudlPaths class Eia176CsvExtractor(CsvExtractor): """Extractor for EIA Form 176 data.""" - DATASET = "eia176" DATABASE_NAME = "eia176.sqlite" + DATASET = "eia176" COLUMN_TYPES = { "e176_company": { "COMPANY_ID": sa.String, @@ -21,18 +20,3 @@ class Eia176CsvExtractor(CsvExtractor): "NAME1": sa.String, }, } - - -def extract_eia176(context): - """Extract raw EIA data from excel sheets into dataframes. - - Args: - # TODO: Add this context for dagster once you're past simple local execution - context: dagster keyword that provides access to resources and config. - """ - ds = context.resources.datastore - # TODO: Should I use this? 
- # eia_settings = context.resources.dataset_settings.eia - - csv_extractor = Eia176CsvExtractor(ds, PudlPaths().output_dir) - csv_extractor.execute() diff --git a/test/unit/extract/csv.py b/test/unit/extract/csv_test.py similarity index 91% rename from test/unit/extract/csv.py rename to test/unit/extract/csv_test.py index 6a9a85b542..2a1726808b 100644 --- a/test/unit/extract/csv.py +++ b/test/unit/extract/csv_test.py @@ -1,9 +1,11 @@ +"""Unit tests for pudl.extract.csv module.""" from pathlib import Path from unittest import mock from unittest.mock import MagicMock, patch from zipfile import ZipFile import sqlalchemy as sa +from pytest import raises from pudl.extract.csv import CsvArchive, CsvExtractor, CsvReader, CsvTableSchema @@ -88,7 +90,7 @@ def test_csv_table_schema_create_sa_table(mock_column, mock_table): @patch("pudl.extract.csv.DictReader") @patch("pudl.extract.csv.TextIOWrapper") @patch("pudl.extract.csv.ZipFile") -def test_csv_archive_get_table_schema( +def test_csv_archive_get_table_schema_valid( mock_zipfile, mock_text_io_wrapper, mock_dict_reader ): mock_dict_reader.return_value.fieldnames = [COL1_NAME, COL2_NAME] @@ -100,6 +102,19 @@ def test_csv_archive_get_table_schema( ) +@patch("pudl.extract.csv.DictReader") +@patch("pudl.extract.csv.TextIOWrapper") +@patch("pudl.extract.csv.ZipFile") +def test_csv_archive_get_table_schema_invalid( + mock_zipfile, mock_text_io_wrapper, mock_dict_reader +): + mock_dict_reader.return_value.fieldnames = [COL1_NAME] + zipfile = MagicMock(ZipFile) + archive = CsvArchive(zipfile, TABLE_FILE_MAP, COLUMN_TYPES) + with raises(ValueError): + archive.get_table_schema(TABLE_NAME) + + @patch("pudl.extract.csv.pd") @patch("pudl.extract.csv.ZipFile") def test_csv_archive_load_table(mock_zipfile, mock_pd): From 9aff0a84f9ebefe21158d10a004cf3d0e8197557 Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Thu, 16 Nov 2023 13:49:37 -0500 Subject: [PATCH 06/11] Update CSV extractor to just return dataframes, integrate with Dagster --- src/pudl/etl/__init__.py | 1 + src/pudl/extract/__init__.py | 1 + src/pudl/extract/csv.py | 219 +++++------------- src/pudl/extract/eia176.py | 39 +++- src/pudl/extract/ferc1.py | 2 +- .../package_data/eia176/table_file_map.csv | 2 +- test/unit/extract/csv_test.py | 169 +------------- 7 files changed, 106 insertions(+), 327 deletions(-) diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py index e62d1003c7..7fe2995154 100644 --- a/src/pudl/etl/__init__.py +++ b/src/pudl/etl/__init__.py @@ -32,6 +32,7 @@ default_assets = ( *load_assets_from_modules([eia_bulk_elec_assets], group_name="eia_bulk_elec"), *load_assets_from_modules([epacems_assets], group_name="epacems"), + *load_assets_from_modules([pudl.extract.eia176], group_name="raw_eia176"), *load_assets_from_modules([pudl.extract.eia860], group_name="raw_eia860"), *load_assets_from_modules([pudl.transform.eia860], group_name="_core_eia860"), *load_assets_from_modules([pudl.extract.eia861], group_name="raw_eia861"), diff --git a/src/pudl/extract/__init__.py b/src/pudl/extract/__init__.py index a25e4b6fd2..7e889c6d0c 100644 --- a/src/pudl/extract/__init__.py +++ b/src/pudl/extract/__init__.py @@ -8,6 +8,7 @@ :mod:`pudl.transform` subpackage. """ from . 
import ( + eia176, eia860, eia860m, eia861, diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index 0f86aec497..2fa0a7fa9b 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -1,14 +1,11 @@ """Extractor for CSV data.""" -import contextlib from csv import DictReader from functools import lru_cache from importlib import resources -from io import TextIOWrapper -from pathlib import Path from zipfile import ZipFile import pandas as pd -import sqlalchemy as sa +from dagster import AssetsDefinition, OpDefinition, graph_asset, op import pudl.logging_helpers from pudl.workspace.datastore import Datastore @@ -16,89 +13,12 @@ logger = pudl.logging_helpers.get_logger(__name__) -class CsvTableSchema: - """Provides the data definition of a table.""" - - def __init__(self, table_name: str): - """Creates new instance of the table schema setting. - - The table name will be set as table_name and table will have no columns. - """ - self.name = table_name - self._columns = [] - self._column_types = {} - self._short_name_map = {} # short_name_map[short_name] -> long_name - - def add_column( - self, - col_name: str, - col_type: sa.types.TypeEngine, - short_name: str | None = None, - ): - """Adds a new column to this table schema.""" - assert col_name not in self._columns - self._columns.append(col_name) - self._column_types[col_name] = col_type - if short_name is not None: - self._short_name_map[short_name] = col_name - - def get_columns(self) -> list[tuple[str, sa.types.TypeEngine]]: - """Iterates over the (column_name, column_type) pairs.""" - for col_name in self._columns: - yield (col_name, self._column_types[col_name]) - - def get_column_names(self) -> set[str]: - """Returns set of long column names.""" - return set(self._columns) - - def get_column_rename_map(self) -> dict[str, str]: - """Returns dictionary that maps from short to long column names.""" - return dict(self._short_name_map) - - def create_sa_table(self, sa_meta: sa.MetaData) -> sa.Table: - """Creates SQLAlchemy table described by this instance. - - Args: - sa_meta: new table will be written to this MetaData object. 
- """ - table = sa.Table(self.name, sa_meta) - for col_name, col_type in self.get_columns(): - table.append_column(sa.Column(col_name, col_type)) - return table - - class CsvArchive: """Represents API for accessing files within a single CSV archive.""" - def __init__( - self, - zipfile: ZipFile, - table_file_map: dict[str, str], - column_types: dict[str, dict[str, sa.types.TypeEngine]], - ): + def __init__(self, zipfile: ZipFile): """Constructs new instance of CsvArchive.""" self.zipfile = zipfile - self._table_file_map = table_file_map - self._column_types = column_types - self._table_schemas: dict[str, list[str]] = {} - - @lru_cache - def get_table_schema(self, table_name: str) -> CsvTableSchema: - """Returns TableSchema for a given table.""" - with self.zipfile.open(self._table_file_map[table_name]) as f: - text_f = TextIOWrapper(f) - table_columns = DictReader(text_f).fieldnames - - if sorted(table_columns) != sorted(self._column_types[table_name].keys()): - raise ValueError( - f"Columns extracted from CSV for {table_name} do not match expected columns" - ) - - schema = CsvTableSchema(table_name) - for column_name in table_columns: - col_type = self._column_types[table_name][column_name] - schema.add_column(column_name, col_type) - return schema def load_table(self, filename: str) -> pd.DataFrame: """Read the data from the CSV source and return as a dataframe.""" @@ -115,7 +35,6 @@ def __init__( self, datastore: Datastore, dataset: str, - column_types: dict[str, dict[str, sa.types.TypeEngine]], ): """Create a new instance of CsvReader. @@ -129,7 +48,6 @@ def __init__( self.datastore = datastore self.dataset = dataset self._table_file_map = {} - self._column_types = column_types for row in self._open_csv_resource("table_file_map.csv"): self._table_file_map[row["table"]] = row["filename"] @@ -145,11 +63,7 @@ def get_table_names(self) -> list[str]: @lru_cache def get_archive(self) -> CsvArchive: """Returns a CsvArchive instance corresponding to the dataset.""" - return CsvArchive( - self.datastore.get_zipfile_resource(self.dataset), - table_file_map=self._table_file_map, - column_types=self._column_types, - ) + return CsvArchive(self.datastore.get_zipfile_resource(self.dataset)) class CsvExtractor: @@ -161,97 +75,76 @@ class CsvExtractor: 1. Set DATABASE_NAME class attribute. This controls what filename is used for the output sqlite database. 2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory. - 3. Set COLUMN_TYPES to a map of tables to column names and their sqlalchemy types. This is used to generate DDL. - Dataset specific logic and transformations can be injected by overriding: - - 1. finalize_schema() in order to modify sqlite schema. This is called just before - the schema is written into the sqlite database. This is good place for adding - primary and/or foreign key constraints to tables. - 2. postprocess() is called after data is written to sqlite. This can be used for - database level final cleanup and transformations (e.g. injecting missing IDs). - - The extraction logic is invoked by calling execute() method of this class. + The extraction logic is invoked by calling extract() method of this class. """ DATABASE_NAME = None DATASET = None - COLUMN_TYPES = {} - def __init__(self, datastore: Datastore, output_path: Path, clobber: bool = False): + def __init__(self, datastore: Datastore): """Constructs new instance of CsvExtractor. Args: datastore: top-level datastore instance for accessing raw data files. 
- output_path: directory where the output databases should be stored. - clobber: if True, existing databases should be replaced. """ - self.clobber = clobber - self.output_path = output_path self.csv_reader = self.get_csv_reader(datastore) - self.sqlite_engine = sa.create_engine(self.get_db_path()) - self.sqlite_meta = sa.MetaData() - - def get_db_path(self) -> str: - """Returns the connection string for the sqlite database.""" - db_path = str(Path(self.output_path) / self.DATABASE_NAME) - return f"sqlite:///{db_path}" def get_csv_reader(self, datastore: Datastore): """Returns instance of CsvReader to access the data.""" - return CsvReader(datastore, self.DATASET, self.COLUMN_TYPES) - - def execute(self): - """Runs the extraction of the data from csv to sqlite.""" - self.delete_schema() - self.create_sqlite_tables() - self.load_table_data() - self.postprocess() - - def delete_schema(self): - """Drops all tables from the existing sqlite database.""" - with contextlib.suppress(sa.exc.OperationalError): - pudl.helpers.drop_tables( - self.sqlite_engine, - clobber=self.clobber, - ) - - self.sqlite_engine = sa.create_engine(self.get_db_path()) - self.sqlite_meta = sa.MetaData() - self.sqlite_meta.reflect(self.sqlite_engine) - - def create_sqlite_tables(self): - """Creates database schema based on the input tables.""" - csv_archive = self.csv_reader.get_archive() - for tn in self.csv_reader.get_table_names(): - csv_archive.get_table_schema(tn).create_sa_table(self.sqlite_meta) - self.finalize_schema(self.sqlite_meta) - self.sqlite_meta.create_all(self.sqlite_engine) - - def load_table_data(self) -> None: - """Extracts and loads csv data into sqlite.""" + return CsvReader(datastore, self.DATASET) + + def extract(self) -> dict[str, pd.DataFrame]: + """Extracts a dictionary of table names and dataframes from CSV files.""" + data = {} for table in self.csv_reader.get_table_names(): filename = self.csv_reader._table_file_map[table] df = self.csv_reader.get_archive().load_table(filename) - coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c} - logger.info(f"SQLite: loading {len(df)} rows into {table}.") - df.to_sql( - table, - self.sqlite_engine, - if_exists="append", - chunksize=100000, - dtype=coltypes, - index=False, - ) - - def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData: - """This method is called just before the schema is written to sqlite. - - You can use this method to apply dataset specific alterations to the schema, - such as adding primary and foreign key constraints. + data[table] = df + return data + + +def extractor_factory(extractor_cls: type[CsvExtractor], name: str) -> OpDefinition: + """Construct a Dagster op that extracts one year of data, given an extractor class. + + Args: + extractor_cls: Class of type :class:`GenericExtractor` used to extract the data. + name: Name of an Excel based dataset (e.g. "eia860"). + """ + + def extract(context) -> dict[str, pd.DataFrame]: + """A function that extracts data from a CSV file. + + This function will be decorated with a Dagster op and returned. + + Args: + context: Dagster keyword that provides access to resources and config. + + Returns: + A dictionary of DataFrames extracted from CSV, keyed by page name. 
""" - return meta + ds = context.resources.datastore + return extractor_cls(ds).extract() + + return op( + required_resource_keys={"datastore", "dataset_settings"}, + name=f"extract_single_{name}_year", + )(extract) + + +def raw_df_factory(extractor_cls: type[CsvExtractor], name: str) -> AssetsDefinition: + """Return a dagster graph asset to extract a set of raw DataFrames from CSV files. + + Args: + extractor_cls: The dataset-specific CSV extractor used to extract the data. + Needs to correspond to the dataset identified by ``name``. + name: Name of an CSV based dataset (e.g. "eia176"). Currently this must be + one of the attributes of :class:`pudl.settings.EiaSettings` + """ + extractor = extractor_factory(extractor_cls, name) + + def raw_dfs() -> dict[str, pd.DataFrame]: + """Produce a dictionary of extracted EIA dataframes.""" + return extractor() - def postprocess(self): - """This method is called after all the data is loaded into sqlite to transform raw data to targets.""" - pass + return graph_asset(name=f"{name}_raw_dfs")(raw_dfs) diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index d4fbdb8b92..3bf1657138 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -4,15 +4,18 @@ """ import sqlalchemy as sa # noqa: I001 +from dagster import AssetOut, Output, multi_asset -from pudl.extract.csv import CsvExtractor +from pudl.extract.csv import CsvExtractor, raw_df_factory + +DATASET = "eia176" class Eia176CsvExtractor(CsvExtractor): """Extractor for EIA Form 176 data.""" - DATABASE_NAME = "eia176.sqlite" - DATASET = "eia176" + DATABASE_NAME = f"{DATASET}.sqlite" + DATASET = DATASET COLUMN_TYPES = { "e176_company": { "COMPANY_ID": sa.String, @@ -20,3 +23,33 @@ class Eia176CsvExtractor(CsvExtractor): "NAME1": sa.String, }, } + + +# TODO (davidmudrauskas): Add this information to the metadata +raw_table_names = (f"raw_{DATASET}__company",) + +eia176_raw_dfs = raw_df_factory(Eia176CsvExtractor, name=DATASET) + + +@multi_asset( + outs={table_name: AssetOut() for table_name in sorted(raw_table_names)}, + required_resource_keys={"datastore", "dataset_settings"}, +) +def extract_eia176(context, eia176_raw_dfs): + """Extract EIA-176 data from CSV source and return dataframes. + + Args: + context: dagster keyword that provides access to resources and config. + + Returns: + A tuple of extracted EIA dataframes. + """ + eia176_raw_dfs = { + f"raw_{DATASET}__" + table_name: df for table_name, df in eia176_raw_dfs.items() + } + eia176_raw_dfs = dict(sorted(eia176_raw_dfs.items())) + + return ( + Output(output_name=table_name, value=df) + for table_name, df in eia176_raw_dfs.items() + ) diff --git a/src/pudl/extract/ferc1.py b/src/pudl/extract/ferc1.py index 386c6e78a1..5ce59744f8 100644 --- a/src/pudl/extract/ferc1.py +++ b/src/pudl/extract/ferc1.py @@ -301,7 +301,7 @@ def add_missing_respondents(self): # Write missing respondents back into SQLite. 
with self.sqlite_engine.begin() as conn: - conn.execute( + conn.extract( self.sqlite_meta.tables["f1_respondent_id"].insert().values(records) ) diff --git a/src/pudl/package_data/eia176/table_file_map.csv b/src/pudl/package_data/eia176/table_file_map.csv index 58b57dc3fa..73e45fcd5e 100644 --- a/src/pudl/package_data/eia176/table_file_map.csv +++ b/src/pudl/package_data/eia176/table_file_map.csv @@ -1,2 +1,2 @@ table,filename -e176_company,all_company_176.csv \ No newline at end of file +company,all_company_176.csv \ No newline at end of file diff --git a/test/unit/extract/csv_test.py b/test/unit/extract/csv_test.py index 2a1726808b..374649c0ca 100644 --- a/test/unit/extract/csv_test.py +++ b/test/unit/extract/csv_test.py @@ -1,124 +1,31 @@ """Unit tests for pudl.extract.csv module.""" -from pathlib import Path -from unittest import mock from unittest.mock import MagicMock, patch -from zipfile import ZipFile -import sqlalchemy as sa -from pytest import raises +from pudl.extract.csv import CsvArchive, CsvExtractor, CsvReader -from pudl.extract.csv import CsvArchive, CsvExtractor, CsvReader, CsvTableSchema - -TABLE_NAME = "e176_company" - -COL1_NAME = "COMPANY_ID" -COL1_TYPE = sa.String - -COL2_NAME = "ACTIVITY_STATUS" -COL2_TYPE = sa.String -COL2_SHORT_NAME = "status" +TABLE_NAME = "company" FILENAME = "all_company_176.csv" TABLE_FILE_MAP = {TABLE_NAME: FILENAME} -COLUMN_TYPES = { - TABLE_NAME: { - COL1_NAME: COL1_TYPE, - COL2_NAME: COL2_TYPE, - }, -} - DATASET = "eia176" DATABASE_NAME = f"{DATASET}.sqlite" -PATHNAME = "fakepath" class FakeCsvExtractor(CsvExtractor): DATASET = DATASET DATABASE_NAME = DATABASE_NAME - COLUMN_TYPES = COLUMN_TYPES - - -def get_csv_table_schema(): - table_schema = CsvTableSchema(TABLE_NAME) - table_schema.add_column(COL1_NAME, COL1_TYPE) - table_schema.add_column(COL2_NAME, COL2_TYPE, COL2_SHORT_NAME) - return table_schema def get_csv_extractor(): datastore = MagicMock() - path = Path(PATHNAME) - return FakeCsvExtractor(datastore, path) - - -def test_csv_table_schema_get_columns(): - table_schema = get_csv_table_schema() - assert table_schema.name == TABLE_NAME - assert [(COL1_NAME, COL1_TYPE), (COL2_NAME, COL2_TYPE)] == list( - table_schema.get_columns() - ) - - -def test_csv_table_schema_get_column_names(): - table_schema = get_csv_table_schema() - assert {COL1_NAME, COL2_NAME} == table_schema.get_column_names() - - -def test_csv_table_schema_get_column_rename_map(): - table_schema = get_csv_table_schema() - assert {COL2_SHORT_NAME: COL2_NAME} == table_schema.get_column_rename_map() - - -@patch("pudl.extract.csv.sa.Table") -@patch("pudl.extract.csv.sa.Column") -def test_csv_table_schema_create_sa_table(mock_column, mock_table): - sa_meta = MagicMock(sa.MetaData) - table_schema = get_csv_table_schema() - table = table_schema.create_sa_table(sa_meta) - assert table == mock_table.return_value - mock_table.assert_called_once_with(TABLE_NAME, sa_meta) - - expected_calls = [ - mock.call(mock_column(COL1_NAME, COL1_TYPE)), - mock.call(mock_column(COL2_NAME, COL2_TYPE)), - ] - - table.append_column.assert_has_calls(expected_calls) - - -@patch("pudl.extract.csv.DictReader") -@patch("pudl.extract.csv.TextIOWrapper") -@patch("pudl.extract.csv.ZipFile") -def test_csv_archive_get_table_schema_valid( - mock_zipfile, mock_text_io_wrapper, mock_dict_reader -): - mock_dict_reader.return_value.fieldnames = [COL1_NAME, COL2_NAME] - zipfile = MagicMock(ZipFile) - archive = CsvArchive(zipfile, TABLE_FILE_MAP, COLUMN_TYPES) - schema = archive.get_table_schema(TABLE_NAME) - assert 
[(COL1_NAME, COL1_TYPE), (COL2_NAME, COL2_TYPE)] == list( - schema.get_columns() - ) - - -@patch("pudl.extract.csv.DictReader") -@patch("pudl.extract.csv.TextIOWrapper") -@patch("pudl.extract.csv.ZipFile") -def test_csv_archive_get_table_schema_invalid( - mock_zipfile, mock_text_io_wrapper, mock_dict_reader -): - mock_dict_reader.return_value.fieldnames = [COL1_NAME] - zipfile = MagicMock(ZipFile) - archive = CsvArchive(zipfile, TABLE_FILE_MAP, COLUMN_TYPES) - with raises(ValueError): - archive.get_table_schema(TABLE_NAME) + return FakeCsvExtractor(datastore) @patch("pudl.extract.csv.pd") @patch("pudl.extract.csv.ZipFile") def test_csv_archive_load_table(mock_zipfile, mock_pd): - archive = CsvArchive(mock_zipfile, TABLE_FILE_MAP, COLUMN_TYPES) + archive = CsvArchive(mock_zipfile) res = archive.load_table(FILENAME) mock_zipfile.open.assert_called_once_with(FILENAME) f = mock_zipfile.open.return_value.__enter__.return_value @@ -129,24 +36,17 @@ def test_csv_archive_load_table(mock_zipfile, mock_pd): def test_csv_reader_get_table_names(): datastore = MagicMock() - reader = CsvReader(datastore, DATASET, COLUMN_TYPES) + reader = CsvReader(datastore, DATASET) assert reader._table_file_map == TABLE_FILE_MAP assert [TABLE_NAME] == reader.get_table_names() def test_csv_reader_get_archive(): datastore = MagicMock() - reader = CsvReader(datastore, DATASET, COLUMN_TYPES) + reader = CsvReader(datastore, DATASET) archive = reader.get_archive() zipfile = datastore.get_zipfile_resource(DATASET) assert zipfile == archive.zipfile - assert archive._table_file_map == TABLE_FILE_MAP - assert archive._column_types == COLUMN_TYPES - - -def test_csv_extractor_get_db_path(): - extractor = get_csv_extractor() - assert f"sqlite:///{PATHNAME}/{DATABASE_NAME}" == extractor.get_db_path() def test_csv_extractor_get_csv_reader(): @@ -155,61 +55,12 @@ def test_csv_extractor_get_csv_reader(): reader = extractor.get_csv_reader(datastore) assert datastore == reader.datastore assert reader.dataset == DATASET - assert reader._column_types == COLUMN_TYPES -@patch("pudl.extract.csv.sa.create_engine") -@patch("pudl.extract.csv.CsvExtractor.finalize_schema") -@patch("pudl.extract.csv.sa.MetaData") @patch("pudl.extract.csv.CsvArchive") -def test_csv_extractor_create_sqlite_tables( - mock_archive, mock_metadata, mock_finalize_schema, mock_create_engine -): +def test_csv_extractor_extract(mock_archive): extractor = get_csv_extractor() - extractor.create_sqlite_tables() - sqlite_meta = mock_metadata.return_value - get_table_schema = mock_archive.return_value.get_table_schema - get_table_schema.assert_called_once_with(TABLE_NAME) - get_table_schema.return_value.create_sa_table.assert_called_once_with(sqlite_meta) - mock_finalize_schema.assert_called_once_with(sqlite_meta) - sqlite_engine = mock_create_engine(extractor.get_db_path()) - mock_metadata.return_value.create_all.assert_called_once_with(sqlite_engine) - - -@patch("pudl.extract.csv.sa.create_engine") -@patch("pudl.extract.csv.sa.MetaData") -@patch("pudl.extract.csv.CsvArchive") -def test_csv_extractor_load_table_data(mock_archive, mock_metadata, mock_create_engine): - extractor = get_csv_extractor() - extractor.load_table_data() + raw_dfs = extractor.extract() mock_archive.return_value.load_table.assert_called_once_with(FILENAME) - df = extractor.csv_reader.get_archive().load_table() - sqlite_meta = mock_metadata.return_value - sqlite_engine = mock_create_engine.return_value - coltypes = {col.name: col.type for col in sqlite_meta.tables[TABLE_NAME].c} - 
df.to_sql.assert_called_once_with( - TABLE_NAME, - sqlite_engine, - if_exists="append", - chunksize=100000, - dtype=coltypes, - index=False, - ) - - -@patch("pudl.extract.csv.CsvExtractor.postprocess") -@patch("pudl.extract.csv.CsvExtractor.load_table_data") -@patch("pudl.extract.csv.CsvExtractor.create_sqlite_tables") -@patch("pudl.extract.csv.CsvExtractor.delete_schema") -def test_csv_extractor_execute( - mock_delete_schema, - mock_create_sqlite_tables, - mock_load_table_data, - mock_postprocess, -): - extractor = get_csv_extractor() - extractor.execute() - mock_delete_schema.assert_called_once() - mock_create_sqlite_tables.assert_called_once() - mock_load_table_data.assert_called_once() - mock_postprocess.assert_called_once() + df = mock_archive.return_value.load_table.return_value + assert {TABLE_NAME: df} == raw_dfs From caaa2126cc8c9a40a0a6498493f6b67551293948 Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Thu, 16 Nov 2023 16:02:33 -0500 Subject: [PATCH 07/11] Combine thin CSV extraction-related class, update tests --- src/pudl/extract/csv.py | 104 ++++++++++------------------------ src/pudl/extract/eia176.py | 9 --- test/integration/etl_test.py | 11 ++++ test/unit/extract/csv_test.py | 45 ++++----------- 4 files changed, 51 insertions(+), 118 deletions(-) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index 2fa0a7fa9b..bcccfa80a8 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -1,8 +1,6 @@ """Extractor for CSV data.""" from csv import DictReader -from functools import lru_cache from importlib import resources -from zipfile import ZipFile import pandas as pd from dagster import AssetsDefinition, OpDefinition, graph_asset, op @@ -13,103 +11,61 @@ logger = pudl.logging_helpers.get_logger(__name__) -class CsvArchive: - """Represents API for accessing files within a single CSV archive.""" - - def __init__(self, zipfile: ZipFile): - """Constructs new instance of CsvArchive.""" - self.zipfile = zipfile - - def load_table(self, filename: str) -> pd.DataFrame: - """Read the data from the CSV source and return as a dataframe.""" - logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") - with self.zipfile.open(filename) as f: - df = pd.read_csv(f) - return df - - -class CsvReader: - """Wrapper to provide standardized access to CSV files.""" - - def __init__( - self, - datastore: Datastore, - dataset: str, - ): - """Create a new instance of CsvReader. - - This can be used for retrieving data from CSV files. - - Args: - datastore: provides access to raw files on disk. - dataset: name of the dataset (e.g. eia176), this is used to load metadata - from package_data/{dataset} subdirectory. 
- """ - self.datastore = datastore - self.dataset = dataset - self._table_file_map = {} - for row in self._open_csv_resource("table_file_map.csv"): - self._table_file_map[row["table"]] = row["filename"] - - def _open_csv_resource(self, base_filename: str) -> DictReader: - """Open the given resource file as :class:`csv.DictReader`.""" - csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename - return DictReader(csv_path.open()) - - def get_table_names(self) -> list[str]: - """Returns list of tables that this datastore provides access to.""" - return list(self._table_file_map) - - @lru_cache - def get_archive(self) -> CsvArchive: - """Returns a CsvArchive instance corresponding to the dataset.""" - return CsvArchive(self.datastore.get_zipfile_resource(self.dataset)) - - class CsvExtractor: - """Generalized class for extracting and loading data from CSV files into SQL database. + """Generalized class for extracting dataframes from CSV files. When subclassing from this generic extractor, one should implement dataset specific logic in the following manner: - 1. Set DATABASE_NAME class attribute. This controls what filename is used for the output - sqlite database. 2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory. The extraction logic is invoked by calling extract() method of this class. """ - DATABASE_NAME = None DATASET = None def __init__(self, datastore: Datastore): - """Constructs new instance of CsvExtractor. + """Create a new instance of CsvExtractor. + + This can be used for retrieving data from CSV files. Args: - datastore: top-level datastore instance for accessing raw data files. + datastore: provides access to raw files on disk. """ - self.csv_reader = self.get_csv_reader(datastore) + self._zipfile = datastore.get_zipfile_resource(self.DATASET) + self._table_file_map = { + row["table"]: row["filename"] + for row in self._open_csv_resource("table_file_map.csv") + } + + def _open_csv_resource(self, base_filename: str) -> DictReader: + """Open the given resource file as :class:`csv.DictReader`.""" + csv_path = resources.files(f"pudl.package_data.{self.DATASET}") / base_filename + return DictReader(csv_path.open()) - def get_csv_reader(self, datastore: Datastore): - """Returns instance of CsvReader to access the data.""" - return CsvReader(datastore, self.DATASET) + def read_source(self, filename: str) -> pd.DataFrame: + """Read the data from the CSV source file and return as a dataframe.""" + logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") + with self._zipfile.open(filename) as f: + df = pd.read_csv(f) + return df def extract(self) -> dict[str, pd.DataFrame]: - """Extracts a dictionary of table names and dataframes from CSV files.""" + """Extracts a dictionary of table names and dataframes from CSV source files.""" data = {} - for table in self.csv_reader.get_table_names(): - filename = self.csv_reader._table_file_map[table] - df = self.csv_reader.get_archive().load_table(filename) + for table in self._table_file_map: + filename = self._table_file_map[table] + df = self.read_source(filename) data[table] = df return data def extractor_factory(extractor_cls: type[CsvExtractor], name: str) -> OpDefinition: - """Construct a Dagster op that extracts one year of data, given an extractor class. + """Construct a Dagster op that extracts data given an extractor class. Args: - extractor_cls: Class of type :class:`GenericExtractor` used to extract the data. 
- name: Name of an Excel based dataset (e.g. "eia860"). + extractor_cls: Class of type :class:`CsvExtractor` used to extract the data. + name: Name of a CSV-based dataset (e.g. "eia176"). """ def extract(context) -> dict[str, pd.DataFrame]: @@ -121,7 +77,7 @@ def extract(context) -> dict[str, pd.DataFrame]: context: Dagster keyword that provides access to resources and config. Returns: - A dictionary of DataFrames extracted from CSV, keyed by page name. + A dictionary of DataFrames extracted from CSV, keyed by table name. """ ds = context.resources.datastore return extractor_cls(ds).extract() @@ -138,7 +94,7 @@ def raw_df_factory(extractor_cls: type[CsvExtractor], name: str) -> AssetsDefini Args: extractor_cls: The dataset-specific CSV extractor used to extract the data. Needs to correspond to the dataset identified by ``name``. - name: Name of an CSV based dataset (e.g. "eia176"). Currently this must be + name: Name of a CSV-based dataset (e.g. "eia176"). Currently this must be one of the attributes of :class:`pudl.settings.EiaSettings` """ extractor = extractor_factory(extractor_cls, name) diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index 3bf1657138..7f76b013a4 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -3,7 +3,6 @@ The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757. """ -import sqlalchemy as sa # noqa: I001 from dagster import AssetOut, Output, multi_asset from pudl.extract.csv import CsvExtractor, raw_df_factory @@ -14,15 +13,7 @@ class Eia176CsvExtractor(CsvExtractor): """Extractor for EIA Form 176 data.""" - DATABASE_NAME = f"{DATASET}.sqlite" DATASET = DATASET - COLUMN_TYPES = { - "e176_company": { - "COMPANY_ID": sa.String, - "ACTIVITY_STATUS": sa.String, - "NAME1": sa.String, - }, - } # TODO (davidmudrauskas): Add this information to the metadata diff --git a/test/integration/etl_test.py b/test/integration/etl_test.py index 21ea53dd13..6a0ae8d39b 100644 --- a/test/integration/etl_test.py +++ b/test/integration/etl_test.py @@ -68,6 +68,17 @@ def test_ferc1_xbrl2sqlite(ferc1_engine_xbrl, ferc1_xbrl_taxonomy_metadata): ) +class TestCsvExtractor: + """Verify that we can lead CSV files as provided via the datastore.""" + + def test_extract_eia176(self, pudl_datastore_fixture): + """Spot check extraction of eia176 csv files.""" + extractor = pudl.extract.eia176.Eia176CsvExtractor(pudl_datastore_fixture) + table = "company" + if table not in extractor.extract(): + raise AssertionError(f"table {table} not found in datastore") + + class TestExcelExtractor: """Verify that we can lead excel files as provided via the datastore.""" diff --git a/test/unit/extract/csv_test.py b/test/unit/extract/csv_test.py index 374649c0ca..454fda1930 100644 --- a/test/unit/extract/csv_test.py +++ b/test/unit/extract/csv_test.py @@ -1,7 +1,7 @@ """Unit tests for pudl.extract.csv module.""" from unittest.mock import MagicMock, patch -from pudl.extract.csv import CsvArchive, CsvExtractor, CsvReader +from pudl.extract.csv import CsvExtractor TABLE_NAME = "company" @@ -9,12 +9,10 @@ TABLE_FILE_MAP = {TABLE_NAME: FILENAME} DATASET = "eia176" -DATABASE_NAME = f"{DATASET}.sqlite" class FakeCsvExtractor(CsvExtractor): DATASET = DATASET - DATABASE_NAME = DATABASE_NAME def get_csv_extractor(): @@ -23,10 +21,10 @@ def get_csv_extractor(): @patch("pudl.extract.csv.pd") -@patch("pudl.extract.csv.ZipFile") -def test_csv_archive_load_table(mock_zipfile, mock_pd): - archive = CsvArchive(mock_zipfile) - res = archive.load_table(FILENAME) +def 
test_csv_extractor_read_source(mock_pd): + extractor = get_csv_extractor() + res = extractor.read_source(FILENAME) + mock_zipfile = extractor._zipfile mock_zipfile.open.assert_called_once_with(FILENAME) f = mock_zipfile.open.return_value.__enter__.return_value mock_pd.read_csv.assert_called_once_with(f) @@ -34,33 +32,10 @@ def test_csv_archive_load_table(mock_zipfile, mock_pd): assert df == res -def test_csv_reader_get_table_names(): - datastore = MagicMock() - reader = CsvReader(datastore, DATASET) - assert reader._table_file_map == TABLE_FILE_MAP - assert [TABLE_NAME] == reader.get_table_names() - - -def test_csv_reader_get_archive(): - datastore = MagicMock() - reader = CsvReader(datastore, DATASET) - archive = reader.get_archive() - zipfile = datastore.get_zipfile_resource(DATASET) - assert zipfile == archive.zipfile - - -def test_csv_extractor_get_csv_reader(): - extractor = get_csv_extractor() - datastore = MagicMock() - reader = extractor.get_csv_reader(datastore) - assert datastore == reader.datastore - assert reader.dataset == DATASET - - -@patch("pudl.extract.csv.CsvArchive") -def test_csv_extractor_extract(mock_archive): +def test_csv_extractor_extract(): extractor = get_csv_extractor() - raw_dfs = extractor.extract() - mock_archive.return_value.load_table.assert_called_once_with(FILENAME) - df = mock_archive.return_value.load_table.return_value + df = MagicMock() + with patch.object(CsvExtractor, "read_source", return_value=df) as mock_read_source: + raw_dfs = extractor.extract() + mock_read_source.assert_called_once_with(FILENAME) assert {TABLE_NAME: df} == raw_dfs From fe3fbb7c473b34b462cf2da50a4478b41d0e40ec Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Thu, 16 Nov 2023 16:07:34 -0500 Subject: [PATCH 08/11] Remove extraneous files, undo find-replace error --- src/pudl/extract/eia191.py | 1 - src/pudl/extract/eia757.py | 1 - src/pudl/extract/ferc1.py | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) delete mode 100644 src/pudl/extract/eia191.py delete mode 100644 src/pudl/extract/eia757.py diff --git a/src/pudl/extract/eia191.py b/src/pudl/extract/eia191.py deleted file mode 100644 index 7d1b977a97..0000000000 --- a/src/pudl/extract/eia191.py +++ /dev/null @@ -1 +0,0 @@ -"""Extract EIA Form 191 data from CSVs.""" diff --git a/src/pudl/extract/eia757.py b/src/pudl/extract/eia757.py deleted file mode 100644 index e5d1fc8f92..0000000000 --- a/src/pudl/extract/eia757.py +++ /dev/null @@ -1 +0,0 @@ -"""Extract EIA Form 757 data from CSVs.""" diff --git a/src/pudl/extract/ferc1.py b/src/pudl/extract/ferc1.py index 5ce59744f8..386c6e78a1 100644 --- a/src/pudl/extract/ferc1.py +++ b/src/pudl/extract/ferc1.py @@ -301,7 +301,7 @@ def add_missing_respondents(self): # Write missing respondents back into SQLite. 
with self.sqlite_engine.begin() as conn: - conn.extract( + conn.execute( self.sqlite_meta.tables["f1_respondent_id"].insert().values(records) ) From 0b703c6f4fa7e1d41ee23ac4164f018086668fdd Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Fri, 17 Nov 2023 13:42:15 -0500 Subject: [PATCH 09/11] Extract one table using CSV extractor --- src/pudl/extract/csv.py | 35 +++++++++++++++++------------------ src/pudl/extract/eia176.py | 8 +------- test/unit/extract/csv_test.py | 20 +++++++++++--------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index bcccfa80a8..cb9ef7377c 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -14,25 +14,20 @@ class CsvExtractor: """Generalized class for extracting dataframes from CSV files. - When subclassing from this generic extractor, one should implement dataset specific - logic in the following manner: - - 2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory. - The extraction logic is invoked by calling extract() method of this class. """ - DATASET = None - - def __init__(self, datastore: Datastore): + def __init__(self, datastore: Datastore, dataset: str): """Create a new instance of CsvExtractor. This can be used for retrieving data from CSV files. Args: datastore: provides access to raw files on disk. + dataset: used to load metadata from package_data/{dataset} subdirectory. """ - self._zipfile = datastore.get_zipfile_resource(self.DATASET) + self.dataset = dataset + self._zipfile = datastore.get_zipfile_resource(dataset) self._table_file_map = { row["table"]: row["filename"] for row in self._open_csv_resource("table_file_map.csv") @@ -40,23 +35,27 @@ def __init__(self, datastore: Datastore): def _open_csv_resource(self, base_filename: str) -> DictReader: """Open the given resource file as :class:`csv.DictReader`.""" - csv_path = resources.files(f"pudl.package_data.{self.DATASET}") / base_filename + csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename return DictReader(csv_path.open()) - def read_source(self, filename: str) -> pd.DataFrame: + def get_table_names(self) -> list[str]: + """Returns list of tables that this extractor provides access to.""" + return list(self._table_file_map) + + def extract_one(self, table_name: str) -> pd.DataFrame: """Read the data from the CSV source file and return as a dataframe.""" - logger.info(f"Extracting {filename} from CSV into pandas DataFrame.") + logger.info(f"Extracting {table_name} from CSV into pandas DataFrame.") + filename = self._table_file_map[table_name] with self._zipfile.open(filename) as f: df = pd.read_csv(f) return df - def extract(self) -> dict[str, pd.DataFrame]: + def extract_all(self) -> dict[str, pd.DataFrame]: """Extracts a dictionary of table names and dataframes from CSV source files.""" data = {} - for table in self._table_file_map: - filename = self._table_file_map[table] - df = self.read_source(filename) - data[table] = df + for table_name in self.get_table_names(): + df = self.extract_one(table_name) + data[table_name] = df return data @@ -80,7 +79,7 @@ def extract(context) -> dict[str, pd.DataFrame]: A dictionary of DataFrames extracted from CSV, keyed by table name. 
""" ds = context.resources.datastore - return extractor_cls(ds).extract() + return extractor_cls(ds, name).extract() return op( required_resource_keys={"datastore", "dataset_settings"}, diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py index 7f76b013a4..383976fbbd 100644 --- a/src/pudl/extract/eia176.py +++ b/src/pudl/extract/eia176.py @@ -10,16 +10,10 @@ DATASET = "eia176" -class Eia176CsvExtractor(CsvExtractor): - """Extractor for EIA Form 176 data.""" - - DATASET = DATASET - - # TODO (davidmudrauskas): Add this information to the metadata raw_table_names = (f"raw_{DATASET}__company",) -eia176_raw_dfs = raw_df_factory(Eia176CsvExtractor, name=DATASET) +eia176_raw_dfs = raw_df_factory(CsvExtractor, name=DATASET) @multi_asset( diff --git a/test/unit/extract/csv_test.py b/test/unit/extract/csv_test.py index 454fda1930..b4a37530af 100644 --- a/test/unit/extract/csv_test.py +++ b/test/unit/extract/csv_test.py @@ -11,19 +11,21 @@ DATASET = "eia176" -class FakeCsvExtractor(CsvExtractor): - DATASET = DATASET - - def get_csv_extractor(): datastore = MagicMock() - return FakeCsvExtractor(datastore) + return CsvExtractor(datastore, DATASET) + + +def test_get_table_names(): + extractor = get_csv_extractor() + table_names = extractor.get_table_names() + assert [TABLE_NAME] == table_names @patch("pudl.extract.csv.pd") def test_csv_extractor_read_source(mock_pd): extractor = get_csv_extractor() - res = extractor.read_source(FILENAME) + res = extractor.extract_one(TABLE_NAME) mock_zipfile = extractor._zipfile mock_zipfile.open.assert_called_once_with(FILENAME) f = mock_zipfile.open.return_value.__enter__.return_value @@ -35,7 +37,7 @@ def test_csv_extractor_read_source(mock_pd): def test_csv_extractor_extract(): extractor = get_csv_extractor() df = MagicMock() - with patch.object(CsvExtractor, "read_source", return_value=df) as mock_read_source: - raw_dfs = extractor.extract() - mock_read_source.assert_called_once_with(FILENAME) + with patch.object(CsvExtractor, "extract_one", return_value=df) as mock_read_source: + raw_dfs = extractor.extract_all() + mock_read_source.assert_called_once_with(TABLE_NAME) assert {TABLE_NAME: df} == raw_dfs From cb8e7e10dfc714f1df52fbf7a981fb8f3ac5004c Mon Sep 17 00:00:00 2001 From: David Mudrauskas Date: Fri, 17 Nov 2023 15:23:14 -0500 Subject: [PATCH 10/11] Move managing zipfile and table-file map to CSV extractor client, simplify Dagster asset definition --- src/pudl/extract/csv.py | 91 +++++++++++------------------------ src/pudl/extract/eia176.py | 34 ++++--------- test/integration/etl_test.py | 7 ++- test/unit/extract/csv_test.py | 21 +++++--- 4 files changed, 59 insertions(+), 94 deletions(-) diff --git a/src/pudl/extract/csv.py b/src/pudl/extract/csv.py index cb9ef7377c..af7718e8c0 100644 --- a/src/pudl/extract/csv.py +++ b/src/pudl/extract/csv.py @@ -1,42 +1,55 @@ """Extractor for CSV data.""" from csv import DictReader from importlib import resources +from zipfile import ZipFile import pandas as pd -from dagster import AssetsDefinition, OpDefinition, graph_asset, op import pudl.logging_helpers -from pudl.workspace.datastore import Datastore logger = pudl.logging_helpers.get_logger(__name__) +def open_csv_resource(dataset: str, base_filename: str) -> DictReader: + """Open the given resource file as :class:`csv.DictReader`. + + Args: + dataset: used to load metadata from package_data/{dataset} subdirectory. + base_filename: the name of the file in the subdirectory to open. 
+ """ + csv_path = resources.files(f"pudl.package_data.{dataset}") / base_filename + return DictReader(csv_path.open()) + + +def get_table_file_map(dataset: str) -> dict[str, str]: + """Return a dictionary of table names and filenames for the dataset. + + Args: + dataset: used to load metadata from package_data/{dataset} subdirectory. + """ + return { + row["table"]: row["filename"] + for row in open_csv_resource(dataset, "table_file_map.csv") + } + + class CsvExtractor: """Generalized class for extracting dataframes from CSV files. The extraction logic is invoked by calling extract() method of this class. """ - def __init__(self, datastore: Datastore, dataset: str): + def __init__(self, zipfile: ZipFile, table_file_map: dict[str, str]): """Create a new instance of CsvExtractor. This can be used for retrieving data from CSV files. Args: - datastore: provides access to raw files on disk. - dataset: used to load metadata from package_data/{dataset} subdirectory. + zipfile: zipfile object containing source files + table_file_map: map of table name to source file in zipfile archive """ - self.dataset = dataset - self._zipfile = datastore.get_zipfile_resource(dataset) - self._table_file_map = { - row["table"]: row["filename"] - for row in self._open_csv_resource("table_file_map.csv") - } - - def _open_csv_resource(self, base_filename: str) -> DictReader: - """Open the given resource file as :class:`csv.DictReader`.""" - csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename - return DictReader(csv_path.open()) + self._zipfile = zipfile + self._table_file_map = table_file_map def get_table_names(self) -> list[str]: """Returns list of tables that this extractor provides access to.""" @@ -57,49 +70,3 @@ def extract_all(self) -> dict[str, pd.DataFrame]: df = self.extract_one(table_name) data[table_name] = df return data - - -def extractor_factory(extractor_cls: type[CsvExtractor], name: str) -> OpDefinition: - """Construct a Dagster op that extracts data given an extractor class. - - Args: - extractor_cls: Class of type :class:`CsvExtractor` used to extract the data. - name: Name of a CSV-based dataset (e.g. "eia176"). - """ - - def extract(context) -> dict[str, pd.DataFrame]: - """A function that extracts data from a CSV file. - - This function will be decorated with a Dagster op and returned. - - Args: - context: Dagster keyword that provides access to resources and config. - - Returns: - A dictionary of DataFrames extracted from CSV, keyed by table name. - """ - ds = context.resources.datastore - return extractor_cls(ds, name).extract() - - return op( - required_resource_keys={"datastore", "dataset_settings"}, - name=f"extract_single_{name}_year", - )(extract) - - -def raw_df_factory(extractor_cls: type[CsvExtractor], name: str) -> AssetsDefinition: - """Return a dagster graph asset to extract a set of raw DataFrames from CSV files. - - Args: - extractor_cls: The dataset-specific CSV extractor used to extract the data. - Needs to correspond to the dataset identified by ``name``. - name: Name of a CSV-based dataset (e.g. "eia176"). 
Currently this must be
-            one of the attributes of :class:`pudl.settings.EiaSettings`
-    """
-    extractor = extractor_factory(extractor_cls, name)
-
-    def raw_dfs() -> dict[str, pd.DataFrame]:
-        """Produce a dictionary of extracted EIA dataframes."""
-        return extractor()
-
-    return graph_asset(name=f"{name}_raw_dfs")(raw_dfs)
diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py
index 383976fbbd..aeab25026f 100644
--- a/src/pudl/extract/eia176.py
+++ b/src/pudl/extract/eia176.py
@@ -3,38 +3,24 @@
 The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757.
 """
-from dagster import AssetOut, Output, multi_asset
+from dagster import asset
 
-from pudl.extract.csv import CsvExtractor, raw_df_factory
+from pudl.extract.csv import CsvExtractor, get_table_file_map
 
 DATASET = "eia176"
 
 
-# TODO (davidmudrauskas): Add this information to the metadata
-raw_table_names = (f"raw_{DATASET}__company",)
-
-eia176_raw_dfs = raw_df_factory(CsvExtractor, name=DATASET)
-
-
-@multi_asset(
-    outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
-    required_resource_keys={"datastore", "dataset_settings"},
-)
-def extract_eia176(context, eia176_raw_dfs):
-    """Extract EIA-176 data from CSV source and return dataframes.
+@asset(required_resource_keys={"datastore"})
+def raw_eia176__company(context):
+    """Extract raw EIA company data from CSV sheets into dataframes.
 
     Args:
         context: dagster keyword that provides access to resources and config.
 
     Returns:
-        A tuple of extracted EIA dataframes.
+        An extracted EIA dataframe with company data.
     """
-    eia176_raw_dfs = {
-        f"raw_{DATASET}__" + table_name: df for table_name, df in eia176_raw_dfs.items()
-    }
-    eia176_raw_dfs = dict(sorted(eia176_raw_dfs.items()))
-
-    return (
-        Output(output_name=table_name, value=df)
-        for table_name, df in eia176_raw_dfs.items()
-    )
+    zipfile = context.resources.datastore.get_zipfile_resource(DATASET)
+    table_file_map = get_table_file_map(DATASET)
+    extractor = CsvExtractor(zipfile, table_file_map)
+    return extractor.extract_one("company")
 
diff --git a/test/integration/etl_test.py b/test/integration/etl_test.py
index 6a0ae8d39b..6427dd4f5e 100644
--- a/test/integration/etl_test.py
+++ b/test/integration/etl_test.py
@@ -73,9 +73,12 @@ class TestCsvExtractor:
 
     def test_extract_eia176(self, pudl_datastore_fixture):
        """Spot check extraction of eia176 csv files."""
-        extractor = pudl.extract.eia176.Eia176CsvExtractor(pudl_datastore_fixture)
+        dataset = "eia176"
+        zipfile = pudl_datastore_fixture.get_zipfile_resource(dataset)
+        table_file_map = pudl.extract.csv.get_table_file_map(dataset)
+        extractor = pudl.extract.csv.CsvExtractor(zipfile, table_file_map)
         table = "company"
-        if table not in extractor.extract():
+        if table not in extractor.extract_all():
             raise AssertionError(f"table {table} not found in datastore")
 
 
diff --git a/test/unit/extract/csv_test.py b/test/unit/extract/csv_test.py
index b4a37530af..2b07b0bf30 100644
--- a/test/unit/extract/csv_test.py
+++ b/test/unit/extract/csv_test.py
@@ -1,19 +1,28 @@
 """Unit tests for pudl.extract.csv module."""
 from unittest.mock import MagicMock, patch
 
-from pudl.extract.csv import CsvExtractor
+from pudl.extract.csv import CsvExtractor, get_table_file_map, open_csv_resource
 
+DATASET = "eia176"
+BASE_FILENAME = "table_file_map.csv"
 TABLE_NAME = "company"
-
 FILENAME = "all_company_176.csv"
 TABLE_FILE_MAP = {TABLE_NAME: FILENAME}
 
-DATASET = "eia176"
-
 
 def get_csv_extractor():
-    datastore = MagicMock()
-    return CsvExtractor(datastore, DATASET)
+    zipfile = MagicMock()
+    return CsvExtractor(zipfile, TABLE_FILE_MAP)
+
+
+def test_open_csv_resource():
+    csv_resource = open_csv_resource(DATASET, BASE_FILENAME)
+    assert ["table", "filename"] == csv_resource.fieldnames
+
+
+def test_get_table_file_map():
+    table_file_map = get_table_file_map(DATASET)
+    assert table_file_map == TABLE_FILE_MAP
 
 
 def test_get_table_names():

From 35de6a89c6bccdebb35a6f2c52689b206803c665 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 1 Dec 2023 17:45:13 +0000
Subject: [PATCH 11/11] [pre-commit.ci] auto fixes from pre-commit.com hooks

For more information, see https://pre-commit.ci
---
 src/pudl/package_data/eia176/table_file_map.csv | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pudl/package_data/eia176/table_file_map.csv b/src/pudl/package_data/eia176/table_file_map.csv
index 73e45fcd5e..343bdb33c0 100644
--- a/src/pudl/package_data/eia176/table_file_map.csv
+++ b/src/pudl/package_data/eia176/table_file_map.csv
@@ -1,2 +1,2 @@
 table,filename
-company,all_company_176.csv
\ No newline at end of file
+company,all_company_176.csv
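
For reference, a minimal usage sketch of the CsvExtractor API as it stands after PATCH 10/11. This is a sketch only, not part of the patch series: it assumes `datastore` is an already-configured pudl.workspace.datastore.Datastore instance with the eia176 archive available.

    from pudl.extract.csv import CsvExtractor, get_table_file_map

    dataset = "eia176"
    # The datastore hands back the zip archive that holds the raw CSV files.
    zipfile = datastore.get_zipfile_resource(dataset)
    # package_data/eia176/table_file_map.csv maps table names to CSV filenames,
    # e.g. {"company": "all_company_176.csv"}.
    table_file_map = get_table_file_map(dataset)

    extractor = CsvExtractor(zipfile, table_file_map)
    extractor.get_table_names()                     # ["company"]
    company_df = extractor.extract_one("company")   # one table as a DataFrame
    all_dfs = extractor.extract_all()               # {"company": <company DataFrame>}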