Skip to content

Commit

Permalink
On demand transforms for historical retrieval
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Sep 2, 2021
1 parent 9dc9e60 commit cfd49cf
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 18 deletions.
49 changes: 41 additions & 8 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Dict, List, Optional, Union

import numpy as np
import pandas
import pandas as pd
import pyarrow
from pydantic import StrictStr
from pydantic.typing import Literal
Expand All @@ -19,6 +19,7 @@
from feast.feature_view import FeatureView
from feast.infra.offline_stores import offline_utils
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

Expand Down Expand Up @@ -87,14 +88,21 @@ def pull_latest_from_table_or_query(
WHERE _feast_row = 1
"""

return BigQueryRetrievalJob(query=query, client=client, config=config)
# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return BigQueryRetrievalJob(
query=query,
client=client,
config=config,
full_feature_names=False,
on_demand_feature_views=None,
)

@staticmethod
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
project: str,
full_feature_names: bool = False,
Expand Down Expand Up @@ -140,16 +148,41 @@ def get_historical_features(
full_feature_names=full_feature_names,
)

return BigQueryRetrievalJob(query=query, client=client, config=config)
return BigQueryRetrievalJob(
query=query,
client=client,
config=config,
full_feature_names=full_feature_names,
on_demand_feature_views=registry.list_on_demand_feature_views(
project, allow_cache=True
),
)


class BigQueryRetrievalJob(RetrievalJob):
def __init__(self, query, client, config):
def __init__(
    self,
    query: str,
    client: bigquery.Client,
    config: RepoConfig,
    full_feature_names: bool,
    on_demand_feature_views: Optional[List[OnDemandFeatureView]],
):
    """Initialize a lazy BigQuery historical-retrieval job.

    The query is not executed here; it runs when the result is materialized
    (e.g. via ``to_df_internal``).

    Args:
        query: SQL statement to execute against BigQuery.
        client: BigQuery client used to run the query.
        config: Feast repo config.
        full_feature_names: Whether to add the feature view prefixes to the
            feature names.
        on_demand_feature_views: On demand transforms to apply at retrieval
            time, or None when no transforms should run (e.g. materialization).
    """
    self.query = query
    self.client = client
    self.config = config
    # Stored privately and exposed through read-only properties required by
    # the RetrievalJob interface.
    self._full_feature_names = full_feature_names
    self._on_demand_feature_views = on_demand_feature_views

@property
def full_feature_names(self) -> bool:
    # Whether to add the feature view prefixes to the feature names
    # (read-only view of the constructor argument).
    return self._full_feature_names

@property
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    # On demand transforms to apply at retrieval time; None disables them
    # (read-only view of the constructor argument).
    return self._on_demand_feature_views

def to_df(self):
def to_df_internal(self) -> pd.DataFrame:
    """Execute the stored query and return the raw result as a DataFrame.

    On demand transforms are applied by the base class's ``to_df``, not here.
    """
    # TODO: Ideally only start this job when the user runs "get_historical_features", not when they run to_df()
    # create_bqstorage_client=True uses the BigQuery Storage API for faster
    # result download.
    df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
    return df
Expand Down Expand Up @@ -266,7 +299,7 @@ def _get_table_reference_for_new_entity(


def _upload_entity_df_and_get_entity_schema(
client: Client, table_name: str, entity_df: Union[pandas.DataFrame, str],
client: Client, table_name: str, entity_df: Union[pd.DataFrame, str],
) -> Dict[str, np.dtype]:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""

Expand All @@ -278,7 +311,7 @@ def _upload_entity_df_and_get_entity_schema(
client.query(f"SELECT * FROM {table_name} LIMIT 1").result().to_dataframe()
)
entity_schema = dict(zip(limited_entity_df.columns, limited_entity_df.dtypes))
elif isinstance(entity_df, pandas.DataFrame):
elif isinstance(entity_df, pd.DataFrame):
# Drop the index so that we dont have unnecessary columns
entity_df.reset_index(drop=True, inplace=True)

Expand Down
36 changes: 31 additions & 5 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytz
from pydantic.typing import Literal

from feast import FileSource
from feast import FileSource, OnDemandFeatureView
from feast.data_source import DataSource
from feast.errors import FeastJoinKeysDuringMaterialization
from feast.feature_view import FeatureView
Expand All @@ -30,13 +30,28 @@ class FileOfflineStoreConfig(FeastConfigBaseModel):


class FileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
def __init__(
    self,
    evaluation_function: Callable,
    full_feature_names: bool,
    on_demand_feature_views: Optional[List[OnDemandFeatureView]],
):
    """Initialize a lazy historical retrieval job

    Args:
        evaluation_function: Zero-argument callable that computes the
            historical retrieval when invoked (deferred until the result
            is requested).
        full_feature_names: Whether to add the feature view prefixes to the
            feature names.
        on_demand_feature_views: On demand transforms to apply at retrieval
            time, or None when no transforms should run (e.g. materialization).
    """

    # The evaluation function executes a stored procedure to compute a historical retrieval.
    self.evaluation_function = evaluation_function
    self._full_feature_names = full_feature_names
    self._on_demand_feature_views = on_demand_feature_views

def to_df(self):
@property
def full_feature_names(self) -> bool:
    # Whether to add the feature view prefixes to the feature names
    # (read-only view of the constructor argument).
    return self._full_feature_names

@property
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    # On demand transforms to apply at retrieval time; None disables them
    # (read-only view of the constructor argument).
    return self._on_demand_feature_views

def to_df_internal(self) -> pd.DataFrame:
    """Run the deferred evaluation function and return its DataFrame result.

    On demand transforms are applied by the base class's ``to_df``, not here.
    """
    # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
    df = self.evaluation_function()
    return df
Expand Down Expand Up @@ -224,7 +239,13 @@ def evaluate_historical_retrieval():

return entity_df_with_features

job = FileRetrievalJob(evaluation_function=evaluate_historical_retrieval)
job = FileRetrievalJob(
evaluation_function=evaluate_historical_retrieval,
full_feature_names=full_feature_names,
on_demand_feature_views=registry.list_on_demand_feature_views(
project, allow_cache=True
),
)
return job

@staticmethod
Expand Down Expand Up @@ -284,4 +305,9 @@ def evaluate_offline_job():
)
return last_values_df[columns_to_extract]

return FileRetrievalJob(evaluation_function=evaluate_offline_job)
# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return FileRetrievalJob(
evaluation_function=evaluate_offline_job,
full_feature_names=False,
on_demand_feature_views=None,
)
24 changes: 24 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,42 @@

from feast.data_source import DataSource
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.repo_config import RepoConfig


class RetrievalJob(ABC):
"""RetrievalJob is used to manage the execution of a historical feature retrieval"""

@property
@abstractmethod
def full_feature_names(self) -> bool:
    # Whether to add the feature view prefixes to the feature names.
    # Subclasses must expose the value they were constructed with.
    pass

@property
@abstractmethod
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    # On demand transforms to apply at retrieval time; None means "do not
    # apply any transforms" (see to_df). Subclasses must expose the value
    # they were constructed with.
    pass

def to_df(self) -> pd.DataFrame:
    """Return dataset as Pandas DataFrame synchronously including on demand transforms

    Fetches the raw feature DataFrame from the store-specific
    ``to_df_internal``, then joins on the output of each on demand feature
    view transform. When ``on_demand_feature_views`` is None the raw frame
    is returned unchanged.
    """
    features_df = self.to_df_internal()
    if self.on_demand_feature_views is None:
        return features_df

    for odfv in self.on_demand_feature_views:
        # Each transform receives the accumulated frame, so later ODFVs can
        # presumably see columns added by earlier ones — TODO confirm this
        # ordering is intended. The join is on the DataFrame index.
        features_df = features_df.join(
            odfv.get_transformed_features_df(self.full_feature_names, features_df)
        )
    return features_df

@abstractmethod
def to_df_internal(self) -> pd.DataFrame:
    """Return dataset as Pandas DataFrame synchronously

    Store-specific hook: implementations return the raw retrieval result
    without applying on demand transforms (those are handled by ``to_df``).
    """
    pass

# TODO(adchia): implement ODFV for to_arrow method
@abstractmethod
def to_arrow(self) -> pyarrow.Table:
"""Return dataset as pyarrow Table synchronously"""
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def build_point_in_time_query(
entity_df_event_timestamp_col: str,
query_template: str,
full_feature_names: bool = False,
):
) -> str:
"""Build point-in-time query between each feature view table and the entity dataframe for Bigquery and Redshift"""
template = Environment(loader=BaseLoader()).from_string(source=query_template)

Expand Down
32 changes: 29 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pydantic import StrictStr
from pydantic.typing import Literal

from feast import RedshiftSource
from feast import OnDemandFeatureView, RedshiftSource
from feast.data_source import DataSource
from feast.errors import InvalidEntityType
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -90,11 +90,14 @@ def pull_latest_from_table_or_query(
)
WHERE _feast_row = 1
"""
# When materializing a single feature view, we don't need full feature names. On demand transforms aren't materialized
return RedshiftRetrievalJob(
query=query,
redshift_client=redshift_client,
s3_resource=s3_resource,
config=config,
full_feature_names=False,
on_demand_feature_views=None,
)

@staticmethod
Expand Down Expand Up @@ -164,6 +167,10 @@ def query_generator() -> Iterator[str]:
redshift_client=redshift_client,
s3_resource=s3_resource,
config=config,
full_feature_names=full_feature_names,
on_demand_feature_views=registry.list_on_demand_feature_views(
project=project, allow_cache=True
),
drop_columns=["entity_timestamp"]
+ [
f"{feature_view.name}__entity_row_unique_id"
Expand All @@ -179,6 +186,8 @@ def __init__(
redshift_client,
s3_resource,
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
drop_columns: Optional[List[str]] = None,
):
"""Initialize RedshiftRetrievalJob object.
Expand All @@ -188,6 +197,8 @@ def __init__(
redshift_client: boto3 redshift-data client
s3_resource: boto3 s3 resource object
config: Feast repo config
full_feature_names: Whether to add the feature view prefixes to the feature names
on_demand_feature_views: A list of on demand transforms to apply at retrieval time
drop_columns: Optionally a list of columns to drop before unloading to S3.
This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.
"""
Expand All @@ -209,9 +220,19 @@ def query_generator() -> Iterator[str]:
+ "/unload/"
+ str(uuid.uuid4())
)
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views
self._drop_columns = drop_columns

def to_df(self) -> pd.DataFrame:
@property
def full_feature_names(self) -> bool:
    # Whether to add the feature view prefixes to the feature names
    # (read-only view of the constructor argument).
    return self._full_feature_names

@property
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    # On demand transforms to apply at retrieval time; None disables them
    # (read-only view of the constructor argument).
    return self._on_demand_feature_views

def to_df_internal(self) -> pd.DataFrame:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
Expand Down Expand Up @@ -304,7 +325,12 @@ def _upload_entity_df_and_get_entity_schema(
f"CREATE TABLE {table_name} AS ({entity_df})",
)
limited_entity_df = RedshiftRetrievalJob(
f"SELECT * FROM {table_name} LIMIT 1", redshift_client, s3_resource, config
f"SELECT * FROM {table_name} LIMIT 1",
redshift_client,
s3_resource,
config,
full_feature_names=False,
on_demand_feature_views=None,
).to_df()
return dict(zip(limited_entity_df.columns, limited_entity_df.dtypes))
else:
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def _get_requested_feature_views_to_features_dict(
feature_views_to_feature_map: Dict[FeatureView, List[str]] = {}

for ref in feature_refs:
if ":" not in ref:
# ODFV
continue
ref_parts = ref.split(":")
feature_view_from_ref = ref_parts[0]
feature_from_ref = ref_parts[1]
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):

def get_transformed_features_df(
self, full_feature_names: bool, df_with_features: pd.DataFrame
) -> pd.DataFrame:
):
# Apply on demand transformations
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
# Copy over un-prefixed features even if not requested since transform may need it
Expand Down

0 comments on commit cfd49cf

Please sign in to comment.