diff --git a/.gitignore b/.gitignore
index 07cbad2a5b..2c0293ffae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,4 @@ notebooks/*.tgz
 **.tfstate.*
 terraform/.terraform/*
 .env
+.hypothesis/
diff --git a/pyproject.toml b/pyproject.toml
index 063f26d6a5..7a3f3a9838 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
     "boto3>=1.28.55",
     "bottleneck>=1.3.4",  # pandas[performance]
     "catalystcoop.dbfread>=3.0,<3.1",
-    "catalystcoop.ferc-xbrl-extractor>=1.1.1,<1.2",
+    "catalystcoop.ferc-xbrl-extractor>=1.2.0,<2",
     "coloredlogs>=14.0,<15.1",  # Dagster requires 14.0
     "dagster-webserver>=1.4,<1.6",
     "dagster>=1.4,<1.6",
@@ -30,6 +30,7 @@ dependencies = [
     "grpcio==1.57.0",  # Required by dagster. Version works with macOS.
     "grpcio-health-checking==1.57.0",  # Required by dagster. Version works with macOS.
     "grpcio-status==1.57.0",  # Required by dagster. Version works with macOS.
+    "hypothesis>=6.87,<7.0",
     "jellyfish>=1.0.1,<1.1",  # recordlinkage dependency
     "jinja2>=3,<3.2",
     "matplotlib>=3.6.1,<3.9",
@@ -39,6 +40,7 @@ dependencies = [
     "numpy>=1.24,<2.0a0",
     "openpyxl>=3.0.10",  # pandas[excel]
     "pandas[parquet,excel,fss,gcp,compression]>=2,<2.2",
+    "pandera>=0.17,<1.0",
     "pyarrow>=13,<14",  # pandas[parquet]
     "pydantic>=1.7,<2",
     "python-dotenv>=1,<1.1",
diff --git a/src/pudl/helpers.py b/src/pudl/helpers.py
index 56bbcf7bdc..343cabda5a 100644
--- a/src/pudl/helpers.py
+++ b/src/pudl/helpers.py
@@ -1798,3 +1798,31 @@ def scale_by_ownership(
         gens["fraction_owned"], axis="index"
     )
     return gens
+
+
+def assert_cols_areclose(
+    df: pd.DataFrame,
+    a_cols: list[str],
+    b_cols: list[str],
+    mismatch_threshold: float,
+    message: str,
+):
+    """Check that two sets of dataframe columns are close to each other.
+
+    Ignores NaNs and raises an AssertionError if the fraction of mismatched
+    rows exceeds ``mismatch_threshold``.
+    """
+    # We use df.loc so that, if we drop into a debugger here, we can see the
+    # actual mismatched data instead of just a boolean mask.
+    mismatch = df.loc[
+        ~np.isclose(
+            np.ma.masked_where(np.isnan(df[a_cols]), df[a_cols]),
+            np.ma.masked_where(np.isnan(df[b_cols]), df[b_cols]),
+            equal_nan=True,
+        ).filled()
+    ]
+    mismatch_ratio = len(mismatch) / len(df)
+    if mismatch_ratio > mismatch_threshold:
+        raise AssertionError(
+            f"{message} Mismatch ratio {mismatch_ratio:.01%} > "
+            f"threshold {mismatch_threshold:.01%}."
+        )
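Not part of the diff: a minimal usage sketch of the new helper, with toy data and an arbitrary threshold, to show what it enforces. One of three rows mismatches and NaNs are ignored, so a 50% threshold passes while a 10% threshold would raise.

```python
import numpy as np
import pandas as pd

from pudl.helpers import assert_cols_areclose

df = pd.DataFrame(
    {
        "ending_balance_previous": [100.0, 200.0, np.nan],
        "starting_balance_current": [100.0, 250.0, np.nan],
    }
)
# Mismatch ratio is 1/3, which is below the 0.5 threshold, so this passes.
assert_cols_areclose(
    df=df,
    a_cols=["ending_balance_previous"],
    b_cols=["starting_balance_current"],
    mismatch_threshold=0.5,
    message="Previous ending balance should match current starting balance.",
)
```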
diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py
index 1d8fd28328..a8b8512c61 100644
--- a/src/pudl/io_managers.py
+++ b/src/pudl/io_managers.py
@@ -1,4 +1,5 @@
 """Dagster IO Managers."""
+import json
 import re
 from pathlib import Path
 from sqlite3 import sqlite_version
@@ -643,6 +644,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
             context: dagster keyword that provides access to output information
                 like the asset name.
         """
+        # TODO (daz): this is hard-coded to FERC1, though it is nominally for all FERC datasets.
         ferc1_settings = context.resources.dataset_settings.ferc1

         table_name = self._get_table_name(context)
@@ -683,6 +685,105 @@ class FercXBRLSQLiteIOManager(FercSQLiteIOManager):
     metadata.
     """

+    @staticmethod
+    def filter_for_freshest_data(
+        table: pd.DataFrame, primary_key: list[str]
+    ) -> pd.DataFrame:
+        """Get the most recently reported values for each XBRL context.
+
+        An XBRL context includes an entity ID, the time period the data applies
+        to, and other dimensions such as utility type. Each context has its own
+        ID, but the same contents are frequently redefined under different IDs,
+        so we identify contexts by their actual content.
+
+        Each row in our SQLite database includes all the facts for one
+        context/filing pair.
+
+        If one context is represented in multiple filings, we take the facts
+        from the most recently published filing.
+
+        Note that this means a fact reported in an earlier filing but omitted
+        from the most recent filing stays null: we do not forward-fill facts
+        on a fact-by-fact basis.
+        """
+        filing_metadata_cols = {"publication_time", "filing_name"}
+        xbrl_context_cols = [c for c in primary_key if c not in filing_metadata_cols]
+        # We deduplicate in multiple stages so we can log the drop-off at each stage.
+        stages = [
+            {
+                "message": "completely duplicated rows",
+                "subset": table.columns,
+            },
+            {
+                "message": "rows that are exactly the same in multiple filings",
+                "subset": [c for c in table.columns if c not in filing_metadata_cols],
+            },
+            {
+                "message": "rows that were updated by later filings",
+                "subset": xbrl_context_cols,
+            },
+        ]
+        original = table.sort_values("publication_time")
+        for stage in stages:
+            deduped = original.drop_duplicates(subset=stage["subset"], keep="last")
+            logger.debug(f"Dropped {len(original) - len(deduped)} {stage['message']}")
+            original = deduped
+
+        return deduped
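To make the last-filing-wins semantics concrete, here is a small illustrative sketch (toy frame, not from the PR) of the core sort-and-drop-duplicates step:

```python
import pandas as pd

# The same context (entity C1, 2021-12-31) is reported in two filings.
facts = pd.DataFrame(
    {
        "entity_id": ["C1", "C1"],
        "date": ["2021-12-31", "2021-12-31"],
        "filing_name": ["filing_a", "filing_b"],
        "publication_time": pd.to_datetime(["2022-01-01", "2022-02-01"]),
        "value": [10.0, 12.0],
    }
)
# Sort by publication time and keep the last duplicate per context:
# the later filing_b value wins.
fresh = facts.sort_values("publication_time").drop_duplicates(
    subset=["entity_id", "date"], keep="last"
)
assert fresh.value.item() == 12.0
```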
+
+    @staticmethod
+    def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
+        """Set a fact's report year based on its actual dates.
+
+        Sometimes a fact belongs to a context that has no ReportYear associated
+        with it; other times multiple ReportYears are associated with a single
+        filing. In those cases, the report year attached to a specific fact may
+        actually belong to one of the other years in the filing.
+
+        In many cases we can infer the correct report year from the fact's
+        associated time period - either a duration or an instant.
+        """
+        is_duration = len({"start_date", "end_date"} - set(df.columns)) == 0
+        is_instant = "date" in df.columns
+
+        def get_year(df: pd.DataFrame, col: str) -> pd.Series:
+            datetimes = pd.to_datetime(df.loc[:, col])
+            if datetimes.isna().any():
+                raise ValueError(f"{col} has null values!")
+            return datetimes.apply(lambda x: x.year)
+
+        if is_duration:
+            start_years = get_year(df, "start_date")
+            end_years = get_year(df, "end_date")
+            if not (start_years == end_years).all():
+                raise ValueError("start_date and end_date are in different years!")
+            new_report_years = start_years
+        elif is_instant:
+            new_report_years = get_year(df, "date")
+        else:
+            raise ValueError("Attempted to read a non-instant, non-duration table.")
+
+        # We include XBRL data from before our "officially supported" XBRL
+        # range because we want to use it to set start-of-year values for the
+        # first XBRL year.
+        xbrl_years_plus_one_previous = [min(xbrl_years) - 1] + xbrl_years
+        return (
+            df.assign(report_year=new_report_years)
+            .loc[lambda df: df.report_year.isin(xbrl_years_plus_one_previous)]
+            .reset_index(drop=True)
+        )
+
+    def _get_primary_key(self, sched_table_name: str) -> list[str]:
+        # TODO (daz): as of 2023-10-13, our datapackage.json is merely
+        # "frictionless-like," so we manually parse it as JSON. Once we make our
+        # datapackage.json conformant, we will need to at least update
+        # "primary_key" to "primaryKey", but there may be other changes as well.
+        with (self.base_dir / f"{self.db_name}_datapackage.json").open() as f:
+            datapackage = json.loads(f.read())
+        [table_resource] = [
+            tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
+        ]
+        return table_resource["schema"]["primary_key"]
+
     def handle_output(self, context: OutputContext, obj: pd.DataFrame | str):
         """Handle an op or asset output."""
         raise NotImplementedError("FercXBRLSQLiteIOManager can't write outputs yet.")
@@ -694,6 +795,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
             context: dagster keyword that provides access to output information
                 like the asset name.
         """
+        # TODO (daz): this is hard-coded to FERC1, though it is nominally for all FERC datasets.
         ferc1_settings = context.resources.dataset_settings.ferc1

         table_name = self._get_table_name(context)
@@ -709,23 +811,27 @@ def load_input(self, context: InputContext) -> pd.DataFrame:

         engine = self.engine

-        id_table = "identification_001_duration"
-
         sched_table_name = re.sub("_instant|_duration", "", table_name)
         with engine.connect() as con:
-            return pd.read_sql(
-                f"""
-                SELECT {table_name}.*, {id_table}.report_year FROM {table_name}
-                JOIN {id_table} ON {id_table}.filing_name = {table_name}.filing_name
-                WHERE {id_table}.report_year BETWEEN :min_year AND :max_year;
-                """,  # noqa: S608 - table names not supplied by user
+            df = pd.read_sql(
+                f"SELECT {table_name}.* FROM {table_name}",  # noqa: S608 - table names not supplied by user
                 con=con,
-                params={
-                    "min_year": min(ferc1_settings.xbrl_years),
-                    "max_year": max(ferc1_settings.xbrl_years),
-                },
             ).assign(sched_table_name=sched_table_name)

+        primary_key = self._get_primary_key(table_name)
+
+        return (
+            df.pipe(
+                FercXBRLSQLiteIOManager.filter_for_freshest_data,
+                primary_key=primary_key,
+            )
+            .pipe(
+                FercXBRLSQLiteIOManager.refine_report_year,
+                xbrl_years=ferc1_settings.xbrl_years,
+            )
+            .drop(columns=["publication_time"])
+        )
+

 @io_manager(required_resource_keys={"dataset_settings"})
 def ferc1_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager:
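An illustrative call of the new static method on a toy frame (nothing here beyond what the diff above defines). An instant fact dated 2020-12-31 gets its report year reassigned from its own date, and 2020 is kept because it is one year before the first requested XBRL year:

```python
import pandas as pd

from pudl.io_managers import FercXBRLSQLiteIOManager

instant = pd.DataFrame(
    {
        "entity_id": ["C1"],
        "date": ["2020-12-31"],
        "report_year": [2021],  # wrong at the filing level
        "value": [42.0],
    }
)
refined = FercXBRLSQLiteIOManager.refine_report_year(instant, xbrl_years=[2021, 2022])
assert refined.report_year.item() == 2020
```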
diff --git a/src/pudl/metadata/sources.py b/src/pudl/metadata/sources.py
index 87f2382510..af1f4c3873 100644
--- a/src/pudl/metadata/sources.py
+++ b/src/pudl/metadata/sources.py
@@ -365,7 +365,7 @@
     },
     "field_namespace": "ferc1",
     "working_partitions": {
-        "years": sorted(set(range(1994, 2022))),
+        "years": sorted(set(range(1994, 2023))),
     },
     "contributors": [
         CONTRIBUTORS["catalyst-cooperative"],
@@ -405,7 +405,7 @@
         # Years 1991-1995 use strange formats that need to be investigated further.
         # Years 1996-1999 come in split archives and full archives and we are going
         # to be using the aggregated archives (part=None).
-        "years": sorted(set(range(1996, 2022))),
+        "years": sorted(set(range(1996, 2023))),
     },
     "contributors": [
         CONTRIBUTORS["catalyst-cooperative"],
@@ -463,7 +463,7 @@
     ),
     "field_namespace": "ferc60",
     "working_partitions": {
-        "years": sorted(set(range(2006, 2022))),
+        "years": sorted(set(range(2006, 2023))),
     },
     "contributors": [
         CONTRIBUTORS["catalyst-cooperative"],
diff --git a/src/pudl/package_data/glue/pudl_id_mapping.xlsx b/src/pudl/package_data/glue/pudl_id_mapping.xlsx
index cb43d619fe..c181119ea2 100644
Binary files a/src/pudl/package_data/glue/pudl_id_mapping.xlsx and b/src/pudl/package_data/glue/pudl_id_mapping.xlsx differ
diff --git a/src/pudl/package_data/glue/utility_id_ferc1.csv b/src/pudl/package_data/glue/utility_id_ferc1.csv
index cde4d3f71f..a0623757ce 100644
--- a/src/pudl/package_data/glue/utility_id_ferc1.csv
+++ b/src/pudl/package_data/glue/utility_id_ferc1.csv
@@ -151,7 +151,7 @@ utility_id_ferc1,utility_id_ferc1_xbrl,utility_id_ferc1_dbf
 150,,1001
 151,,1003
 152,C000029,231
-153,C000030,447
+153,C000030,255
 154,C000038,250
 155,C000041,161
 156,C000045,294
@@ -402,7 +402,7 @@ utility_id_ferc1,utility_id_ferc1_xbrl,utility_id_ferc1_dbf
 408,R001052,52
 409,R001072,72
 411,R001087,87
-414,R001140,140
+414,C011644,140
 419,R001268,268
 420,R001298,298
 421,R001301,301
@@ -412,10 +412,12 @@ utility_id_ferc1,utility_id_ferc1_xbrl,utility_id_ferc1_dbf
 428,R001419,419
 429,R001422,422
 436,R001445,445
-437,,255
 439,R001515,515
 440,R001520,520
 441,R001521,521
 442,,278
 443,C010845,
 444,C011304,
+445,C011745,447
+446,C004679,
+447,C011785,
\ No newline at end of file
diff --git a/src/pudl/package_data/glue/utility_id_pudl.csv b/src/pudl/package_data/glue/utility_id_pudl.csv
index a00b1662a5..d6006d3167 100644
--- a/src/pudl/package_data/glue/utility_id_pudl.csv
+++ b/src/pudl/package_data/glue/utility_id_pudl.csv
@@ -256,7 +256,7 @@ utility_id_pudl,utility_id_ferc1,utility_name_ferc1,utility_id_eia,utility_name_eia
 258,111,Pinnacle West Marketing & Trading Co. LLC,,
 259,414,Pioneer Power and Light Company,,
 260,153,"PJM Interconnection, LLC",,
-261,437,"PJM Settlement, Inc.",,
+261,445,"PJM Settlement, Inc.",,
 262,84,Platte-Clay Electric Cooperative Inc.,,
 263,250,Portland General Electric Company,15248,Portland General Electric Co
 264,290,Potomac Electric Power Company,,
@@ -15826,3 +15826,5 @@ utility_id_pudl,utility_id_ferc1,utility_name_ferc1,utility_id_eia,utility_name_eia
 15868,,,65633,"BKV Barnett, LLC"
 15869,,,65634,Highpeak Solar LLC
 15870,,,1937004,
+15871,446,"Transource Oklahoma, LLC",,
+15872,447,"Mountrail-Williams Electric Cooperative",,
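For orientation, a hypothetical sketch of how these two glue files fit together (plain pandas; the join key and file paths come straight from the diff above, everything else is illustrative):

```python
import pandas as pd

# utility_id_ferc1.csv maps each FERC1 utility to its XBRL and DBF source IDs;
# utility_id_pudl.csv then maps utility_id_ferc1 (and EIA IDs) onto one PUDL ID.
ferc1_ids = pd.read_csv("src/pudl/package_data/glue/utility_id_ferc1.csv")
pudl_ids = pd.read_csv("src/pudl/package_data/glue/utility_id_pudl.csv")

mapped = pudl_ids.merge(ferc1_ids, on="utility_id_ferc1", how="left")
# e.g. "PJM Settlement, Inc." now points at utility_id_ferc1 445, whose XBRL ID
# is C011745 and whose DBF ID is 447, per the rows changed above.
print(mapped.loc[mapped.utility_name_ferc1 == "PJM Settlement, Inc."])
```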
diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml
index a0629e4176..6a9f1d7a92 100644
--- a/src/pudl/package_data/settings/etl_fast.yml
+++ b/src/pudl/package_data/settings/etl_fast.yml
@@ -5,23 +5,23 @@ ferc_to_sqlite_settings:
   ferc1_dbf_to_sqlite_settings:
     # What years of original FERC data should be cloned into the SQLite DB?
-    years: [2020]
+    years: [2019, 2020]
   ferc1_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc2_dbf_to_sqlite_settings:
-    years: [2020]
+    years: [2019, 2020]
   ferc2_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc6_dbf_to_sqlite_settings:
-    years: [2020]
+    years: [2019, 2020]
   ferc6_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc60_dbf_to_sqlite_settings:
-    years: [2020]
+    years: [2019, 2020]
   ferc60_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc714_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
 ###########################################################################
 # Settings for pudl_etl script
@@ -34,7 +34,7 @@ description: >
 version: 0.1.0
 datasets:
   ferc1:
-    years: [2020, 2021]
+    years: [2020, 2021, 2022]
   ferc714:
     years: [2019, 2020]
   eia:
diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml
index e45eb82ebb..070fecdbfb 100644
--- a/src/pudl/package_data/settings/etl_full.yml
+++ b/src/pudl/package_data/settings/etl_full.yml
@@ -38,7 +38,7 @@ ferc_to_sqlite_settings:
   # A list of tables to be loaded into the local SQLite database. These are
   # the table names as they appear in the 2015 FERC Form 1 database.
   ferc1_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   # A list of tables to be loaded into the local SQLite database. These are
   # the table names as created from the 2022 XBRL taxonomy.
   ferc2_dbf_to_sqlite_settings:
@@ -71,7 +71,7 @@ ferc_to_sqlite_settings:
       2020,
     ]
   ferc2_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc6_dbf_to_sqlite_settings:
     years:
       [
@@ -98,7 +98,7 @@ ferc_to_sqlite_settings:
       2020,
     ]
   ferc6_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc60_dbf_to_sqlite_settings:
     years:
       [
@@ -119,9 +119,9 @@ ferc_to_sqlite_settings:
       2020,
     ]
   ferc60_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
   ferc714_xbrl_to_sqlite_settings:
-    years: [2021]
+    years: [2021, 2022]
 ###########################################################################
 # Settings for pudl_etl script
@@ -164,6 +164,7 @@ datasets:
       2019,
       2020,
       2021,
+      2022,
     ]
   ferc714:
     years:
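A quick way to sanity-check the new settings, sketched with plain PyYAML rather than PUDL's own settings classes (paths assume you run from the repo root):

```python
import yaml  # PyYAML; illustrative only

with open("src/pudl/package_data/settings/etl_fast.yml") as f:
    settings = yaml.safe_load(f)

# The fast ETL now covers two XBRL years, so cross-year logic (starting vs.
# ending balances, previous-year factoids) gets exercised even in fast tests.
ferc_to_sqlite = settings["ferc_to_sqlite_settings"]
assert ferc_to_sqlite["ferc1_xbrl_to_sqlite_settings"]["years"] == [2021, 2022]
assert settings["datasets"]["ferc1"]["years"] == [2020, 2021, 2022]
```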
diff --git a/src/pudl/transform/ferc1.py b/src/pudl/transform/ferc1.py
index ae818d4b46..6ad17c9990 100644
--- a/src/pudl/transform/ferc1.py
+++ b/src/pudl/transform/ferc1.py
@@ -29,7 +29,7 @@
     plants_steam_validate_ids,
 )
 from pudl.extract.ferc1 import TABLE_NAME_MAP_FERC1
-from pudl.helpers import convert_cols_dtypes
+from pudl.helpers import assert_cols_areclose, convert_cols_dtypes
 from pudl.metadata.fields import apply_pudl_dtypes
 from pudl.settings import Ferc1Settings
 from pudl.transform.classes import (
@@ -599,12 +599,8 @@ def unstack_balances_to_report_year_instant_xbrl(
     if not params.unstack_balances_to_report_year:
         return df

-    df["year"] = pd.to_datetime(df["date"]).dt.year
-    # Check that the originally reported records are annually unique.
-    # year and report_year aren't necessarily the same since previous year data
-    # is often reported in the current report year, but we're constructing a table
-    # where report_year is part of the primary key, so we have to do this:
-    unique_cols = [c for c in primary_key_cols if c != "report_year"] + ["year"]
+    # The report year always corresponds to the year of "date", so the
+    # originally reported records must be unique per report year.
+    unique_cols = set(primary_key_cols).union({"report_year"})
     if df.duplicated(unique_cols).any():
         raise AssertionError(
             "Looks like there are multiple entries per year--not sure which to use "
@@ -615,28 +611,26 @@ def unstack_balances_to_report_year_instant_xbrl(
             "Looks like there are some values in here that aren't from the end of "
             "the year. We can't use those to calculate start and end balances."
         )
-    df.loc[df.report_year == (df.year + 1), "balance_type"] = "starting_balance"
-    df.loc[df.report_year == df.year, "balance_type"] = "ending_balance"
-    if df.balance_type.isna().any():
-        # Remove rows from years that are not representative of start/end dates
-        # for a given report year (i.e., the report year and one year prior).
-        logger.warning(
-            f"Dropping unexpected years: "
-            f"{df.loc[df.balance_type.isna(), 'year'].unique()}"
-        )
-        df = df[df["balance_type"].notna()].copy()
-    df = (
-        df.drop(["year", "date"], axis="columns")
+
+    ending_balances = df.assign(balance_type="ending_balance")
+    starting_balances = df.assign(
+        report_year=df.report_year + 1, balance_type="starting_balance"
+    )
+    all_balances = pd.concat([starting_balances, ending_balances])
+    # The first report year has no starting balances, and the year after the
+    # last report year has no ending balances, so we drop those incomplete years.
+    first_last_year_stripped = all_balances.loc[
+        lambda df: ~df.report_year.isin({df.report_year.min(), df.report_year.max()})
+    ]
+    unstacked_by_year = (
+        first_last_year_stripped.drop(columns=["date"])
         .set_index(primary_key_cols + ["balance_type", "sched_table_name"])
         .unstack("balance_type")
     )
-    # This turns a multi-index into a single-level index with tuples of strings
-    # as the keys, and then converts the tuples of strings into a single string
-    # by joining their values with an underscore. This results in column labels
-    # like boiler_plant_equipment_steam_production_starting_balance
-    df.columns = ["_".join(items) for items in df.columns.to_flat_index()]
-    df = df.reset_index()
-    return df
+    # Munge the multi-index into a flat index, with levels joined by "_".
+    unstacked_by_year.columns = [
+        "_".join(items) for items in unstacked_by_year.columns.to_flat_index()
+    ]
+    return unstacked_by_year.reset_index()


 class CombineAxisColumnsXbrl(TransformParams):
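The concat-then-unstack approach is easier to see on a toy frame; this sketch mirrors the logic above without the PUDL-specific columns:

```python
import pandas as pd

# Every instant balance is an ending balance for its own year and a starting
# balance for the following year.
df = pd.DataFrame(
    {"entity_id": [1, 1], "report_year": [2020, 2021], "balance": [100.0, 110.0]}
)
ending = df.assign(balance_type="ending_balance")
starting = df.assign(report_year=df.report_year + 1, balance_type="starting_balance")
both = pd.concat([starting, ending])
# 2020 has no starting balance and 2022 has no ending balance, so drop them.
complete = both[~both.report_year.isin({both.report_year.min(), both.report_year.max()})]
wide = complete.set_index(["entity_id", "report_year", "balance_type"]).unstack(
    "balance_type"
)
wide.columns = ["_".join(c) for c in wide.columns.to_flat_index()]
# wide now holds a single 2021 row with balance_ending_balance == 110.0 and
# balance_starting_balance == 100.0 (carried forward from 2020).
```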
@@ -2163,7 +2157,8 @@ def merge_instant_and_duration_tables_xbrl(
         ],
         axis="columns",
     ).reset_index()
-    return out_df
+
+    return out_df.loc[out_df.report_year.isin(Ferc1Settings().xbrl_years)]

 @cache_df("process_instant_xbrl")
 def process_instant_xbrl(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -4538,6 +4533,15 @@ class RetainedEarningsFerc1TableTransformer(Ferc1AbstractTableTransformer):
     table_id: TableIdFerc1 = TableIdFerc1.RETAINED_EARNINGS_FERC1
     has_unique_record_ids: bool = False

+    current_year_types: set[str] = {
+        "unappropriated_undistributed_subsidiary_earnings",
+        "unappropriated_retained_earnings",
+    }
+    previous_year_types: set[str] = {
+        "unappropriated_undistributed_subsidiary_earnings_previous_year",
+        "unappropriated_retained_earnings_previous_year",
+    }
+
     def convert_xbrl_metadata_json_to_df(
         self: Self,
         xbrl_metadata_json: dict[Literal["instant", "duration"], list[dict[str, Any]]],
@@ -4619,6 +4623,82 @@ def transform_main(self, df):
         df = super().transform_main(df).pipe(self.add_previous_year_factoid)
         return df

+    def transform_end(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Check ``_previous_year`` factoids for consistency after the transformation is done."""
+        return super().transform_end(df).pipe(self.check_double_year_earnings_types)
+
+    def check_double_year_earnings_types(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Check previous-year and current-year factoids for consistency.
+
+        The terminology can be very confusing - here are the expectations:
+
+        1. "inter-year consistency": an earlier year's "current starting/ending
+           balance" should equal the later year's "previous starting/ending
+           balance"
+        2. "intra-year consistency": each year's "previous ending balance"
+           should equal that same year's "current starting balance"
+        """
+        current_year_facts = df.loc[df.earnings_type.isin(self.current_year_types)]
+        previous_year_facts = df.loc[
+            df.earnings_type.isin(self.previous_year_types)
+        ].pipe(
+            lambda df: df.assign(
+                earnings_type=df.earnings_type.str.removesuffix("_previous_year")
+            )
+        )
+
+        # The inter-year comparison requires us to match the earlier year's
+        # current facts to the later year's previous facts, so we add 1 to the
+        # report year and merge.
+        earlier_years = current_year_facts.assign(
+            report_year=current_year_facts.report_year + 1
+        )
+        later_years = previous_year_facts
+        idx = ["utility_id_ferc1", "report_year", "earnings_type"]
+        inter_year_facts = earlier_years.merge(
+            later_years,
+            on=idx,
+            suffixes=["_earlier", "_later"],
+        ).dropna(
+            subset=[
+                "starting_balance_earlier",
+                "starting_balance_later",
+                "ending_balance_earlier",
+                "ending_balance_later",
+            ]
+        )
+
+        intra_year_facts = previous_year_facts.merge(
+            current_year_facts, on=idx, suffixes=["_previous", "_current"]
+        )
+
+        assert_cols_areclose(
+            df=inter_year_facts,
+            a_cols=["starting_balance_earlier"],
+            b_cols=["starting_balance_later"],
+            mismatch_threshold=0.05,
+            message="'Current starting balance' for year X-1 doesn't match "
+            "'previous starting balance' for year X.",
+        )
+
+        assert_cols_areclose(
+            df=inter_year_facts,
+            a_cols=["ending_balance_earlier"],
+            b_cols=["ending_balance_later"],
+            mismatch_threshold=0.05,
+            message="'Current ending balance' for year X-1 doesn't match "
+            "'previous ending balance' for year X.",
+        )
+
+        assert_cols_areclose(
+            df=intra_year_facts,
+            a_cols=["ending_balance_previous"],
+            b_cols=["starting_balance_current"],
+            mismatch_threshold=0.05,
+            message="'Previous year ending balance' should be the same as "
+            "'current year starting balance' for all years!",
+        )
+
+        return df
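A toy illustration of the inter-year check (hypothetical data; it mirrors the shift-by-one-year merge above):

```python
import pandas as pd

current = pd.DataFrame(
    {
        "utility_id_ferc1": [1, 1],
        "report_year": [2020, 2021],
        "earnings_type": ["unappropriated_retained_earnings"] * 2,
        "ending_balance": [100.0, 110.0],
    }
)
previous = pd.DataFrame(
    {
        "utility_id_ferc1": [1],
        "report_year": [2021],
        "earnings_type": ["unappropriated_retained_earnings"],
        "ending_balance": [100.0],
    }
)
# Shift the current facts forward a year so they align with the next year's
# previous facts, then compare.
earlier = current.assign(report_year=current.report_year + 1)
paired = earlier.merge(
    previous,
    on=["utility_id_ferc1", "report_year", "earnings_type"],
    suffixes=["_earlier", "_later"],
)
# 2020's current ending balance matches 2021's previous ending balance.
assert (paired.ending_balance_earlier == paired.ending_balance_later).all()
```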
+
     def targeted_drop_duplicates_dbf(self, df: pd.DataFrame) -> pd.DataFrame:
         """Drop duplicates with truly duplicate data.
@@ -4677,6 +4757,7 @@ def reconcile_double_year_earnings_types_dbf(
             earnings types.
         """
         logger.info(f"{self.table_id.value}: Reconciling previous year's data.")
+        # The DBF data has a _current_year suffix, while the PUDL core version does not.
         current_year_types = [
             "unappropriated_undistributed_subsidiary_earnings_current_year",
             "unappropriated_retained_earnings_current_year",
@@ -4685,7 +4766,7 @@ def reconcile_double_year_earnings_types_dbf(
             "unappropriated_undistributed_subsidiary_earnings_previous_year",
             "unappropriated_retained_earnings_previous_year",
         ]
-        # assign copies so no need to double copy when extracting this slice
+        # assign() returns a copy, so there is no need to copy again when extracting this slice.
         current_year = df[df.earnings_type.isin(current_year_types)].assign(
             earnings_type=lambda x: x.earnings_type.str.removesuffix("_current_year")
         )
@@ -4751,80 +4832,41 @@ def reconcile_double_year_earnings_types_dbf(
         return df

     def add_previous_year_factoid(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Add `previous_year` factoids to XBRL data from prior year's DBF data."""
-        current_year_types = [
-            "unappropriated_undistributed_subsidiary_earnings",
-            "unappropriated_retained_earnings",
-        ]
-        previous_year_types = [
-            "unappropriated_undistributed_subsidiary_earnings_previous_year",
-            "unappropriated_retained_earnings_previous_year",
-        ]
-        # If previous_year type factoids aren't in all report_years, make factoids
-        # for these years. Raise exception if more than one year.
-        [missing_year] = [
-            year
-            for year in df[
-                df.earnings_type.isin(current_year_types)
-            ].report_year.unique()
-            if year
-            not in df[df.earnings_type.isin(previous_year_types)].report_year.unique()
-        ]
+        """Create ``*_previous_year`` factoids for XBRL data.

-        current_year = df[
-            (df.report_year == missing_year)
-            & (df.earnings_type.isin(current_year_types))
-        ]
-        previous_year = df[
-            (df.report_year == missing_year - 1)
-            & (df.earnings_type.isin(current_year_types))
+        XBRL doesn't include the previous year's data, but DBF does - so we try to
+        check that year X's ``*_current_year`` factoid has the same value as year
+        X+1's ``*_previous_year`` factoid.
+
+        To do this, we need to add some ``*_previous_year`` factoids to the XBRL data.
+        """
+        current_year_facts = df[df.earnings_type.isin(self.current_year_types)]
+        previous_year_facts = df[df.earnings_type.isin(self.previous_year_types)]
+
+        missing_years = set(current_year_facts.report_year.unique()) - set(
+            previous_year_facts.report_year.unique()
+        )
+
+        to_copy_forward = current_year_facts[
+            (current_year_facts.report_year + 1).isin(missing_years)
         ]
         idx = [
             "utility_id_ferc1",
             "earnings_type",
+            "report_year",
         ]
-        # This only works if there are two years of data, thus the assertion above.
-        data_columns = ["starting_balance", "ending_balance"]
-        metadata_columns = [
-            "balance",
-            "xbrl_factoid_original",
-            "is_within_table_calc",
-            "row_type_xbrl",
-        ]
-        date_dupe_types = pd.merge(
-            current_year.loc[:, ~current_year.columns.isin(metadata_columns)],
-            previous_year[idx + data_columns],
-            on=idx,
-            how="inner",
-            suffixes=("_original", ""),
-        ).drop(columns=["starting_balance_original", "ending_balance_original"])
-
-        date_dupe_types["earnings_type"] = date_dupe_types["earnings_type"].apply(
-            lambda x: f"{x}_previous_year"
-        )
-
-        # Add in metadata that matches that of prior year's `previous_year` factoids
-        # These should be consistent.
-        previous_factoid_metadata = df.loc[
-            (df.report_year == missing_year - 1)
-            & (df.earnings_type.str.contains("_previous_year"))
-        ]
-        date_dupe_types = pd.merge(
-            date_dupe_types,
-            previous_factoid_metadata[idx + metadata_columns],
-            on=idx,
-            how="left",
+        inferred_previous_year_facts = (
+            to_copy_forward.assign(
+                report_year=to_copy_forward.report_year + 1,
+                new_earnings_type=to_copy_forward.earnings_type + "_previous_year",
+            )
+            .merge(current_year_facts[idx])
+            .drop(columns=["earnings_type"])
+            .rename(columns={"new_earnings_type": "earnings_type"})
+            .assign(row_type_xbrl="reported_value")
         )
-
-        df = pd.concat([df, date_dupe_types])
-
-        # All `previous_year` factoids are missing `row_type_xbrl`. Fill in.
-        df.loc[
-            df.earnings_type.isin(previous_year_types), "row_type_xbrl"
-        ] = "reported_value"
-
-        return df
+        return pd.concat([df, inferred_previous_year_facts])
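The copy-forward idea in isolation, on a toy frame (illustrative only):

```python
import pandas as pd

# 2021 is the first XBRL year, so no filing reports a 2021 "previous year"
# value. We infer it from 2020's current-year factoid by bumping report_year
# and renaming the earnings type.
facts = pd.DataFrame(
    {
        "utility_id_ferc1": [1, 1],
        "report_year": [2020, 2021],
        "earnings_type": ["unappropriated_retained_earnings"] * 2,
        "ending_balance": [100.0, 110.0],
    }
)
inferred = facts.loc[facts.report_year == 2020].assign(
    report_year=lambda x: x.report_year + 1,
    earnings_type=lambda x: x.earnings_type + "_previous_year",
    row_type_xbrl="reported_value",
)
combined = pd.concat([facts, inferred])  # 2021 now has a previous-year factoid
```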

     def deduplicate_xbrl_factoid_xbrl_metadata(self, tbl_meta) -> pd.DataFrame:
         """Deduplicate the xbrl_metadata based on the ``xbrl_factoid``.
diff --git a/src/pudl/transform/params/ferc1.py b/src/pudl/transform/params/ferc1.py
index 290af85d9a..3c0b9f7d23 100644
--- a/src/pudl/transform/params/ferc1.py
+++ b/src/pudl/transform/params/ferc1.py
@@ -328,6 +328,7 @@
     "categories": {
         "coal": {
             "bit coal",
+            "bit-coal",
             "ciak",
             "coa",
             "coal",
diff --git a/src/pudl/validate.py b/src/pudl/validate.py
index 3fe248f680..7e3d7f79d8 100644
--- a/src/pudl/validate.py
+++ b/src/pudl/validate.py
@@ -832,9 +832,9 @@ def plot_vs_agg(orig_df, agg_df, validation_cases):
         "data_col": "capability_ratio",
         "weight_col": "",
     },
-    {
+    {  # XBRL data (post-2021) reports 0 capability for ~22% of plants, so we exclude those.
         "title": "Capability Ratio (tails)",
-        "query": "",
+        "query": "report_year < 2021 | plant_capability_mw > 0",
         "low_q": 0.05,
         "low_bound": 0.5,
         "hi_q": 0.95,
diff --git a/test/integration/etl_test.py b/test/integration/etl_test.py
index f5781a4abe..21ea53dd13 100644
--- a/test/integration/etl_test.py
+++ b/test/integration/etl_test.py
@@ -191,6 +191,6 @@ def test_extract_xbrl(self, ferc1_engine_dbf):
         for table_type, df in xbrl_tables.items():
             # Some raw xbrl tables are empty
             if not df.empty and table_type == "duration":
-                assert (df.report_year >= 2021).all() and (
+                assert (df.report_year >= 2020).all() and (
                     df.report_year < 2022
                 ).all(), f"Unexpected years found in table: {table_name}"
diff --git a/test/integration/glue_test.py b/test/integration/glue_test.py
index 08d9c77734..2d8a7397e8 100644
--- a/test/integration/glue_test.py
+++ b/test/integration/glue_test.py
@@ -56,7 +56,7 @@ def plants_ferc1_raw(dataset_settings_config) -> pd.DataFrame:

 @pytest.fixture(scope="module")
 def glue_test_dfs(
-    pudl_out,
+    pudl_out: PudlTabl,
     ferc1_engine_xbrl,
     ferc1_engine_dbf,
     etl_settings,
diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py
index a716aa01b4..28042a7373 100644
--- a/test/unit/io_managers_test.py
+++ b/test/unit/io_managers_test.py
@@ -1,11 +1,17 @@
 """Test Dagster IO Managers."""
+import datetime
+import json
+
+import hypothesis
 import pandas as pd
+import pandera
 import pytest
 import sqlalchemy as sa
 from dagster import AssetKey, build_input_context, build_output_context
 from sqlalchemy.exc import IntegrityError, OperationalError

 from pudl.io_managers import (
+    FercXBRLSQLiteIOManager,
     ForeignKeyError,
     ForeignKeyErrors,
     PudlSQLiteIOManager,
@@ -249,3 +255,251 @@ def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture):
     input_context = build_input_context(asset_key=AssetKey(asset_key))
     with pytest.raises(ValueError):
         pudl_sqlite_io_manager_fixture.load_input(input_context)
+
+
+def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path):
+    db_path = tmp_path / "test_db.sqlite"
+    # A fake datapackage descriptor with just enough structure for us to find
+    # the primary keys - lots of optional stuff dropped.
+    datapackage = json.dumps(
+        {
+            "name": "test_db",
+            "title": "Ferc1 data extracted from XBRL filings",
+            "resources": [
+                {
+                    "path": f"{db_path}",
+                    "name": "test_table_instant",
+                    "schema": {
+                        "fields": [
+                            {"name": "entity_id", "type": "string"},
+                            {"name": "utility_type_axis", "type": "string"},
+                            {"name": "filing_name", "type": "string"},
+                            {"name": "publication_time", "type": "datetime"},
+                            {"name": "date", "type": "date"},
+                            {"name": "str_factoid", "type": "string"},
+                        ],
+                        "primary_key": [
+                            "entity_id",
+                            "filing_name",
+                            "publication_time",
+                            "date",
+                            "utility_type_axis",
+                        ],
+                    },
+                }
+            ],
+        }
+    )
+
+    datapackage_path = tmp_path / "test_db_datapackage.json"
+    with datapackage_path.open("w") as f:
+        f.write(datapackage)
+
+    df = pd.DataFrame.from_records(
+        [
+            {
+                "entity_id": "C000001",
+                "utility_type_axis": "electric",
+                "filing_name": "Utility_Co_0001",
+                "date": datetime.date(2021, 12, 31),
+                "publication_time": datetime.datetime(2022, 2, 1, 0, 0, 0),
+                "str_factoid": "original 2021 EOY value",
+            },
+            {
+                "entity_id": "C000001",
+                "utility_type_axis": "electric",
+                "filing_name": "Utility_Co_0002",
+                "date": datetime.date(2021, 12, 31),
+                "publication_time": datetime.datetime(2022, 2, 1, 1, 1, 1),
+                "str_factoid": "updated 2021 EOY value",
+            },
+        ]
+    )
+
+    engine = sa.create_engine(f"sqlite:///{db_path}")
+    df.to_sql("test_table_instant", engine)
+    input_context = build_input_context(
+        asset_key=AssetKey("test_table_instant"),
+        resources={
+            "dataset_settings": mocker.MagicMock(
+                ferc1=mocker.MagicMock(xbrl_years=[2021])
+            )
+        },
+    )
+    io_manager = FercXBRLSQLiteIOManager(base_dir=tmp_path, db_name="test_db")
+    observed_table = io_manager.load_input(input_context)
+
+    assert len(observed_table) == 1
+    assert observed_table.str_factoid.to_numpy().item() == "updated 2021 EOY value"
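For readers new to the pattern used in the next test: pandera schemas can generate hypothesis strategies, which is what drives the property-based test below. A minimal sketch of the mechanism (toy schema, demo-only use of `.example()`):

```python
import pandera

schema = pandera.DataFrameSchema({"x": pandera.Column(int, pandera.Check.ge(0))})
# A hypothesis strategy that produces dataframes satisfying the schema.
strategy = schema.strategy(size=3)
# Draw one example outside @hypothesis.given (fine for a demo, discouraged in tests).
df = strategy.example()
schema.validate(df)  # generated data is valid by construction
```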
+
+
+example_schema = pandera.DataFrameSchema(
+    {
+        "entity_id": pandera.Column(str, nullable=False),
+        "date": pandera.Column("datetime64[ns]", nullable=False),
+        "utility_type": pandera.Column(
+            str,
+            pandera.Check.isin(["electric", "gas", "total", "other"]),
+            nullable=False,
+        ),
+        "publication_time": pandera.Column("datetime64[ns]", nullable=False),
+        "int_factoid": pandera.Column(int),
+        "float_factoid": pandera.Column(float),
+        "str_factoid": pandera.Column("str"),
+    }
+)
+
+
+@hypothesis.given(example_schema.strategy(size=3))
+def test_filter_for_freshest_data(df):
+    # The XBRL context is the identifying metadata for reported values.
+    xbrl_context_cols = ["entity_id", "date", "utility_type"]
+    filing_metadata_cols = ["publication_time", "filing_name"]
+    primary_key = xbrl_context_cols + filing_metadata_cols
+    deduped = FercXBRLSQLiteIOManager.filter_for_freshest_data(
+        df, primary_key=primary_key
+    )
+    example_schema.validate(deduped)
+
+    # Every post-deduplication row exists in the original rows.
+    assert (deduped.merge(df, how="left", indicator=True)._merge != "left_only").all()
+    # For every [entity_id, utility_type, date] there is only one row.
+    assert (~deduped.duplicated(subset=xbrl_context_cols)).all()
+    # For every *context* in the input there is a corresponding row in the output.
+    original_contexts = df.groupby(xbrl_context_cols, as_index=False).last()
+    paired_by_context = original_contexts.merge(
+        deduped,
+        on=xbrl_context_cols,
+        how="outer",
+        suffixes=["_in", "_out"],
+        indicator=True,
+    ).set_index(xbrl_context_cols)
+    hypothesis.note(f"Found these contexts in input data:\n{original_contexts}")
+    hypothesis.note(f"The freshest data:\n{deduped}")
+    hypothesis.note(f"Paired by context:\n{paired_by_context}")
+    assert (paired_by_context._merge == "both").all()
+
+    # For every row in the output, its publication time is greater than or
+    # equal to all other publication times for that [entity_id, utility_type,
+    # date] in the input data.
+    assert (
+        paired_by_context["publication_time_out"]
+        >= paired_by_context["publication_time_in"]
+    ).all()
+
+
+def test_report_year_fixing_instant():
+    instant_df = pd.DataFrame.from_records(
+        [
+            {
+                "entity_id": "123",
+                "date": "2020-07-01",
+                "report_year": 3021,
+                "factoid": "replace report year with date year",
+            },
+        ]
+    )
+
+    observed = FercXBRLSQLiteIOManager.refine_report_year(
+        instant_df, xbrl_years=[2021, 2022]
+    ).report_year
+    expected = pd.Series([2020])
+    assert (observed == expected).all()
+
+
+def test_report_year_fixing_duration():
+    duration_df = pd.DataFrame.from_records(
+        [
+            {
+                "entity_id": "123",
+                "start_date": "2004-01-01",
+                "end_date": "2004-12-31",
+                "report_year": 3021,
+                "factoid": "filter out since the report year is out of bounds",
+            },
+            {
+                "entity_id": "123",
+                "start_date": "2021-01-01",
+                "end_date": "2021-12-31",
+                "report_year": 3021,
+                "factoid": "replace report year with date year",
+            },
+        ]
+    )
+
+    observed = FercXBRLSQLiteIOManager.refine_report_year(
+        duration_df, xbrl_years=[2021, 2022]
+    ).report_year
+    expected = pd.Series([2021])
+    assert (observed == expected).all()
+
+
+@pytest.mark.parametrize(
+    "df, match",
+    [
+        (
+            pd.DataFrame.from_records(
+                [
+                    {"entity_id": "123", "report_year": 3021, "date": ""},
+                ]
+            ),
+            "date has null values",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                        "start_date": "",
+                        "end_date": "2020-12-31",
+                    },
+                ]
+            ),
+            "start_date has null values",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                        "start_date": "2020-06-01",
+                        "end_date": "2021-05-31",
+                    },
+                ]
+            ),
+            "start_date and end_date are in different years",
+        ),
+        (
+            pd.DataFrame.from_records(
+                [
+                    {
+                        "entity_id": "123",
+                        "report_year": 3021,
+                    },
+                ]
+            ),
+            "Attempted to read a non-instant, non-duration table",
+        ),
+    ],
+)
+def test_report_year_fixing_bad_values(df, match):
+    with pytest.raises(ValueError, match=match):
+        FercXBRLSQLiteIOManager.refine_report_year(df, xbrl_years=[2021, 2022])
diff --git a/test/unit/transform/ferc1_test.py b/test/unit/transform/ferc1_test.py
index a942c436c3..8fd67a10c3 100644
--- a/test/unit/transform/ferc1_test.py
+++ b/test/unit/transform/ferc1_test.py
@@ -355,10 +355,11 @@ def test_unstack_balances_to_report_year_instant_xbrl():
         StringIO(
             """
 idx,entity_id,date,report_year,sched_table_name,test_value
-0,1,2021-12-31,2021,table_name,2000
-1,1,2020-12-31,2021,table_name,1000
-2,2,2021-12-31,2021,table_name,21000
-3,2,2020-12-31,2021,table_name,8000
+0,1,2022-12-31,2022,table_name,2022.1
+1,1,2021-12-31,2021,table_name,2021.1
+2,1,2020-12-31,2020,table_name,2020.1
+3,2,2021-12-31,2021,table_name,2021.2
+4,2,2020-12-31,2020,table_name,2020.2
 """
         ),
     )
@@ -371,12 +372,15 @@ def test_unstack_balances_to_report_year_instant_xbrl():
         params=params,
         primary_key_cols=pk_cols,
     )
+    # Because there are NaNs in idx when we unstack, both idx balance columns are floats.
     df_expected = pd.read_csv(
         StringIO(
             """
 entity_id,report_year,sched_table_name,idx_ending_balance,idx_starting_balance,test_value_ending_balance,test_value_starting_balance
-1,2021,table_name,0,1,2000,1000
-2,2021,table_name,2,3,21000,8000
+1,2021,table_name,1.0,2.0,2021.1,2020.1
+1,2022,table_name,0.0,1.0,2022.1,2021.1
+2,2021,table_name,3.0,4.0,2021.2,2020.2
+2,2022,table_name,,3.0,,2021.2
 """
         ),
     )
@@ -385,7 +389,15 @@ def test_unstack_balances_to_report_year_instant_xbrl():
     # If there is more than one value per year (not report year), an
     # AssertionError should be raised.
     df_non_unique_years = df.copy()
-    df_non_unique_years.loc[4] = [4, 2, "2020-12-31", 2021, "table_name", 500]
+    df_non_unique_years.loc[len(df_non_unique_years.index)] = [
+        5,
+        2,
+        "2020-12-31",
+        2020,
+        "table_name",
+        2020.15,
+    ]
+
     with pytest.raises(AssertionError):
         unstack_balances_to_report_year_instant_xbrl(
             df_non_unique_years, params=params, primary_key_cols=pk_cols
diff --git a/test/validate/ferc1_test.py b/test/validate/ferc1_test.py
index 074f61f636..de3f1e5d1b 100644
--- a/test/validate/ferc1_test.py
+++ b/test/validate/ferc1_test.py
@@ -84,16 +84,16 @@ def test_no_null_cols_ferc1(pudl_out_ferc1, live_dbs, cols, df_name):
 @pytest.mark.parametrize(
     "df_name,expected_rows",
     [
-        ("fbp_ferc1", 25_406),
-        ("fuel_ferc1", 48_815),
-        ("plant_in_service_ferc1", 315_112),
-        ("plants_all_ferc1", 54_415),
-        ("plants_hydro_ferc1", 6_798),
-        ("plants_pumped_storage_ferc1", 544),
-        ("plants_small_ferc1", 16_248),
-        ("plants_steam_ferc1", 30_825),
-        ("pu_ferc1", 7_528),
-        ("purchased_power_ferc1", 197_947),
+        ("fbp_ferc1", 26_188),
+        ("fuel_ferc1", 50_039),
+        ("plant_in_service_ferc1", 335_750),
+        ("plants_all_ferc1", 56_409),
+        ("plants_hydro_ferc1", 6_979),
+        ("plants_pumped_storage_ferc1", 562),
+        ("plants_small_ferc1", 16_989),
+        ("plants_steam_ferc1", 31_879),
+        ("pu_ferc1", 7_698),
+        ("purchased_power_ferc1", 204_720),
     ],
 )
 def test_minmax_rows(pudl_out_ferc1, live_dbs, expected_rows, df_name):