Add factory function for creating dataframe embedder
zschira committed Dec 10, 2023
1 parent c2869c3 commit 49cf1a8
Showing 4 changed files with 259 additions and 310 deletions.
141 changes: 58 additions & 83 deletions src/pudl/analysis/record_linkage/classify_plants_ferc1.py
@@ -14,10 +14,8 @@
from dagster import graph, op

import pudl
from pudl.analysis.record_linkage.models import (
column_transform_from_key,
link_ids_cross_year,
)
from pudl.analysis.record_linkage import embed_dataframe
from pudl.analysis.record_linkage.link_cross_year import link_ids_cross_year

logger = pudl.logging_helpers.get_logger(__name__)

@@ -31,84 +29,60 @@
"waste_fraction_mmbtu",
]

_MODEL_CONFIG = {
"link_ids_cross_year": {
"ops": {
"train_dataframe_embedder": {
"config": {
"transform_steps": {
"plant_name": {
"transforms": [
column_transform_from_key("name_cleaner_transform"),
column_transform_from_key("string_transform"),
],
"weight": 2.0,
"columns": ["plant_name_ferc1"],
},
"plant_type": {
"transforms": [
column_transform_from_key(
"cleaning_function_transform",
transform_function="null_to_empty_str",
),
column_transform_from_key("categorical_transform"),
],
"weight": 2.0,
"columns": ["plant_type"],
},
"construction_type": {
"transforms": [
column_transform_from_key(
"cleaning_function_transform",
transform_function="null_to_empty_str",
),
column_transform_from_key("categorical_transform"),
],
"columns": ["construction_type"],
},
"capacity_mw": {
"transforms": [
column_transform_from_key(
"cleaning_function_transform",
transform_function="null_to_zero",
),
column_transform_from_key("numerical_transform"),
],
"columns": ["capacity_mw"],
},
"construction_year": {
"transforms": [
column_transform_from_key(
"cleaning_function_transform",
transform_function="fix_int_na",
),
column_transform_from_key("categorical_transform"),
],
"columns": ["construction_year"],
},
"utility_id_ferc1": {
"transforms": [
column_transform_from_key("categorical_transform")
],
"columns": ["utility_id_ferc1"],
},
"fuel_fractions": {
"transforms": [
column_transform_from_key(
"cleaning_function_transform",
transform_function="null_to_zero",
),
column_transform_from_key("numerical_transform"),
column_transform_from_key("normalize_transform"),
],
"columns": _FUEL_COLS,
},
}
}
},
}

embed_dataframe = embed_dataframe.dataframe_embedder_factory(
{
"plant_name": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.NameCleaner(),
embed_dataframe.TextVectorizer(),
],
weight=2.0,
columns=["plant_name_ferc1"],
),
"plant_type": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"),
embed_dataframe.CategoricalVectorizer(),
],
weight=2.0,
columns=["plant_type"],
),
"construction_type": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.ColumnCleaner(cleaning_function="null_to_empty_str"),
embed_dataframe.CategoricalVectorizer(),
],
columns=["construction_type"],
),
"capacity_mw": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"),
embed_dataframe.NumericalVectorizer(),
],
columns=["capacity_mw"],
),
"construction_year": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.ColumnCleaner(cleaning_function="fix_int_na"),
embed_dataframe.CategoricalVectorizer(),
],
columns=["construction_year"],
),
"utility_id_ferc1": embed_dataframe.ColumnVectorizer(
transform_steps=[embed_dataframe.CategoricalVectorizer()],
columns=["utility_id_ferc1"],
),
"fuel_fractions": embed_dataframe.ColumnVectorizer(
transform_steps=[
embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"),
embed_dataframe.NumericalVectorizer(),
embed_dataframe.NumericalNormalizer(),
],
columns=_FUEL_COLS,
),
}
)


@op
@@ -171,7 +145,7 @@ def merge_steam_fuel_dfs(
)


@graph(config=_MODEL_CONFIG)
@graph
def plants_steam_assign_plant_ids(
ferc1_steam_df: pd.DataFrame,
ferc1_fuel_df: pd.DataFrame,
@@ -186,7 +160,8 @@ def plants_steam_assign_plant_ids(
logger.info("Identifying distinct large FERC plants for ID assignment.")

input_df = merge_steam_fuel_dfs(ferc1_steam_df, ferc1_fuel_df, fuel_categories)
label_df = link_ids_cross_year(input_df)
feature_matrix = embed_dataframe(input_df)
label_df = link_ids_cross_year(input_df, feature_matrix)

return plants_steam_validate_ids(ferc1_steam_df, label_df)

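
For context, a minimal sketch (not part of this commit) of driving a factory-built embedder on a toy frame. The toy records and the use of Dagster's in-process execution API are illustrative assumptions:

import pandas as pd

from pudl.analysis.record_linkage import embed_dataframe

# Hypothetical toy input with the two columns the vectorizers below expect.
df = pd.DataFrame(
    {
        "utility_id_ferc1": [1, 1, 2],
        "capacity_mw": [700.0, None, 150.0],
    }
)

embedder = embed_dataframe.dataframe_embedder_factory(
    {
        "utility_id_ferc1": embed_dataframe.ColumnVectorizer(
            transform_steps=[embed_dataframe.CategoricalVectorizer()],
            columns=["utility_id_ferc1"],
        ),
        "capacity_mw": embed_dataframe.ColumnVectorizer(
            transform_steps=[
                embed_dataframe.ColumnCleaner(cleaning_function="null_to_zero"),
                embed_dataframe.NumericalVectorizer(),
            ],
            columns=["capacity_mw"],
        ),
    }
)

# Dagster graphs can be executed directly in tests; input_values feeds the
# graph's single DataFrame input.
result = embedder.execute_in_process(input_values={"df": df})
feature_matrix = result.output_value()  # a FeatureMatrix
dense_or_sparse = feature_matrix.matrix  # ndarray or scipy CSR, per the wrapper
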
181 changes: 181 additions & 0 deletions src/pudl/analysis/record_linkage/embed_dataframe.py
@@ -0,0 +1,181 @@
"""Tools for embedding a DataFrame to create feature matrix for models."""
from abc import ABC, abstractmethod
from dataclasses import dataclass

import numpy as np
import pandas as pd
import scipy
from dagster import graph, op
from pydantic import BaseModel
from sklearn.base import BaseEstimator
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
FunctionTransformer,
MinMaxScaler,
Normalizer,
OneHotEncoder,
)

import pudl
from pudl.analysis.record_linkage.name_cleaner import CompanyNameCleaner


@dataclass
class FeatureMatrix:
"""Class to wrap a feature matrix returned from dataframe embedding.
Depending on the transformations applied, a feature matrix may be sparse or dense
matrix. Using this wrapper enables Dagsters type checking while allowing both dense
and sparse matrices underneath.
"""

matrix: np.ndarray | scipy.sparse.csr_matrix


class TransformStep(BaseModel, ABC):
"""TransformStep's can be combined to vectorize one or more columns.
This class defines a very simple interface for TransformStep's, which essentially
says that a TransformStep should take configuration and implement the method as_transformer.
"""

name: str

@abstractmethod
def as_transformer(self) -> BaseEstimator:
"""This method should use configuration to produce a :class:`sklearn.base.BaseEstimator`."""
...


class ColumnVectorizer(BaseModel):
"""Define a set of transformations to apply to one or more columns."""

transform_steps: list[TransformStep]
weight: float = 1.0
columns: list[str]

def as_pipeline(self):
"""Return :class:`sklearn.pipeline.Pipeline` with configuration."""
return Pipeline(
[
(
step.name,
step.as_transformer(),
)
for step in self.transform_steps
]
)


def dataframe_embedder_factory(vectorizers: dict[str, ColumnVectorizer]):
"""Return a configured op graph to embed an input dataframe."""

@op
def train_dataframe_embedder(df: pd.DataFrame):
"""Train :class:`sklearn.compose.ColumnTransformer` on input."""
column_transformer = ColumnTransformer(
transformers=[
(name, column_transform.as_pipeline(), column_transform.columns)
for name, column_transform in vectorizers.items()
],
transformer_weights={
name: column_transform.weight
for name, column_transform in vectorizers.items()
},
)

return column_transformer.fit(df)

@op
def apply_embed_dataframe(df: pd.DataFrame, transformer: ColumnTransformer):
"""Use :class:`sklearn.compose.ColumnTransformer` to transform input."""
return FeatureMatrix(matrix=transformer.transform(df))

@graph
def embed_dataframe(df: pd.DataFrame) -> FeatureMatrix:
"""Train dataframe embedder and apply to input df."""
transformer = train_dataframe_embedder(df)
return apply_embed_dataframe(df, transformer)

return embed_dataframe


class TextVectorizer(TransformStep):
"""Implement TransformStep for :class:`sklearn.feature_extraction.text.TfidfVectorizer`."""

name: str = "tfidf_vectorizer"

#: See sklearn documentation for all options
options: dict = {"analyzer": "char", "ngram_range": (2, 10)}

def as_transformer(self):
"""Return configured TfidfVectorizer."""
return TfidfVectorizer(**self.options)


class CategoricalVectorizer(TransformStep):
"""Implement TransformStep for :class:`sklearn.preprocessing.OneHotEncoder`."""

name: str = "one_hot_encoder_vectorizer"

options: dict = {"categories": "auto"}

def as_transformer(self):
"""Return configured OneHotEncoder."""
return OneHotEncoder(**self.options)


class NumericalVectorizer(TransformStep):
"""Implement ColumnTransformation for MinMaxScaler."""

name: str = "numerical_vectorizer"

def as_transformer(self):
"""Return configured MinMaxScalerConfig."""
return MinMaxScaler()


class NumericalNormalizer(TransformStep):
"""Implement ColumnTransformation for Normalizer."""

name: str = "numerical_normalizer"

def as_transformer(self):
"""Return configured NormalizerConfig."""
return Normalizer()


def _apply_cleaning_func(df, function_key: str | None = None):
function_transforms = {
"null_to_zero": lambda df: df.fillna(value=0.0),
"null_to_empty_str": lambda df: df.fillna(value=""),
"fix_int_na": lambda df: pudl.helpers.fix_int_na(df, columns=list(df.columns)),
}

return function_transforms[function_key](df)


class ColumnCleaner(TransformStep):
"""Implement ColumnTransformation for cleaning functions."""

name: str = "column_cleaner"
cleaning_function: str

def as_transformer(self):
"""Return configured NormalizerConfig."""
return FunctionTransformer(
_apply_cleaning_func, kw_args={"function_key": self.cleaning_function}
)


class NameCleaner(TransformStep):
"""Implement ColumnTransformation for CompanyNameCleaner."""

name: str = "name_cleaner"
company_cleaner: CompanyNameCleaner = CompanyNameCleaner()

def as_transformer(self):
"""Return configured CompanyNameCleaner."""
return FunctionTransformer(self.company_cleaner.apply_name_cleaning)
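
The TransformStep interface above is deliberately open-ended: any pydantic model that implements as_transformer can slot into a ColumnVectorizer. A minimal sketch of a custom step; the StandardScalerStep class and toy data are hypothetical, not part of this commit:

import pandas as pd
from sklearn.preprocessing import StandardScaler

from pudl.analysis.record_linkage.embed_dataframe import (
    ColumnVectorizer,
    TransformStep,
)


class StandardScalerStep(TransformStep):
    """Hypothetical TransformStep wrapping sklearn's StandardScaler."""

    name: str = "standard_scaler"

    def as_transformer(self):
        """Return a configured StandardScaler."""
        return StandardScaler()


# Compose the custom step into a ColumnVectorizer and fit the resulting
# sklearn Pipeline directly, without going through Dagster.
vectorizer = ColumnVectorizer(
    transform_steps=[StandardScalerStep()],
    columns=["capacity_mw"],
)
pipeline = vectorizer.as_pipeline()
scaled = pipeline.fit_transform(pd.DataFrame({"capacity_mw": [100.0, 700.0]}))
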
