Merge pull request #2948 from catalyst-cooperative/2811-ferc1-2022
FERC1 2022
zaneselvans authored Oct 26, 2023
2 parents b51d235 + 76e456e commit 0a1a125
Showing 18 changed files with 601 additions and 150 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -37,3 +37,4 @@ notebooks/*.tgz
**.tfstate.*
terraform/.terraform/*
.env
.hypothesis/
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -17,7 +17,7 @@ dependencies = [
"boto3>=1.28.55",
"bottleneck>=1.3.4", # pandas[performance]
"catalystcoop.dbfread>=3.0,<3.1",
"catalystcoop.ferc-xbrl-extractor>=1.1.1,<1.2",
"catalystcoop.ferc-xbrl-extractor>=1.2.0,<2",
"coloredlogs>=14.0,<15.1", # Dagster requires 14.0
"dagster-webserver>=1.4,<1.6",
"dagster>=1.4,<1.6",
@@ -30,6 +30,7 @@ dependencies = [
"grpcio==1.57.0", # Required by dagster. Version works with MacOS
"grpcio-health-checking==1.57.0", # Required by dagster. Version works with MacOS
"grpcio-status==1.57.0", # Required by dagster. Version works with MacOS
"hypothesis>=6.87,<7.0",
"jellyfish>=1.0.1,<1.1", # recordlinkage dependency
"jinja2>=3,<3.2",
"matplotlib>=3.6.1,<3.9",
@@ -39,6 +40,7 @@ dependencies = [
"numpy>=1.24,<2.0a0",
"openpyxl>=3.0.10", # pandas[excel]
"pandas[parquet,excel,fss,gcp,compression]>=2,<2.2",
"pandera>=0.17,<1.0",
"pyarrow>=13,<14", # pandas[parquet]
"pydantic>=1.7,<2",
"python-dotenv>=1,<1.1",
28 changes: 28 additions & 0 deletions src/pudl/helpers.py
@@ -1798,3 +1798,31 @@ def scale_by_ownership(
gens["fraction_owned"], axis="index"
)
return gens


def assert_cols_areclose(
df: pd.DataFrame,
a_cols: list[str],
b_cols: list[str],
mismatch_threshold: float,
message: str,
):
"""Check if two column sets of a dataframe are close to each other.
Ignores NANs and raises if there are too many mismatches.
"""
    # We use df.loc to select the offending rows, so that a debugger placed
    # here shows the actual mismatched data rather than just a boolean mask.
mismatch = df.loc[
~np.isclose(
np.ma.masked_where(np.isnan(df[a_cols]), df[a_cols]),
np.ma.masked_where(np.isnan(df[b_cols]), df[b_cols]),
equal_nan=True,
).filled()
]
mismatch_ratio = len(mismatch) / len(df)
if mismatch_ratio > mismatch_threshold:
raise AssertionError(
f"{message} Mismatch ratio {mismatch_ratio:.01%} > "
f"threshold {mismatch_threshold:.01%}."
)
130 changes: 118 additions & 12 deletions src/pudl/io_managers.py
@@ -1,4 +1,5 @@
"""Dagster IO Managers."""
import json
import re
from pathlib import Path
from sqlite3 import sqlite_version
@@ -643,6 +644,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
context: dagster keyword that provides access output information like asset
name.
"""
# TODO (daz): this is hard-coded to FERC1, though this is nominally for all FERC datasets.
ferc1_settings = context.resources.dataset_settings.ferc1

table_name = self._get_table_name(context)
@@ -683,6 +685,105 @@ class FercXBRLSQLiteIOManager(FercSQLiteIOManager):
metadata.
"""

@staticmethod
def filter_for_freshest_data(
table: pd.DataFrame, primary_key: list[str]
) -> pd.DataFrame:
"""Get most updated values for each XBRL context.
An XBRL context includes an entity ID, the time period the data applies
to, and other dimensions such as utility type. Each context has its own
ID, but they are frequently redefined with the same contents but
different IDs - so we identify them by their actual content.
Each row in our SQLite database includes all the facts for one
context/filing pair.
If one context is represented in multiple filings, we take the facts from the most recently-published filing.
This means that if a recently-published filing does not include a value for a fact that was previously reported, then that value will remain null. We do not
forward-fill facts on a fact-by-fact basis.
"""
filing_metadata_cols = {"publication_time", "filing_name"}
xbrl_context_cols = [c for c in primary_key if c not in filing_metadata_cols]
# we do this in multiple stages so we can log the drop-off at each stage.
stages = [
{
"message": "completely duplicated rows",
"subset": table.columns,
},
{
"message": "rows that are exactly the same in multiple filings",
"subset": [c for c in table.columns if c not in filing_metadata_cols],
},
{
"message": "rows that were updated by later filings",
"subset": xbrl_context_cols,
},
]
original = table.sort_values("publication_time")
for stage in stages:
deduped = original.drop_duplicates(subset=stage["subset"], keep="last")
logger.debug(f"Dropped {len(original) - len(deduped)} {stage['message']}")
original = deduped

return deduped
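
# Illustrative sketch (hypothetical filings, not real FERC data): when two
# filings report the same XBRL context, only the most recently published
# facts survive. The column names here are simplified examples.
import pandas as pd

table = pd.DataFrame(
    {
        "entity_id": ["C1", "C1"],
        "date": ["2021-12-31", "2021-12-31"],
        "filing_name": ["original", "resubmission"],
        "publication_time": pd.to_datetime(["2022-01-01", "2022-06-01"]),
        "ending_balance": [100.0, 110.0],
    }
)
deduped = FercXBRLSQLiteIOManager.filter_for_freshest_data(
    table, primary_key=["entity_id", "filing_name", "date"]
)
# Only the "resubmission" row (ending_balance == 110.0) remains.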

@staticmethod
def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
"""Set a fact's report year by its actual dates.
Sometimes a fact belongs to a context which has no ReportYear
associated with it; other times there are multiple ReportYears
associated with a single filing. In these cases the report year of a
specific fact may be associated with the other years in the filing.
In many cases we can infer the actual report year from the fact's
associated time period - either duration or instant.
"""
is_duration = len({"start_date", "end_date"} - set(df.columns)) == 0
is_instant = "date" in df.columns

def get_year(df: pd.DataFrame, col: str) -> pd.Series:
datetimes = pd.to_datetime(df.loc[:, col])
if datetimes.isna().any():
raise ValueError(f"{col} has null values!")
return datetimes.apply(lambda x: x.year)

if is_duration:
start_years = get_year(df, "start_date")
end_years = get_year(df, "end_date")
if not (start_years == end_years).all():
raise ValueError("start_date and end_date are in different years!")
new_report_years = start_years
elif is_instant:
new_report_years = get_year(df, "date")
else:
raise ValueError("Attempted to read a non-instant, non-duration table.")

# we include XBRL data from before our "officially supported" XBRL
# range because we want to use it to set start-of-year values for the
# first XBRL year.
xbrl_years_plus_one_previous = [min(xbrl_years) - 1] + xbrl_years
return (
df.assign(report_year=new_report_years)
.loc[lambda df: df.report_year.isin(xbrl_years_plus_one_previous)]
.reset_index(drop=True)
)
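
# Illustrative sketch for an "instant" table (hypothetical rows): report_year
# is rewritten from each fact's actual date, and facts outside the supported
# XBRL years -- plus the one prior year, kept for start-of-year balances --
# are dropped.
import pandas as pd

df = pd.DataFrame(
    {
        "date": ["2020-12-31", "2021-12-31", "2015-12-31"],
        "report_year": [2021, 2021, 2021],
    }
)
refined = FercXBRLSQLiteIOManager.refine_report_year(df, xbrl_years=[2021, 2022])
# Keeps the 2020 and 2021 rows (2020 is min(xbrl_years) - 1); drops 2015.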

def _get_primary_key(self, sched_table_name: str) -> list[str]:
# TODO (daz): as of 2023-10-13, our datapackage.json is merely
# "frictionless-like" so we manually parse it as JSON. once we make our
# datapackage.json conformant, we will need to at least update the
# "primary_key" to "primaryKey", but maybe there will be other changes
# as well.
with (self.base_dir / f"{self.db_name}_datapackage.json").open() as f:
datapackage = json.loads(f.read())
[table_resource] = [
tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
]
return table_resource["schema"]["primary_key"]

def handle_output(self, context: OutputContext, obj: pd.DataFrame | str):
"""Handle an op or asset output."""
raise NotImplementedError("FercXBRLSQLiteIOManager can't write outputs yet.")
@@ -694,6 +795,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
context: dagster keyword that provides access output information like asset
name.
"""
# TODO (daz): this is hard-coded to FERC1, though this is nominally for all FERC datasets.
ferc1_settings = context.resources.dataset_settings.ferc1

table_name = self._get_table_name(context)
@@ -709,23 +811,27 @@

engine = self.engine

id_table = "identification_001_duration"

sched_table_name = re.sub("_instant|_duration", "", table_name)
with engine.connect() as con:
return pd.read_sql(
f"""
SELECT {table_name}.*, {id_table}.report_year FROM {table_name}
JOIN {id_table} ON {id_table}.filing_name = {table_name}.filing_name
WHERE {id_table}.report_year BETWEEN :min_year AND :max_year;
""", # noqa: S608 - table names not supplied by user
df = pd.read_sql(
f"SELECT {table_name}.* FROM {table_name}", # noqa: S608 - table names not supplied by user
con=con,
params={
"min_year": min(ferc1_settings.xbrl_years),
"max_year": max(ferc1_settings.xbrl_years),
},
).assign(sched_table_name=sched_table_name)

primary_key = self._get_primary_key(table_name)

return (
df.pipe(
FercXBRLSQLiteIOManager.filter_for_freshest_data,
primary_key=primary_key,
)
.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc1_settings.xbrl_years,
)
.drop(columns=["publication_time"])
)


@io_manager(required_resource_keys={"dataset_settings"})
def ferc1_xbrl_sqlite_io_manager(init_context) -> FercXBRLSQLiteIOManager:
6 changes: 3 additions & 3 deletions src/pudl/metadata/sources.py
@@ -365,7 +365,7 @@
},
"field_namespace": "ferc1",
"working_partitions": {
"years": sorted(set(range(1994, 2022))),
"years": sorted(set(range(1994, 2023))),
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
@@ -405,7 +405,7 @@
# Years 1991-1995 use strange formats that need to be investigated further.
# Years 1996-1999 come in split archives and full archives and we are going
# to be using the aggregated archives (part=None).
"years": sorted(set(range(1996, 2022))),
"years": sorted(set(range(1996, 2023))),
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
@@ -463,7 +463,7 @@
),
"field_namespace": "ferc60",
"working_partitions": {
"years": sorted(set(range(2006, 2022))),
"years": sorted(set(range(2006, 2023))),
},
"contributors": [
CONTRIBUTORS["catalyst-cooperative"],
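
A note on the working_partitions bumps above: Python's range() excludes its stop value, so raising the stop from 2022 to 2023 extends each dataset's working years through 2022, the newly added FERC data year.

assert max(range(1994, 2023)) == 2022
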
Binary file modified src/pudl/package_data/glue/pudl_id_mapping.xlsx
Binary file not shown.
8 changes: 5 additions & 3 deletions src/pudl/package_data/glue/utility_id_ferc1.csv
@@ -151,7 +151,7 @@ utility_id_ferc1,utility_id_ferc1_xbrl,utility_id_ferc1_dbf
150,,1001
151,,1003
152,C000029,231
153,C000030,447
153,C000030,255
154,C000038,250
155,C000041,161
156,C000045,294
@@ -402,7 +402,7 @@ utility_id_ferc1,utility_id_ferc1_xbrl,utility_id_ferc1_dbf
408,R001052,52
409,R001072,72
411,R001087,87
414,R001140,140
414,C011644,140
419,R001268,268
420,R001298,298
421,R001301,301
@@ -412,10 +412,12 @@
428,R001419,419
429,R001422,422
436,R001445,445
437,,255
439,R001515,515
440,R001520,520
441,R001521,521
442,,278
443,C010845,
444,C011304,
445,C011745,447
446,C004679,
447,C011785,
4 changes: 3 additions & 1 deletion src/pudl/package_data/glue/utility_id_pudl.csv
@@ -256,7 +256,7 @@ utility_id_pudl,utility_id_ferc1,utility_name_ferc1,utility_id_eia,utility_name_
258,111,Pinnacle West Marketing & Trading Co. LLC,,
259,414,Pioneer Power and Light Company,,
260,153,"PJM Interconnection, LLC",,
261,437,"PJM Settlement, Inc.",,
261,445,"PJM Settlement, Inc.",,
262,84,Platte-Clay Electric Cooperative Inc.,,
263,250,Portland General Electric Company,15248,Portland General Electric Co
264,290,Potomac Electric Power Company,,
@@ -15826,3 +15826,5 @@ utility_id_pudl,utility_id_ferc1,utility_name_ferc1,utility_id_eia,utility_name_
15868,,,65633,"BKV Barnett, LLC"
15869,,,65634,Highpeak Solar LLC
15870,,,1937004,
15871,446,"Transource Oklahoma, LLC",,
15872,447,"Mountrail-Williams Electric Cooperative",,
20 changes: 10 additions & 10 deletions src/pudl/package_data/settings/etl_fast.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
ferc_to_sqlite_settings:
ferc1_dbf_to_sqlite_settings:
# What years of original FERC data should be cloned into the SQLite DB?
years: [2020]
years: [2019, 2020]
ferc1_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc2_dbf_to_sqlite_settings:
years: [2020]
years: [2019, 2020]
ferc2_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc6_dbf_to_sqlite_settings:
years: [2020]
years: [2019, 2020]
ferc6_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc60_dbf_to_sqlite_settings:
years: [2020]
years: [2019, 2020]
ferc60_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc714_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]

###########################################################################
# Settings for pudl_etl script
@@ -34,7 +34,7 @@ description: >
version: 0.1.0
datasets:
ferc1:
years: [2020, 2021]
years: [2020, 2021, 2022]
ferc714:
years: [2019, 2020]
eia:
11 changes: 6 additions & 5 deletions src/pudl/package_data/settings/etl_full.yml
@@ -38,7 +38,7 @@ ferc_to_sqlite_settings:
# A list of tables to be loaded into the local SQLite database. These are
# the table names as they appear in the 2015 FERC Form 1 database.
ferc1_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
# A list of tables to be loaded into the local SQLite database. These are
# the table names as created from the 2022 XBRL taxonomy.
ferc2_dbf_to_sqlite_settings:
@@ -71,7 +71,7 @@ ferc_to_sqlite_settings:
2020,
]
ferc2_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc6_dbf_to_sqlite_settings:
years:
[
@@ -98,7 +98,7 @@ ferc_to_sqlite_settings:
2020,
]
ferc6_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc60_dbf_to_sqlite_settings:
years:
[
@@ -119,9 +119,9 @@
2020,
]
ferc60_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]
ferc714_xbrl_to_sqlite_settings:
years: [2021]
years: [2021, 2022]

###########################################################################
# Settings for pudl_etl script
@@ -164,6 +164,7 @@ datasets:
2019,
2020,
2021,
2022,
]
ferc714:
years: