From 9dc9e60aa6a5d6a85f012307f1910f5233a251c6 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Wed, 1 Sep 2021 21:01:18 -0400 Subject: [PATCH] Initial scaffolding for on demand feature view (#1803) * Initial scaffolding for on demand feature view, with initial support for transforms on online fetches Signed-off-by: Danny Chiao * Fixing comments Signed-off-by: Danny Chiao * Comments Signed-off-by: Danny Chiao * Added basic test Signed-off-by: Danny Chiao * Simplifying function serialization Signed-off-by: Danny Chiao * Refactor logic into odfv Signed-off-by: Danny Chiao --- protos/feast/core/FeatureView.proto | 1 + protos/feast/core/OnDemandFeatureView.proto | 57 +++++++ protos/feast/core/Registry.proto | 2 + sdk/python/feast/__init__.py | 2 + sdk/python/feast/errors.py | 7 + sdk/python/feast/feature_store.py | 61 ++++++- sdk/python/feast/on_demand_feature_view.py | 150 ++++++++++++++++++ sdk/python/feast/registry.py | 87 +++++++++- sdk/python/feast/repo_operations.py | 38 ++++- sdk/python/setup.py | 1 + .../feature_repos/universal/feature_views.py | 22 ++- .../online_store/test_universal_online.py | 17 +- 12 files changed, 434 insertions(+), 11 deletions(-) create mode 100644 protos/feast/core/OnDemandFeatureView.proto create mode 100644 sdk/python/feast/on_demand_feature_view.py diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index f39fcf5e73..6edba9f7fe 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -35,6 +35,7 @@ message FeatureView { FeatureViewMeta meta = 2; } +// TODO(adchia): refactor common fields from this and ODFV into separate metadata proto message FeatureViewSpec { // Name of the feature view. Must be unique. Not updated. string name = 1; diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto new file mode 100644 index 0000000000..6aa938e8ee --- /dev/null +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -0,0 +1,57 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + + +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "OnDemandFeatureViewProto"; +option java_package = "feast.proto.core"; + +import "feast/core/FeatureView.proto"; +import "feast/core/Feature.proto"; + +message OnDemandFeatureView { + // User-specified specifications of this feature view. + OnDemandFeatureViewSpec spec = 1; +} + +message OnDemandFeatureViewSpec { + // Name of the feature view. Must be unique. Not updated. + string name = 1; + + // Name of Feast project that this feature view belongs to. + string project = 2; + + // List of features specifications for each feature defined with this feature view. + repeated FeatureSpecV2 features = 3; + + // List of features specifications for each feature defined with this feature view. + // TODO(adchia): add support for request data + map inputs = 4; + + UserDefinedFunction user_defined_function = 5; +} + +// Serialized representation of python function. +message UserDefinedFunction { + // The function name + string name = 1; + + // The python-syntax function body (serialized by dill) + bytes body = 2; +} diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto index 6900a5b1e7..b8570301e9 100644 --- a/protos/feast/core/Registry.proto +++ b/protos/feast/core/Registry.proto @@ -25,12 +25,14 @@ import "feast/core/Entity.proto"; import "feast/core/FeatureService.proto"; import "feast/core/FeatureTable.proto"; import "feast/core/FeatureView.proto"; +import "feast/core/OnDemandFeatureView.proto"; import "google/protobuf/timestamp.proto"; message Registry { repeated Entity entities = 1; repeated FeatureTable feature_tables = 2; repeated FeatureView feature_views = 6; + repeated OnDemandFeatureView on_demand_feature_views = 8; repeated FeatureService feature_services = 7; string registry_schema_version = 3; // to support migrations; incremented when schema is changed diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 83d9286132..cd4730efa3 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -13,6 +13,7 @@ from .feature_store import FeatureStore from .feature_table import FeatureTable from .feature_view import FeatureView +from .on_demand_feature_view import OnDemandFeatureView from .repo_config import RepoConfig from .value_type import ValueType @@ -37,6 +38,7 @@ "FeatureStore", "FeatureTable", "FeatureView", + "OnDemandFeatureView", "RepoConfig", "SourceType", "ValueType", diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 1202d4df49..fa4a779a31 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -200,3 +200,10 @@ def __init__(self, entity_type: type): f"The entity dataframe you have provided must be a Pandas DataFrame or a SQL query, " f"but we found: {entity_type} " ) + + +class ConflictingFeatureViewNames(Exception): + def __init__(self, feature_view_name: str): + super().__init__( + f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view" + ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 77650b5cf1..c4f1987572 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -37,6 +37,7 @@ update_entities_with_inferred_types_from_feature_views, ) from feast.infra.provider import Provider, RetrievalJob, get_provider +from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse, _infer_online_entity_rows from feast.protos.feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequestV2, @@ -45,6 +46,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config +from feast.type_map import python_value_to_proto_value from feast.usage import log_exceptions, log_exceptions_and_usage from feast.version import get_version @@ -267,8 +269,9 @@ def apply( objects: Union[ Entity, FeatureView, + OnDemandFeatureView, FeatureService, - List[Union[FeatureView, Entity, FeatureService]], + List[Union[FeatureView, OnDemandFeatureView, Entity, FeatureService]], ], commit: bool = True, ): @@ -314,6 +317,7 @@ def apply( assert isinstance(objects, list) views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] + odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] _validate_feature_views(views_to_update) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] @@ -332,11 +336,15 @@ def apply( if len(views_to_update) + len(entities_to_update) + len( services_to_update - ) != len(objects): + ) + len(odfvs_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") for view in views_to_update: self._registry.apply_feature_view(view, project=self.project, commit=False) + for odfv in odfvs_to_update: + self._registry.apply_on_demand_feature_view( + odfv, project=self.project, commit=False + ) for ent in entities_to_update: self._registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: @@ -717,7 +725,6 @@ def get_online_features( all_feature_views = self._registry.list_feature_views( project=self.project, allow_cache=True ) - _validate_feature_refs(_feature_refs, full_feature_names) grouped_refs = _group_feature_refs(_feature_refs, all_feature_views) for table, requested_features in grouped_refs: @@ -759,6 +766,47 @@ def get_online_features( feature_ref ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT + initial_response = OnlineResponse( + GetOnlineFeaturesResponse(field_values=result_rows) + ) + return self._augment_response_with_on_demand_transforms( + _feature_refs, full_feature_names, initial_response, result_rows + ) + + def _augment_response_with_on_demand_transforms( + self, + feature_refs: List[str], + full_feature_names: bool, + initial_response: OnlineResponse, + result_rows: List[GetOnlineFeaturesResponse.FieldValues], + ) -> OnlineResponse: + all_on_demand_feature_views = self._registry.list_on_demand_feature_views( + project=self.project, allow_cache=True + ) + if len(all_on_demand_feature_views) == 0: + return initial_response + initial_response_df = initial_response.to_df() + # Apply on demand transformations + for odfv in all_on_demand_feature_views: + feature_ref = odfv.name + if feature_ref in feature_refs: + transformed_features_df = odfv.get_transformed_features_df( + full_feature_names, initial_response_df + ) + for row_idx in range(len(result_rows)): + result_row = result_rows[row_idx] + # TODO(adchia): support multiple output features in an ODFV, which requires different naming + # conventions + result_row.fields[odfv.name].CopyFrom( + python_value_to_proto_value( + transformed_features_df[odfv.features[0].name].values[ + row_idx + ] + ) + ) + result_row.statuses[ + feature_ref + ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) @log_exceptions_and_usage @@ -791,7 +839,9 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1 ] else: - feature_names = [ref.split(":")[1] for ref in feature_refs] + feature_names = [ + ref.split(":")[1] if ":" in ref else ref for ref in feature_refs + ] collided_feature_names = [ ref for ref, occurrences in Counter(feature_names).items() @@ -820,6 +870,9 @@ def _group_feature_refs( if isinstance(features, list) and isinstance(features[0], str): for ref in features: + if ":" not in ref: + # This is an on demand feature view ref + continue view_name, feat_name = ref.split(":") if view_name not in view_index: raise FeatureViewNotFoundException(view_name) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py new file mode 100644 index 0000000000..b5b71c164c --- /dev/null +++ b/sdk/python/feast/on_demand_feature_view.py @@ -0,0 +1,150 @@ +import functools +from types import MethodType +from typing import Dict, List + +import dill +import pandas as pd + +from feast.feature import Feature +from feast.feature_view import FeatureView +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, +) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + UserDefinedFunction as UserDefinedFunctionProto, +) +from feast.usage import log_exceptions +from feast.value_type import ValueType + + +class OnDemandFeatureView: + """ + An OnDemandFeatureView defines on demand transformations on existing feature view values and request data. + + Args: + name: Name of the group of features. + features: Output schema of transformation with feature names + inputs: The input feature views passed into the transform. + udf: User defined transformation function that takes as input pandas dataframes + """ + + name: str + features: List[Feature] + inputs: Dict[str, FeatureView] + udf: MethodType + + @log_exceptions + def __init__( + self, + name: str, + features: List[Feature], + inputs: Dict[str, FeatureView], + udf: MethodType, + ): + """ + Creates an OnDemandFeatureView object. + """ + + self.name = name + self.features = features + self.inputs = inputs + self.udf = udf + + def to_proto(self) -> OnDemandFeatureViewProto: + """ + Converts an on demand feature view object to its protobuf representation. + + Returns: + A OnDemandFeatureViewProto protobuf. + """ + spec = OnDemandFeatureViewSpec( + name=self.name, + features=[feature.to_proto() for feature in self.features], + inputs={k: fv.to_proto() for k, fv in self.inputs.items()}, + user_defined_function=UserDefinedFunctionProto( + name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True), + ), + ) + + return OnDemandFeatureViewProto(spec=spec) + + @classmethod + def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): + """ + Creates an on demand feature view from a protobuf representation. + + Args: + on_demand_feature_view_proto: A protobuf representation of an on-demand feature view. + + Returns: + A OnDemandFeatureView object based on the on-demand feature view protobuf. + """ + on_demand_feature_view_obj = cls( + name=on_demand_feature_view_proto.spec.name, + features=[ + Feature( + name=feature.name, + dtype=ValueType(feature.value_type), + labels=dict(feature.labels), + ) + for feature in on_demand_feature_view_proto.spec.features + ], + inputs={ + feature_view_name: FeatureView.from_proto(feature_view_proto) + for feature_view_name, feature_view_proto in on_demand_feature_view_proto.spec.inputs.items() + }, + udf=dill.loads( + on_demand_feature_view_proto.spec.user_defined_function.body + ), + ) + + return on_demand_feature_view_obj + + def get_transformed_features_df( + self, full_feature_names: bool, df_with_features: pd.DataFrame + ) -> pd.DataFrame: + # Apply on demand transformations + # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV. + # Copy over un-prefixed features even if not requested since transform may need it + columns_to_cleanup = [] + if full_feature_names: + for input_fv in self.inputs.values(): + for feature in input_fv.features: + full_feature_ref = f"{input_fv.name}__{feature.name}" + if full_feature_ref in df_with_features.keys(): + df_with_features[feature.name] = df_with_features[ + full_feature_ref + ] + columns_to_cleanup.append(feature.name) + + # Compute transformed values and apply to each result row + df_with_transformed_features = self.udf.__call__(df_with_features) + + # Cleanup extra columns used for transformation + df_with_features.drop(columns=columns_to_cleanup, inplace=True) + return df_with_transformed_features + + +def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]): + """ + Declare an on-demand feature view + + :param features: Output schema with feature names + :param inputs: The inputs passed into the transform. + :return: An On Demand Feature View. + """ + + def decorator(user_function): + on_demand_feature_view_obj = OnDemandFeatureView( + name=user_function.__name__, + inputs=inputs, + features=features, + udf=user_function, + ) + functools.update_wrapper( + wrapper=on_demand_feature_view_obj, wrapped=user_function + ) + return on_demand_feature_view_obj + + return decorator diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index af04de5f3f..8a1994bbc5 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -18,11 +18,12 @@ from datetime import datetime, timedelta from pathlib import Path from tempfile import TemporaryFile -from typing import List, Optional +from typing import List, Optional, Set from urllib.parse import urlparse from feast.entity import Entity from feast.errors import ( + ConflictingFeatureViewNames, EntityNotFoundException, FeatureServiceNotFoundException, FeatureTableNotFoundException, @@ -33,6 +34,7 @@ from feast.feature_service import FeatureService from feast.feature_table import FeatureTable from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto REGISTRY_SCHEMA_VERSION = "1" @@ -272,6 +274,9 @@ def apply_feature_view( self._prepare_registry_for_changes() assert self.cached_registry_proto + if feature_view.name in self._get_existing_on_demand_feature_view_names(): + raise ConflictingFeatureViewNames(feature_view.name) + for idx, existing_feature_view_proto in enumerate( self.cached_registry_proto.feature_views ): @@ -289,6 +294,51 @@ def apply_feature_view( if commit: self.commit() + def apply_on_demand_feature_view( + self, + on_demand_feature_view: OnDemandFeatureView, + project: str, + commit: bool = True, + ): + """ + Registers a single on demand feature view with Feast + + Args: + on_demand_feature_view: Feature view that will be registered + project: Feast project that this feature view belongs to + commit: Whether the change should be persisted immediately + """ + on_demand_feature_view_proto = on_demand_feature_view.to_proto() + on_demand_feature_view_proto.spec.project = project + self._prepare_registry_for_changes() + assert self.cached_registry_proto + + if on_demand_feature_view.name in self._get_existing_feature_view_names(): + raise ConflictingFeatureViewNames(on_demand_feature_view.name) + + for idx, existing_feature_view_proto in enumerate( + self.cached_registry_proto.on_demand_feature_views + ): + if ( + existing_feature_view_proto.spec.name + == on_demand_feature_view_proto.spec.name + and existing_feature_view_proto.spec.project == project + ): + if ( + OnDemandFeatureView.from_proto(existing_feature_view_proto) + == on_demand_feature_view + ): + return + else: + del self.cached_registry_proto.on_demand_feature_views[idx] + break + + self.cached_registry_proto.on_demand_feature_views.append( + on_demand_feature_view_proto + ) + if commit: + self.commit() + def apply_materialization( self, feature_view: FeatureView, @@ -370,6 +420,28 @@ def list_feature_views( feature_views.append(FeatureView.from_proto(feature_view_proto)) return feature_views + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[OnDemandFeatureView]: + """ + Retrieve a list of on demand feature views from the registry + + Args: + allow_cache: Allow returning feature views from the cached registry + project: Filter feature tables based on project name + + Returns: + List of on demand feature views + """ + registry_proto = self._get_registry_proto(allow_cache=allow_cache) + on_demand_feature_views = [] + for on_demand_feature_view_proto in registry_proto.on_demand_feature_views: + if on_demand_feature_view_proto.spec.project == project: + on_demand_feature_views.append( + OnDemandFeatureView.from_proto(on_demand_feature_view_proto) + ) + return on_demand_feature_views + def get_feature_table(self, name: str, project: str) -> FeatureTable: """ Retrieves a feature table. @@ -546,6 +618,19 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: self.cache_being_updated = False return registry_proto + def _get_existing_feature_view_names(self) -> Set[str]: + assert self.cached_registry_proto + return set([fv.spec.name for fv in self.cached_registry_proto.feature_views]) + + def _get_existing_on_demand_feature_view_names(self) -> Set[str]: + assert self.cached_registry_proto + return set( + [ + odfv.spec.name + for odfv in self.cached_registry_proto.on_demand_feature_views + ] + ) + class RegistryStore(ABC): """ diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ea0e79931b..1170701e15 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -17,6 +17,7 @@ from feast.feature_view import FeatureView from feast.infra.provider import get_provider from feast.names import adjectives, animals +from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry from feast.repo_config import RepoConfig from feast.usage import log_exceptions_and_usage @@ -33,6 +34,7 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: class ParsedRepo(NamedTuple): feature_tables: List[FeatureTable] feature_views: List[FeatureView] + on_demand_feature_views: List[OnDemandFeatureView] entities: List[Entity] feature_services: List[FeatureService] @@ -93,7 +95,11 @@ def get_repo_files(repo_root: Path) -> List[Path]: def parse_repo(repo_root: Path) -> ParsedRepo: """ Collect feature table definitions from feature repo """ res = ParsedRepo( - feature_tables=[], entities=[], feature_views=[], feature_services=[] + feature_tables=[], + entities=[], + feature_views=[], + feature_services=[], + on_demand_feature_views=[], ) for repo_file in get_repo_files(repo_root): @@ -109,6 +115,8 @@ def parse_repo(repo_root: Path) -> ParsedRepo: res.entities.append(obj) elif isinstance(obj, FeatureService): res.feature_services.append(obj) + elif isinstance(obj, OnDemandFeatureView): + res.on_demand_feature_views.append(obj) return res @@ -143,6 +151,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete( project, registry, repo ) + ( + odfvs_to_keep, + odfvs_to_delete, + ) = _tag_registry_on_demand_feature_views_for_keep_delete(project, registry, repo) tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete( project, registry, repo ) @@ -181,10 +193,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # TODO: delete entities from the registry too # Add / update views + entities + services - all_to_apply: List[Union[Entity, FeatureView, FeatureService]] = [] + all_to_apply: List[ + Union[Entity, FeatureView, FeatureService, OnDemandFeatureView] + ] = [] all_to_apply.extend(entities_to_keep) all_to_apply.extend(views_to_keep) all_to_apply.extend(services_to_keep) + all_to_apply.extend(odfvs_to_keep) + # TODO: delete odfvs store.apply(all_to_apply, commit=False) for entity in entities_to_keep: click.echo( @@ -194,6 +210,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) + for odfv in odfvs_to_keep: + click.echo( + f"Registered on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL}" + ) for feature_service in services_to_keep: click.echo( f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" @@ -263,6 +283,20 @@ def _tag_registry_views_for_keep_delete( return views_to_keep, views_to_delete +def _tag_registry_on_demand_feature_views_for_keep_delete( + project: str, registry: Registry, repo: ParsedRepo +) -> Tuple[List[OnDemandFeatureView], List[OnDemandFeatureView]]: + odfvs_to_keep: List[OnDemandFeatureView] = repo.on_demand_feature_views + odfvs_to_delete: List[OnDemandFeatureView] = [] + repo_on_demand_feature_view_names = set( + t.name for t in repo.on_demand_feature_views + ) + for registry_odfv in registry.list_on_demand_feature_views(project=project): + if registry_odfv.name not in repo_on_demand_feature_view_names: + odfvs_to_delete.append(registry_odfv) + return odfvs_to_keep, odfvs_to_delete + + def _tag_registry_tables_for_keep_delete( project: str, registry: Registry, repo: ParsedRepo ) -> Tuple[List[FeatureTable], List[FeatureTable]]: diff --git a/sdk/python/setup.py b/sdk/python/setup.py index af82301212..b776369ca6 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -113,6 +113,7 @@ "google-cloud-core==1.4.*", "redis-py-cluster==2.1.2", "boto3==1.17.*", + "dill==0.3.0" ] diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index d03b89f0e0..c8029474aa 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -1,6 +1,9 @@ from datetime import timedelta +from typing import Dict -from feast import Feature, FeatureView, ValueType +import pandas as pd + +from feast import Feature, FeatureView, OnDemandFeatureView, ValueType from feast.data_source import DataSource @@ -19,6 +22,23 @@ def driver_feature_view( ) +def conv_rate_plus_100(driver_hourly_stats: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_100"] = driver_hourly_stats["conv_rate"] + 100 + return df + + +def conv_rate_plus_100_feature_view( + inputs: Dict[str, FeatureView] +) -> OnDemandFeatureView: + return OnDemandFeatureView( + name=conv_rate_plus_100.__name__, + inputs=inputs, + features=[Feature("conv_rate_plus_100", ValueType.FLOAT)], + udf=conv_rate_plus_100, + ) + + def create_driver_hourly_stats_feature_view(source, infer_features: bool = True): driver_stats_feature_view = FeatureView( name="driver_stats", diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index b3bcf688bd..d86af4521c 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -3,6 +3,9 @@ import pandas as pd import pytest +from integration.feature_repos.universal.feature_views import ( + conv_rate_plus_100_feature_view, +) from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, @@ -17,10 +20,10 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name fs = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - + odfv = conv_rate_plus_100_feature_view(inputs={"driver": feature_views["driver"]}) feast_objects = [] feast_objects.extend(feature_views.values()) - feast_objects.extend([driver(), customer()]) + feast_objects.extend([odfv, driver(), customer()]) fs.apply(feast_objects) fs.materialize(environment.start_date, environment.end_date) @@ -45,8 +48,9 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name "customer_profile:current_balance", "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", + "conv_rate_plus_100", ] - unprefixed_feature_refs = [f.rsplit(":", 1)[-1] for f in feature_refs] + unprefixed_feature_refs = [f.rsplit(":", 1)[-1] for f in feature_refs if ":" in f] online_features = fs.get_online_features( features=feature_refs, @@ -60,6 +64,9 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name len(keys) == len(feature_refs) + 2 ) # Add two for the driver id and the customer id entity keys. for feature in feature_refs: + if ":" in feature: + # This is the ODFV + continue if full_feature_names: assert feature.replace(":", "__") in keys else: @@ -75,6 +82,10 @@ def test_online_retrieval(environment, universal_data_sources, full_feature_name assert df_features["customer_id"] == online_features_dict["customer_id"][i] assert df_features["driver_id"] == online_features_dict["driver_id"][i] + assert ( + online_features_dict["conv_rate_plus_100"][i] + == df_features["conv_rate"] + 100 + ) for unprefixed_feature_ref in unprefixed_feature_refs: tc.assertEqual( df_features[unprefixed_feature_ref],