From 0e802f47ca7e9c63bd40e3309fa0297c7165d26c Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Tue, 21 Sep 2021 23:22:01 -0700 Subject: [PATCH 01/15] Revised FeatureService class and proto Signed-off-by: David Y Liu --- protos/feast/core/FeatureService.proto | 16 ++++-- sdk/python/feast/feature_service.py | 67 +++++++++++++++----------- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 061e65cfd9..b8a0bef810 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -6,7 +6,9 @@ option java_outer_classname = "FeatureServiceProto"; option java_package = "feast.proto.core"; import "google/protobuf/timestamp.proto"; -import "feast/core/FeatureViewProjection.proto"; +import "feast/core/OnDemandFeatureView.proto"; +import "feast/core/FeatureTable.proto"; +import "feast/core/FeatureView.proto"; message FeatureService { // User-specified specifications of this feature service. @@ -25,13 +27,19 @@ message FeatureServiceSpec { // List of features that this feature service encapsulates. // Stored as a list of references to other features views and the features from those views. - repeated FeatureViewProjection features = 3; + repeated string features = 3; + + repeated FeatureTable feature_tables = 4; + + repeated FeatureView feature_views = 5; + + repeated OnDemandFeatureView on_demand_feature_views = 6; // User defined metadata - map tags = 4; + map tags = 7; // Description of the feature service. - string description = 5; + string description = 8; } diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index a12152deba..94177badff 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,7 +5,6 @@ from feast.feature_table import FeatureTable from feast.feature_view import FeatureView -from feast.feature_view_projection import FeatureViewProjection from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.FeatureService_pb2 import ( FeatureService as FeatureServiceProto, @@ -30,7 +29,10 @@ class FeatureService: """ name: str - features: List[FeatureViewProjection] + features: List[str] + feature_tables: List[FeatureTable] + feature_views: List[FeatureView] + on_demand_feature_views: List[OnDemandFeatureView] tags: Dict[str, str] description: Optional[str] = None created_timestamp: Optional[datetime] = None @@ -40,7 +42,7 @@ def __init__( self, name: str, features: List[ - Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection] + Union[FeatureTable, FeatureView, OnDemandFeatureView] ], tags: Optional[Dict[str, str]] = None, description: Optional[str] = None, @@ -52,18 +54,23 @@ def __init__( ValueError: If one of the specified features is not a valid type. """ self.name = name - self.features = [] - for feature in features: - if ( - isinstance(feature, FeatureTable) - or isinstance(feature, FeatureView) - or isinstance(feature, OnDemandFeatureView) - ): - self.features.append(FeatureViewProjection.from_definition(feature)) - elif isinstance(feature, FeatureViewProjection): - self.features.append(feature) + self.features = [] + self.feature_tables, self.feature_views, self.on_demand_feature_views = [], [], [] + + for feature_grouping in features: + if (isinstance(feature_grouping, FeatureTable)): + self.feature_tables.append(feature_grouping) + elif (isinstance(feature_grouping, FeatureView)): + self.feature_views.append(feature_grouping) + elif (isinstance(feature_grouping, OnDemandFeatureView)): + self.on_demand_feature_views.append(feature_grouping) else: raise ValueError(f"Unexpected type: {type(feature)}") + + self.features.extend([ + f"{feature_grouping.name}:{f.name}" for f in feature_grouping.features] + ) + self.tags = tags or {} self.description = description self.created_timestamp = None @@ -102,10 +109,7 @@ def from_proto(feature_service_proto: FeatureServiceProto): """ fs = FeatureService( name=feature_service_proto.spec.name, - features=[ - FeatureViewProjection.from_proto(fp) - for fp in feature_service_proto.spec.features - ], + features=[], tags=dict(feature_service_proto.spec.tags), description=( feature_service_proto.spec.description @@ -114,6 +118,20 @@ def from_proto(feature_service_proto: FeatureServiceProto): ), ) + fs.features = feature_service_proto.spec.features + fs.feature_tables = [ + FeatureTable.from_proto(table) + for table in feature_service_proto.spec.feature_tables + ] + fs.feature_views = [ + FeatureView.from_proto(view) + for view in feature_service_proto.spec.feature_views + ] + fs.on_demand_feature_views = [ + OnDemandFeatureView.from_proto(view) + for view in feature_service_proto.spec.on_demand_feature_views + ] + if feature_service_proto.meta.HasField("created_timestamp"): fs.created_timestamp = ( feature_service_proto.meta.created_timestamp.ToDatetime() @@ -138,17 +156,10 @@ def to_proto(self) -> FeatureServiceProto: spec = FeatureServiceSpec() spec.name = self.name - for definition in self.features: - if isinstance(definition, FeatureTable) or isinstance( - definition, FeatureView - ): - feature_ref = FeatureViewProjection( - definition.name, definition.features - ) - else: - feature_ref = definition - - spec.features.append(feature_ref.to_proto()) + spec.features = self.features + spec.feature_tables = [table.to_proto for table in self.feature_tables] + spec.feature_views = [view.to_proto for view in self.feature_views] + spec.on_demand_feature_views = [view.to_proto for view in self.on_demand_feature_views] if self.tags: spec.tags.update(self.tags) From f17b07544d067f39024cc7c9c7c3749fbb327278 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Wed, 22 Sep 2021 13:52:36 -0700 Subject: [PATCH 02/15] wip transitioning feast off of FVProjections Signed-off-by: David Y Liu --- sdk/python/feast/feature_service.py | 16 +++++++------- sdk/python/feast/feature_store.py | 34 +++++++++++++++++++++++------ sdk/python/feast/feature_view.py | 15 ++++++++++--- sdk/python/setup.py | 0 4 files changed, 47 insertions(+), 18 deletions(-) mode change 100644 => 100755 sdk/python/setup.py diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 94177badff..40be1e48fd 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -65,7 +65,7 @@ def __init__( elif (isinstance(feature_grouping, OnDemandFeatureView)): self.on_demand_feature_views.append(feature_grouping) else: - raise ValueError(f"Unexpected type: {type(feature)}") + raise ValueError(f"Unexpected type: {type(feature_grouping)}") self.features.extend([ f"{feature_grouping.name}:{f.name}" for f in feature_grouping.features] @@ -154,13 +154,13 @@ def to_proto(self) -> FeatureServiceProto: if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) - spec = FeatureServiceSpec() - spec.name = self.name - spec.features = self.features - spec.feature_tables = [table.to_proto for table in self.feature_tables] - spec.feature_views = [view.to_proto for view in self.feature_views] - spec.on_demand_feature_views = [view.to_proto for view in self.on_demand_feature_views] - + spec = FeatureServiceSpec( + name=self.name, + features=self.features, + feature_tables=[table.to_proto() for table in self.feature_tables], + feature_views=[view.to_proto() for view in self.feature_views], + on_demand_feature_views=[view.to_proto() for view in self.on_demand_feature_views] + ) if self.tags: spec.tags.update(self.tags) if self.description: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 8d8344d8eb..923b6c15ad 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import os import warnings from collections import Counter, OrderedDict, defaultdict @@ -319,9 +320,7 @@ def _get_features( _feature_refs: List[str] if isinstance(_features, FeatureService): # Get the latest value of the feature service, in case the object passed in has been updated underneath us. - _feature_refs = _get_feature_refs_from_feature_services( - self.get_feature_service(_features.name) - ) + _feature_refs = _get_feature_refs_from_feature_services(FeatureService) else: _feature_refs = _features return _feature_refs @@ -542,7 +541,18 @@ def get_historical_features( _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self.list_feature_views() + passed_in_feature_views = {view.name: view for view in + itertools.chain([ + grouping.feature_views for grouping in features + if isinstance(grouping, FeatureService) + ]) + } + + all_feature_views = [*filter( + lambda view: view.name not in [*passed_in_feature_views.keys()], + self.list_feature_views() + )] + [*passed_in_feature_views.values()] + all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project ) @@ -804,9 +814,19 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._list_feature_views( - allow_cache=True, hide_dummy_entity=False - ) + + passed_in_feature_views = {view.name: view for view in + itertools.chain([ + grouping.feature_views for grouping in features + if isinstance(grouping, FeatureService) + ]) + } + + all_feature_views = [*filter( + lambda view: view.name not in [*passed_in_feature_views.keys()], + self.list_feature_views() + )] + [*passed_in_feature_views.values()] + all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project, allow_cache=True ) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index f95abb49c7..841b2bfc46 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -23,7 +23,6 @@ from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature import Feature -from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( FeatureViewMeta as FeatureViewMetaProto, @@ -151,7 +150,7 @@ def __str__(self): def __hash__(self): return hash(self.name) - def __getitem__(self, item) -> FeatureViewProjection: + def __getitem__(self, item): assert isinstance(item, list) referenced_features = [] @@ -159,7 +158,17 @@ def __getitem__(self, item) -> FeatureViewProjection: if feature.name in item: referenced_features.append(feature) - return FeatureViewProjection(self.name, referenced_features) + return FeatureView( + name=self.name, + entities=self.entities, + ttl=self.ttl, + input=self.input, + batch_source=self.batch_source, + stream_source=self.stream_source, + features=referenced_features, + tags=self.tags, + online=self.online, + ) def __eq__(self, other): if not isinstance(other, FeatureView): diff --git a/sdk/python/setup.py b/sdk/python/setup.py old mode 100644 new mode 100755 From e24d04a64c6b1a0b1ccb701166bb5acc14599e2b Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Wed, 22 Sep 2021 15:31:59 -0700 Subject: [PATCH 03/15] corrected a lot of errors and tests failures Signed-off-by: David Y Liu --- sdk/python/feast/feature_service.py | 6 +-- sdk/python/feast/feature_store.py | 62 ++++++++++------------------- 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 40be1e48fd..d64db8bc99 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -67,8 +67,8 @@ def __init__( else: raise ValueError(f"Unexpected type: {type(feature_grouping)}") - self.features.extend([ - f"{feature_grouping.name}:{f.name}" for f in feature_grouping.features] + self.features.extend( + [f"{feature_grouping.name}:{f.name}" for f in feature_grouping.features] ) self.tags = tags or {} @@ -118,7 +118,7 @@ def from_proto(feature_service_proto: FeatureServiceProto): ), ) - fs.features = feature_service_proto.spec.features + fs.features = [feature for feature in feature_service_proto.spec.features] fs.feature_tables = [ FeatureTable.from_proto(table) for table in feature_service_proto.spec.feature_tables diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 923b6c15ad..eb81d73dd4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools import os import warnings from collections import Counter, OrderedDict, defaultdict @@ -319,8 +318,7 @@ def _get_features( _feature_refs: List[str] if isinstance(_features, FeatureService): - # Get the latest value of the feature service, in case the object passed in has been updated underneath us. - _feature_refs = _get_feature_refs_from_feature_services(FeatureService) + _feature_refs = _features.features else: _feature_refs = _features return _feature_refs @@ -540,19 +538,7 @@ def get_historical_features( ) _feature_refs = self._get_features(features, feature_refs) - - passed_in_feature_views = {view.name: view for view in - itertools.chain([ - grouping.feature_views for grouping in features - if isinstance(grouping, FeatureService) - ]) - } - - all_feature_views = [*filter( - lambda view: view.name not in [*passed_in_feature_views.keys()], - self.list_feature_views() - )] + [*passed_in_feature_views.values()] - + all_feature_views = self._get_feature_views_to_use(features) all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project ) @@ -814,19 +800,7 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - - passed_in_feature_views = {view.name: view for view in - itertools.chain([ - grouping.feature_views for grouping in features - if isinstance(grouping, FeatureService) - ]) - } - - all_feature_views = [*filter( - lambda view: view.name not in [*passed_in_feature_views.keys()], - self.list_feature_views() - )] + [*passed_in_feature_views.values()] - + all_feature_views = self._get_feature_views_to_use(features) all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project, allow_cache=True ) @@ -1037,6 +1011,25 @@ def _augment_response_with_on_demand_transforms( ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) + + def _get_feature_views_to_use( + self, + features: Union[List[str], FeatureService], + ) -> List[FeatureView]: + + passed_in_feature_views = ({view.name: view for view in features.feature_views} + if isinstance(features, FeatureService) + else {} + ) + + all_feature_views = [*filter( + lambda view: view.name not in [*passed_in_feature_views.keys()], + self.list_feature_views() + )] + [*passed_in_feature_views.values()] + + return all_feature_views + + @log_exceptions_and_usage def serve(self, port: int) -> None: """Start the feature consumption server locally on a given port.""" @@ -1135,17 +1128,6 @@ def _group_feature_refs( return fvs_result, odfvs_result -def _get_feature_refs_from_feature_services( - feature_service: FeatureService, -) -> List[str]: - feature_refs = [] - for projection in feature_service.features: - feature_refs.extend( - [f"{projection.name}:{f.name}" for f in projection.features] - ) - return feature_refs - - def _get_table_entity_keys( table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str], ) -> List[EntityKeyProto]: From ffe284ad236579c3637895f46657e08675bd6d26 Mon Sep 17 00:00:00 2001 From: Cody Lin Date: Thu, 23 Sep 2021 12:55:17 -0700 Subject: [PATCH 04/15] fix unit test failure and most lint issues Signed-off-by: Cody Lin --- sdk/python/feast/feature_service.py | 22 +++++++++------- sdk/python/feast/feature_store.py | 25 ++++++++++++------- .../online_store/test_e2e_local.py | 1 + 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index d64db8bc99..1deb195d05 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -41,9 +41,7 @@ class FeatureService: def __init__( self, name: str, - features: List[ - Union[FeatureTable, FeatureView, OnDemandFeatureView] - ], + features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]], tags: Optional[Dict[str, str]] = None, description: Optional[str] = None, ): @@ -54,15 +52,19 @@ def __init__( ValueError: If one of the specified features is not a valid type. """ self.name = name - self.features = [] - self.feature_tables, self.feature_views, self.on_demand_feature_views = [], [], [] + self.features = [] + self.feature_tables, self.feature_views, self.on_demand_feature_views = ( + [], + [], + [], + ) for feature_grouping in features: - if (isinstance(feature_grouping, FeatureTable)): + if isinstance(feature_grouping, FeatureTable): self.feature_tables.append(feature_grouping) - elif (isinstance(feature_grouping, FeatureView)): + elif isinstance(feature_grouping, FeatureView): self.feature_views.append(feature_grouping) - elif (isinstance(feature_grouping, OnDemandFeatureView)): + elif isinstance(feature_grouping, OnDemandFeatureView): self.on_demand_feature_views.append(feature_grouping) else: raise ValueError(f"Unexpected type: {type(feature_grouping)}") @@ -159,7 +161,9 @@ def to_proto(self) -> FeatureServiceProto: features=self.features, feature_tables=[table.to_proto() for table in self.feature_tables], feature_views=[view.to_proto() for view in self.feature_views], - on_demand_feature_views=[view.to_proto() for view in self.on_demand_feature_views] + on_demand_feature_views=[ + view.to_proto() for view in self.on_demand_feature_views + ], ) if self.tags: spec.tags.update(self.tags) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index eb81d73dd4..e30dd5fb46 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -800,7 +800,9 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._get_feature_views_to_use(features) + all_feature_views = self._get_feature_views_to_use( + features=features, allow_cache=True, hide_dummy_entity=False + ) all_on_demand_feature_views = self._registry.list_on_demand_feature_views( project=self.project, allow_cache=True ) @@ -1011,25 +1013,30 @@ def _augment_response_with_on_demand_transforms( ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) - def _get_feature_views_to_use( self, - features: Union[List[str], FeatureService], + features: Optional[Union[List[str], FeatureService]], + allow_cache=False, + hide_dummy_entity: bool = True, ) -> List[FeatureView]: - passed_in_feature_views = ({view.name: view for view in features.feature_views} + passed_in_feature_views = ( + {view.name: view for view in features.feature_views} if isinstance(features, FeatureService) else {} ) - all_feature_views = [*filter( - lambda view: view.name not in [*passed_in_feature_views.keys()], - self.list_feature_views() - )] + [*passed_in_feature_views.values()] + all_feature_views = [ + *filter( + lambda view: view.name not in [*passed_in_feature_views.keys()], + self._list_feature_views( + allow_cache=allow_cache, hide_dummy_entity=hide_dummy_entity + ), + ) + ] + [*passed_in_feature_views.values()] return all_feature_views - @log_exceptions_and_usage def serve(self, port: int) -> None: """Start the feature consumption server locally on a given port.""" diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 6a3bc02d2f..579139ef60 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -103,6 +103,7 @@ def test_e2e_local() -> None: assert r.returncode == 0 + print("START LOOKING AT LOGS HERE") _assert_online_features(store, driver_df, end_date - timedelta(days=7)) # feast materialize-incremental From 37b6d87e69b416a6a4a016d75c7a2e11409f21dd Mon Sep 17 00:00:00 2001 From: Cody Lin Date: Thu, 23 Sep 2021 13:35:05 -0700 Subject: [PATCH 05/15] remove debugging print statement Signed-off-by: Cody Lin --- sdk/python/tests/integration/online_store/test_e2e_local.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 579139ef60..6a3bc02d2f 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -103,7 +103,6 @@ def test_e2e_local() -> None: assert r.returncode == 0 - print("START LOOKING AT LOGS HERE") _assert_online_features(store, driver_df, end_date - timedelta(days=7)) # feast materialize-incremental From 2971740d541f73ec0e523df7a5c8c93ff0490f42 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Fri, 24 Sep 2021 09:28:51 -0700 Subject: [PATCH 06/15] simplified _group_feature_refs Signed-off-by: David Y Liu --- sdk/python/feast/feature_store.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e30dd5fb46..46c3b89df7 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1089,7 +1089,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( - features: Union[List[str], FeatureService], + features: List[str], all_feature_views: List[FeatureView], all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ @@ -1109,21 +1109,14 @@ def _group_feature_refs( # on demand view name to feature names on_demand_view_features = defaultdict(list) - if isinstance(features, list) and isinstance(features[0], str): - for ref in features: - view_name, feat_name = ref.split(":") - if view_name in view_index: - views_features[view_name].append(feat_name) - elif view_name in on_demand_view_index: - on_demand_view_features[view_name].append(feat_name) - else: - raise FeatureViewNotFoundException(view_name) - elif isinstance(features, FeatureService): - for feature_projection in features.features: - projected_features = feature_projection.features - views_features[feature_projection.name].extend( - [f.name for f in projected_features] - ) + for ref in features: + view_name, feat_name = ref.split(":") + if view_name in view_index: + views_features[view_name].append(feat_name) + elif view_name in on_demand_view_index: + on_demand_view_features[view_name].append(feat_name) + else: + raise FeatureViewNotFoundException(view_name) fvs_result: List[Tuple[FeatureView, List[str]]] = [] odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] From 86b3665c04d2f7bbcb75ddfcbae73cf45807cfaa Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Fri, 24 Sep 2021 09:54:01 -0700 Subject: [PATCH 07/15] deleted FeatureViewProjection files Signed-off-by: David Y Liu --- protos/feast/core/FeatureViewProjection.proto | 18 --------- sdk/python/feast/feature_view_projection.py | 37 ------------------- sdk/python/feast/on_demand_feature_view.py | 7 ++-- 3 files changed, 4 insertions(+), 58 deletions(-) delete mode 100644 protos/feast/core/FeatureViewProjection.proto delete mode 100644 sdk/python/feast/feature_view_projection.py diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto deleted file mode 100644 index a7b9ae9a89..0000000000 --- a/protos/feast/core/FeatureViewProjection.proto +++ /dev/null @@ -1,18 +0,0 @@ -syntax = "proto3"; -package feast.core; - -option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; -option java_outer_classname = "FeatureReferenceProto"; -option java_package = "feast.proto.core"; - -import "feast/core/Feature.proto"; - - -// A reference to features in a feature view -message FeatureViewProjection { - // The feature view name - string feature_view_name = 1; - - // The features of the feature view that are a part of the feature reference. - repeated FeatureSpecV2 feature_columns = 2; -} diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py deleted file mode 100644 index 15b24889da..0000000000 --- a/sdk/python/feast/feature_view_projection.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import List - -from attr import dataclass - -from feast.feature import Feature -from feast.protos.feast.core.FeatureViewProjection_pb2 import ( - FeatureViewProjection as FeatureViewProjectionProto, -) - - -@dataclass -class FeatureViewProjection: - name: str - features: List[Feature] - - def to_proto(self): - feature_reference_proto = FeatureViewProjectionProto( - feature_view_name=self.name - ) - for feature in self.features: - feature_reference_proto.feature_columns.append(feature.to_proto()) - - return feature_reference_proto - - @staticmethod - def from_proto(proto: FeatureViewProjectionProto): - ref = FeatureViewProjection(name=proto.feature_view_name, features=[]) - for feature_column in proto.feature_columns: - ref.features.append(Feature.from_proto(feature_column)) - - return ref - - @staticmethod - def from_definition(feature_definition): - return FeatureViewProjection( - name=feature_definition.name, features=feature_definition.features - ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index b5f17b9e90..f2767ecc8e 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -10,7 +10,6 @@ from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.feature_view import FeatureView -from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -161,7 +160,7 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features - def __getitem__(self, item) -> FeatureViewProjection: + def __getitem__(self, item): assert isinstance(item, list) referenced_features = [] @@ -169,7 +168,9 @@ def __getitem__(self, item) -> FeatureViewProjection: if feature.name in item: referenced_features.append(feature) - return FeatureViewProjection(self.name, referenced_features) + return OnDemandFeatureView( + self.name, referenced_features, self.inputs, self.udf + ) def infer_features(self): """ From afc07d50b496182558c7e1f706686d6c4cf6e4c8 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Fri, 24 Sep 2021 12:11:03 -0700 Subject: [PATCH 08/15] code review fix and test fix Signed-off-by: David Y Liu --- protos/feast/core/FeatureService.proto | 15 +++++++++------ .../online_store/test_universal_online.py | 4 +--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index b8a0bef810..5f06c47b8d 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -19,6 +19,9 @@ message FeatureService { } message FeatureServiceSpec { + // was previously used with 'repeated FeatureViewProjection features' + reserved 3; + // Name of the Feature Service. Must be unique. Not updated. string name = 1; @@ -27,19 +30,19 @@ message FeatureServiceSpec { // List of features that this feature service encapsulates. // Stored as a list of references to other features views and the features from those views. - repeated string features = 3; + repeated string features = 6; - repeated FeatureTable feature_tables = 4; + repeated FeatureTable feature_tables = 7; - repeated FeatureView feature_views = 5; + repeated FeatureView feature_views = 8; - repeated OnDemandFeatureView on_demand_feature_views = 6; + repeated OnDemandFeatureView on_demand_feature_views = 9; // User defined metadata - map tags = 7; + map tags = 4; // Description of the feature service. - string description = 8; + string description = 5; } diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index e6543ae54a..36b524dfd7 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -249,9 +249,7 @@ def assert_feature_service_correctness( feature_service_keys = feature_service_online_features_dict.keys() assert ( - len(feature_service_keys) - == sum([len(projection.features) for projection in feature_service.features]) - + 3 + len(feature_service_keys) == len(feature_service.features) + 3 ) # Add two for the driver id and the customer id entity keys and val_to_add request data for i, entity_row in enumerate(entity_rows): From dfdf35e221a4326e106483ea31d303907890d3e8 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Fri, 24 Sep 2021 12:26:15 -0700 Subject: [PATCH 09/15] Correction in cli.py Signed-off-by: David Y Liu --- sdk/python/feast/cli.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 2701b6d4ed..f1b2914618 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -172,14 +172,10 @@ def feature_service_list(ctx: click.Context): repo = ctx.obj["CHDIR"] cli_check_repo(repo) store = FeatureStore(repo_path=str(repo)) - feature_services = [] - for feature_service in store.list_feature_services(): - feature_names = [] - for projection in feature_service.features: - feature_names.extend( - [f"{projection.name}:{feature.name}" for feature in projection.features] - ) - feature_services.append([feature_service.name, ", ".join(feature_names)]) + feature_services = [ + [feature_service.name, ", ".join(feature_service.features)] + for feature_service in store.list_feature_services() + ] from tabulate import tabulate From 9e00be5b7a7741d2a9a881f1cb2a35357c88fc17 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Thu, 30 Sep 2021 11:47:02 -0700 Subject: [PATCH 10/15] Made fixes to still get objects from registry but potentially modify those objects after Signed-off-by: David Y Liu --- sdk/python/feast/feature_store.py | 34 ++++++++++++----------- sdk/python/feast/feature_view.py | 45 +++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 46c3b89df7..0a6cc622b9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -318,7 +318,11 @@ def _get_features( _feature_refs: List[str] if isinstance(_features, FeatureService): - _feature_refs = _features.features + _feature_refs = self.get_feature_service(_features.name).features + if _features.features != _feature_refs: + raise ValueError( + "FeatureService object that's passed in is inconsistent with version from Registry." + ) else: _feature_refs = _features return _feature_refs @@ -1020,22 +1024,22 @@ def _get_feature_views_to_use( hide_dummy_entity: bool = True, ) -> List[FeatureView]: - passed_in_feature_views = ( - {view.name: view for view in features.feature_views} - if isinstance(features, FeatureService) - else {} - ) + all_feature_views = { + fv.name: fv + for fv in self._list_feature_views(allow_cache, hide_dummy_entity) + } - all_feature_views = [ - *filter( - lambda view: view.name not in [*passed_in_feature_views.keys()], - self._list_feature_views( - allow_cache=allow_cache, hide_dummy_entity=hide_dummy_entity - ), - ) - ] + [*passed_in_feature_views.values()] + if isinstance(features, FeatureService): + for fv in features.feature_views: + if fv.name not in all_feature_views: + raise ValueError( + f"{fv.name} used in the FeatureService is not in the registry." + ) + all_feature_views[fv.name] = all_feature_views[fv.name].get_projection( + fv + ) - return all_feature_views + return [*all_feature_views.values()] @log_exceptions_and_usage def serve(self, port: int) -> None: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 841b2bfc46..97b176e439 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -388,3 +388,48 @@ def infer_features_from_batch_source(self, config: RepoConfig): "FeatureView", f"Could not infer Features for the FeatureView named {self.name}.", ) + + def get_projection(self, feature_view): + """ + Produces a copy of this FeatureView (self) with specific fields modified according to the FeatureView object + that's passed in as the argument. This allows users to make modifications to a FeatureView object + (e.g. the name) and then projecting those changes onto the corresponding actual FeatureView from the registry. + Currently all FeatureViews that are used must be registered and this method enables modifying those FeatureViews + while still making sure we're pulling the most up-to-date FeatureView from the registry to modify from. + + Currently, only `FeatureView.features` is the field that's replaced the features from feature_view. + + Args: + feature_view: The FeatureView object that's likely not registered and has the modified fields that should + be projected onto a copy of this FeatureView (self). + + Returns: + A copy of this FeatureView (self) with some of its fields modified according to the feature_view argument. + + """ + if not isinstance(feature_view, FeatureView): + raise TypeError( + "A projection can only be created from a passed in FeatureView." + ) + + features_to_use = [] + features_dict = {feature.name: feature for feature in self.features} + for feature in feature_view.features: + if feature not in self.features: + raise ValueError( + "There are features in the passed in FeatureView object that are not in the FeatureView object" + "of the same name from the registry." + ) + features_to_use.append(features_dict[feature.name]) + + return FeatureView( + name=self.name, + entities=self.entities, + ttl=self.ttl, + input=self.input, + batch_source=self.batch_source, + stream_source=self.stream_source, + features=features_to_use, + tags=self.tags, + online=self.online, + ) From e69ad89f089f10010799a30a248ef7912910ba22 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Thu, 30 Sep 2021 12:59:21 -0700 Subject: [PATCH 11/15] fixed lint oopsie & added to docstring on feature reference string convention Signed-off-by: David Y Liu --- sdk/python/feast/feature_service.py | 3 ++- sdk/python/feast/feature_view.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 9152ec3b84..9e313c7413 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -26,7 +26,8 @@ class FeatureService: Args: name: Unique name of the feature service. features: A list of Features that are grouped as part of this FeatureService. - The list may contain Feature Views, Feature Tables, or a subset of either. + The list may contain Feature Views, Feature Tables, or a subset of either. The + strings should be in the format 'my_feature_view:my_feature'. tags (optional): A dictionary of key-value pairs used for organizing Feature Services. """ diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index afc9d522f6..adf3b6a0c9 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -401,7 +401,7 @@ def get_projection(self, feature_view): that's passed in as the argument. This allows users to make modifications to a FeatureView object (e.g. the name) and then projecting those changes onto the corresponding actual FeatureView from the registry. Currently all FeatureViews that are used must be registered and this method enables modifying those FeatureViews - while still making sure we're pulling the most up-to-date FeatureView from the registry to modify from. + while still making sure we're pulling the most up-to-date FeatureView from the registry to modify from. Currently, only `FeatureView.features` is the field that's replaced the features from feature_view. From 009edf47d9866a595f86c25b18d15ce1190e9d39 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Mon, 4 Oct 2021 10:10:54 -0700 Subject: [PATCH 12/15] Brought back FVProjections without exposing it ever to user-facing API Signed-off-by: David Y Liu --- protos/feast/core/FeatureService.proto | 19 ++---- protos/feast/core/FeatureViewProjection.proto | 18 +++++ sdk/python/feast/feature_service.py | 58 +++++----------- sdk/python/feast/feature_store.py | 32 ++++++--- sdk/python/feast/feature_view.py | 67 +++++-------------- sdk/python/feast/feature_view_projection.py | 37 ++++++++++ sdk/python/setup.py | 0 7 files changed, 114 insertions(+), 117 deletions(-) create mode 100644 protos/feast/core/FeatureViewProjection.proto create mode 100644 sdk/python/feast/feature_view_projection.py mode change 100755 => 100644 sdk/python/setup.py diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 5f06c47b8d..1e9be59d51 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -6,9 +6,7 @@ option java_outer_classname = "FeatureServiceProto"; option java_package = "feast.proto.core"; import "google/protobuf/timestamp.proto"; -import "feast/core/OnDemandFeatureView.proto"; -import "feast/core/FeatureTable.proto"; -import "feast/core/FeatureView.proto"; +import "feast/core/FeatureViewProjection.proto"; message FeatureService { // User-specified specifications of this feature service. @@ -19,24 +17,15 @@ message FeatureService { } message FeatureServiceSpec { - // was previously used with 'repeated FeatureViewProjection features' - reserved 3; - // Name of the Feature Service. Must be unique. Not updated. string name = 1; // Name of Feast project that this Feature Service belongs to. string project = 2; - // List of features that this feature service encapsulates. - // Stored as a list of references to other features views and the features from those views. - repeated string features = 6; - - repeated FeatureTable feature_tables = 7; - - repeated FeatureView feature_views = 8; - - repeated OnDemandFeatureView on_demand_feature_views = 9; + // Represents a projection that's to be applied on top of the FeatureView. + // Contains data like a name alias or the features to use from a FeatureView. + repeated FeatureViewProjection features = 3; // User defined metadata map tags = 4; diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto new file mode 100644 index 0000000000..af5c90ea62 --- /dev/null +++ b/protos/feast/core/FeatureViewProjection.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "FeatureReferenceProto"; +option java_package = "feast.proto.core"; + +import "feast/core/Feature.proto"; + + +// A projection of a Feature View. Contains the modifications to a Feature View. +message FeatureViewProjection { + // The feature view name + string feature_view_name = 1; + + // The features of the feature view that are a part of the feature reference. + repeated FeatureSpecV2 feature_columns = 2; +} \ No newline at end of file diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 2c99304bad..c5059b5b96 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,6 +5,7 @@ from feast.feature_table import FeatureTable from feast.feature_view import FeatureView +from feast.feature_view_projection import FeatureViewProjection from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.FeatureService_pb2 import ( FeatureService as FeatureServiceProto, @@ -31,10 +32,7 @@ class FeatureService: """ name: str - features: List[str] - feature_tables: List[FeatureTable] - feature_views: List[FeatureView] - on_demand_feature_views: List[OnDemandFeatureView] + feature_view_projections: List[FeatureViewProjection] tags: Dict[str, str] description: Optional[str] = None created_timestamp: Optional[datetime] = None @@ -55,27 +53,20 @@ def __init__( ValueError: If one of the specified features is not a valid type. """ self.name = name - self.features = [] - self.feature_tables, self.feature_views, self.on_demand_feature_views = ( - [], - [], - [], - ) + self.feature_view_projections = [] for feature_grouping in features: - if isinstance(feature_grouping, FeatureTable): - self.feature_tables.append(feature_grouping) + if isinstance(feature_grouping, FeatureTable) or isinstance( + feature_grouping, OnDemandFeatureView + ): + self.feature_view_projections.append( + FeatureViewProjection.from_definition(feature_grouping) + ) elif isinstance(feature_grouping, FeatureView): - self.feature_views.append(feature_grouping) - elif isinstance(feature_grouping, OnDemandFeatureView): - self.on_demand_feature_views.append(feature_grouping) + self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError(f"Unexpected type: {type(feature_grouping)}") - self.features.extend( - [f"{feature_grouping.name}:{f.name}" for f in feature_grouping.features] - ) - self.tags = tags or {} self.description = description self.created_timestamp = None @@ -108,13 +99,15 @@ def __eq__(self, other): def from_proto(feature_service_proto: FeatureServiceProto): """ Converts a FeatureServiceProto to a FeatureService object. - Args: feature_service_proto: A protobuf representation of a FeatureService. """ fs = FeatureService( name=feature_service_proto.spec.name, - features=[], + features=[ + FeatureViewProjection.from_proto(projection) + for projection in feature_service_proto.spec.features + ], tags=dict(feature_service_proto.spec.tags), description=( feature_service_proto.spec.description @@ -123,20 +116,6 @@ def from_proto(feature_service_proto: FeatureServiceProto): ), ) - fs.features = [feature for feature in feature_service_proto.spec.features] - fs.feature_tables = [ - FeatureTable.from_proto(table) - for table in feature_service_proto.spec.feature_tables - ] - fs.feature_views = [ - FeatureView.from_proto(view) - for view in feature_service_proto.spec.feature_views - ] - fs.on_demand_feature_views = [ - OnDemandFeatureView.from_proto(view) - for view in feature_service_proto.spec.on_demand_feature_views - ] - if feature_service_proto.meta.HasField("created_timestamp"): fs.created_timestamp = ( feature_service_proto.meta.created_timestamp.ToDatetime() @@ -151,7 +130,6 @@ def from_proto(feature_service_proto: FeatureServiceProto): def to_proto(self) -> FeatureServiceProto: """ Converts a FeatureService to its protobuf representation. - Returns: A FeatureServiceProto protobuf. """ @@ -161,13 +139,11 @@ def to_proto(self) -> FeatureServiceProto: spec = FeatureServiceSpec( name=self.name, - features=self.features, - feature_tables=[table.to_proto() for table in self.feature_tables], - feature_views=[view.to_proto() for view in self.feature_views], - on_demand_feature_views=[ - view.to_proto() for view in self.on_demand_feature_views + features=[ + projection.to_proto() for projection in self.feature_view_projections ], ) + if self.tags: spec.tags.update(self.tags) if self.description: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e67d8d43e8..8985e253a4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -316,12 +316,18 @@ def _get_features( if not _features: raise ValueError("No features specified for retrieval") - _feature_refs: List[str] + _feature_refs = [] if isinstance(_features, FeatureService): - _feature_refs = self.get_feature_service(_features.name).features - if _features.features != _feature_refs: - raise ValueError( - "FeatureService object that's passed in is inconsistent with version from Registry." + feature_service_from_registry = self.get_feature_service(_features.name) + if feature_service_from_registry != _features: + warnings.warn( + "The FeatureService object that has been passed in as an argument is" + "inconsistent with the version from Registry. Potentially a newer version" + "of the FeatureService has been applied to the registry." + ) + for projection in feature_service_from_registry.features: + _feature_refs.extend( + [f"{projection.name}:{f.name}" for f in projection.features] ) else: assert isinstance(_features, list) @@ -1031,14 +1037,18 @@ def _get_feature_views_to_use( } if isinstance(features, FeatureService): - for fv in features.feature_views: - if fv.name not in all_feature_views: + for fv_name, projection in { + projection.name: projection + for projection in features.feature_view_projections + }.items(): + if fv_name in all_feature_views: + all_feature_views[fv_name].set_projection(projection) + else: raise ValueError( - f"{fv.name} used in the FeatureService is not in the registry." + f"The provided feature service {features.name} contains a reference to a feature view" + f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" + f'{fv_name} and that you have registered it by running "apply".' ) - all_feature_views[fv.name] = all_feature_views[fv.name].get_projection( - fv - ) return [*all_feature_views.values()] diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 66ed8f316a..e151660c48 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -23,6 +23,7 @@ from feast.data_source import DataSource from feast.errors import RegistryInferenceFailure from feast.feature import Feature +from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto from feast.protos.feast.core.FeatureView_pb2 import ( FeatureViewMeta as FeatureViewMetaProto, @@ -73,6 +74,7 @@ class FeatureView: online: bool input: DataSource batch_source: DataSource + projection: FeatureViewProjection stream_source: Optional[DataSource] = None created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None @@ -140,6 +142,8 @@ def __init__( self.created_timestamp: Optional[datetime] = None self.last_updated_timestamp: Optional[datetime] = None + self.projection = FeatureViewProjection.from_definition(self) + def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) return f"<{self.__class__.__name__}({', '.join(items)})>" @@ -158,17 +162,9 @@ def __getitem__(self, item): if feature.name in item: referenced_features.append(feature) - return FeatureView( - name=self.name, - entities=self.entities, - ttl=self.ttl, - input=self.input, - batch_source=self.batch_source, - stream_source=self.stream_source, - features=referenced_features, - tags=self.tags, - online=self.online, - ) + self.projection.features = referenced_features + + return self def __eq__(self, other): if not isinstance(other, FeatureView): @@ -292,6 +288,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): stream_source=stream_source, ) + # FeatureViewProjections are not saved in the FeatureView proto. + # Create the default projection. + feature_view.projection = FeatureViewProjection.from_definition(feature_view) + if feature_view_proto.meta.HasField("created_timestamp"): feature_view.created_timestamp = ( feature_view_proto.meta.created_timestamp.ToDatetime() @@ -389,47 +389,14 @@ def infer_features_from_batch_source(self, config: RepoConfig): f"Could not infer Features for the FeatureView named {self.name}.", ) - def get_projection(self, feature_view): - """ - Produces a copy of this FeatureView (self) with specific fields modified according to the FeatureView object - that's passed in as the argument. This allows users to make modifications to a FeatureView object - (e.g. the name) and then projecting those changes onto the corresponding actual FeatureView from the registry. - Currently all FeatureViews that are used must be registered and this method enables modifying those FeatureViews - while still making sure we're pulling the most up-to-date FeatureView from the registry to modify from. - - Currently, only `FeatureView.features` is the field that's replaced the features from feature_view. - - Args: - feature_view: The FeatureView object that's likely not registered and has the modified fields that should - be projected onto a copy of this FeatureView (self). + def set_projection(self, feature_view_projection: FeatureViewProjection): + assert feature_view_projection.name == self.name - Returns: - A copy of this FeatureView (self) with some of its fields modified according to the feature_view argument. - - """ - if not isinstance(feature_view, FeatureView): - raise TypeError( - "A projection can only be created from a passed in FeatureView." - ) - - features_to_use = [] - features_dict = {feature.name: feature for feature in self.features} - for feature in feature_view.features: + for feature in feature_view_projection.features: if feature not in self.features: raise ValueError( - "There are features in the passed in FeatureView object that are not in the FeatureView object" - "of the same name from the registry." + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the" + "FeatureView doesn't have." ) - features_to_use.append(features_dict[feature.name]) - return FeatureView( - name=self.name, - entities=self.entities, - ttl=self.ttl, - input=self.input, - batch_source=self.batch_source, - stream_source=self.stream_source, - features=features_to_use, - tags=self.tags, - online=self.online, - ) + self.projection = feature_view_projection diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py new file mode 100644 index 0000000000..e1ec04288f --- /dev/null +++ b/sdk/python/feast/feature_view_projection.py @@ -0,0 +1,37 @@ +from typing import List + +from attr import dataclass + +from feast.feature import Feature +from feast.protos.feast.core.FeatureViewProjection_pb2 import ( + FeatureViewProjection as FeatureViewProjectionProto, +) + + +@dataclass +class FeatureViewProjection: + name: str + features: List[Feature] + + def to_proto(self): + feature_reference_proto = FeatureViewProjectionProto( + feature_view_name=self.name, + ) + for feature in self.features: + feature_reference_proto.feature_columns.append(feature.to_proto()) + + return feature_reference_proto + + @staticmethod + def from_proto(proto: FeatureViewProjectionProto): + ref = FeatureViewProjection(name=proto.feature_view_name, features=[]) + for feature_column in proto.feature_columns: + ref.features.append(Feature.from_proto(feature_column)) + + return ref + + @staticmethod + def from_definition(feature_grouping): + return FeatureViewProjection( + name=feature_grouping.name, features=feature_grouping.features + ) diff --git a/sdk/python/setup.py b/sdk/python/setup.py old mode 100755 new mode 100644 From d65326dea1cfbfa1f8213fed504856ac309b70a7 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Mon, 4 Oct 2021 11:05:39 -0700 Subject: [PATCH 13/15] Updated OnDemandFV to this new paradigm. Other misc updates Signed-off-by: David Y Liu --- protos/feast/core/FeatureService.proto | 2 +- protos/feast/core/FeatureViewProjection.proto | 5 +-- sdk/python/feast/cli.py | 12 ++++--- sdk/python/feast/feature_service.py | 12 +++---- sdk/python/feast/feature_store.py | 29 ++++++++------- sdk/python/feast/feature_view.py | 12 +++++-- sdk/python/feast/feature_view_projection.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 35 +++++++++++++++++-- .../online_store/test_universal_online.py | 4 ++- 9 files changed, 81 insertions(+), 32 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 1e9be59d51..952b30eb0a 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -24,7 +24,7 @@ message FeatureServiceSpec { string project = 2; // Represents a projection that's to be applied on top of the FeatureView. - // Contains data like a name alias or the features to use from a FeatureView. + // Contains data such as the features to use from a FeatureView. repeated FeatureViewProjection features = 3; // User defined metadata diff --git a/protos/feast/core/FeatureViewProjection.proto b/protos/feast/core/FeatureViewProjection.proto index af5c90ea62..d9c80db0b8 100644 --- a/protos/feast/core/FeatureViewProjection.proto +++ b/protos/feast/core/FeatureViewProjection.proto @@ -8,11 +8,12 @@ option java_package = "feast.proto.core"; import "feast/core/Feature.proto"; -// A projection of a Feature View. Contains the modifications to a Feature View. +// A projection to be applied on top of a FeatureView. +// Contains the modifications to a FeatureView such as the features subset to use. message FeatureViewProjection { // The feature view name string feature_view_name = 1; // The features of the feature view that are a part of the feature reference. repeated FeatureSpecV2 feature_columns = 2; -} \ No newline at end of file +} diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index f1b2914618..2701b6d4ed 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -172,10 +172,14 @@ def feature_service_list(ctx: click.Context): repo = ctx.obj["CHDIR"] cli_check_repo(repo) store = FeatureStore(repo_path=str(repo)) - feature_services = [ - [feature_service.name, ", ".join(feature_service.features)] - for feature_service in store.list_feature_services() - ] + feature_services = [] + for feature_service in store.list_feature_services(): + feature_names = [] + for projection in feature_service.features: + feature_names.extend( + [f"{projection.name}:{feature.name}" for feature in projection.features] + ) + feature_services.append([feature_service.name, ", ".join(feature_names)]) from tabulate import tabulate diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index c5059b5b96..003f3824dd 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -25,8 +25,7 @@ class FeatureService: Args: name: Unique name of the feature service. features: A list of Features that are grouped as part of this FeatureService. - The list may contain Feature Views, Feature Tables, or a subset of either. The - strings should be in the format 'my_feature_view:my_feature'. + The list may contain Feature Views, Feature Tables, or a subset of either. tags (optional): A dictionary of key-value pairs used for organizing Feature Services. """ @@ -56,13 +55,13 @@ def __init__( self.feature_view_projections = [] for feature_grouping in features: - if isinstance(feature_grouping, FeatureTable) or isinstance( - feature_grouping, OnDemandFeatureView - ): + if isinstance(feature_grouping, FeatureTable): self.feature_view_projections.append( FeatureViewProjection.from_definition(feature_grouping) ) - elif isinstance(feature_grouping, FeatureView): + elif isinstance(feature_grouping, FeatureView) or isinstance( + feature_grouping, OnDemandFeatureView + ): self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError(f"Unexpected type: {type(feature_grouping)}") @@ -130,6 +129,7 @@ def from_proto(feature_service_proto: FeatureServiceProto): def to_proto(self) -> FeatureServiceProto: """ Converts a FeatureService to its protobuf representation. + Returns: A FeatureServiceProto protobuf. """ diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 8985e253a4..c0051e5212 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -549,9 +549,8 @@ def get_historical_features( ) _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._get_feature_views_to_use(features) - all_on_demand_feature_views = self._registry.list_on_demand_feature_views( - project=self.project + all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( + features ) # TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider. @@ -811,12 +810,9 @@ def get_online_features( >>> online_response_dict = online_response.to_dict() """ _feature_refs = self._get_features(features, feature_refs) - all_feature_views = self._get_feature_views_to_use( + all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use( features=features, allow_cache=True, hide_dummy_entity=False ) - all_on_demand_feature_views = self._registry.list_on_demand_feature_views( - project=self.project, allow_cache=True - ) _validate_feature_refs(_feature_refs, full_feature_names) grouped_refs, grouped_odfv_refs = _group_feature_refs( @@ -1029,20 +1025,29 @@ def _get_feature_views_to_use( features: Optional[Union[List[str], FeatureService]], allow_cache=False, hide_dummy_entity: bool = True, - ) -> List[FeatureView]: + ) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]: - all_feature_views = { + fvs = { fv.name: fv for fv in self._list_feature_views(allow_cache, hide_dummy_entity) } + od_fvs = { + fv.name: fv + for fv in self._registry.list_on_demand_feature_views( + project=self.project, allow_cache=allow_cache + ) + } + if isinstance(features, FeatureService): for fv_name, projection in { projection.name: projection for projection in features.feature_view_projections }.items(): - if fv_name in all_feature_views: - all_feature_views[fv_name].set_projection(projection) + if fv_name in fvs: + fvs[fv_name].set_projection(projection) + elif fv_name in od_fvs: + od_fvs[fv_name].set_projection(projection) else: raise ValueError( f"The provided feature service {features.name} contains a reference to a feature view" @@ -1050,7 +1055,7 @@ def _get_feature_views_to_use( f'{fv_name} and that you have registered it by running "apply".' ) - return [*all_feature_views.values()] + return [*fvs.values()], [*od_fvs.values()] @log_exceptions_and_usage def serve(self, port: int) -> None: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index e151660c48..65c727e979 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -74,11 +74,11 @@ class FeatureView: online: bool input: DataSource batch_source: DataSource - projection: FeatureViewProjection stream_source: Optional[DataSource] = None created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None materialization_intervals: List[Tuple[datetime, datetime]] + projection: FeatureViewProjection @log_exceptions def __init__( @@ -389,7 +389,15 @@ def infer_features_from_batch_source(self, config: RepoConfig): f"Could not infer Features for the FeatureView named {self.name}.", ) - def set_projection(self, feature_view_projection: FeatureViewProjection): + def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: + """ + Setter for the projection object held by this FeatureView. Performs checks to ensure + the projection is consistent with this FeatureView before doing the set. + + Args: + feature_view_projection: The FeatureViewProjection object to set this FeatureView's + 'projection' field to. + """ assert feature_view_projection.name == self.name for feature in feature_view_projection.features: diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index e1ec04288f..1b2961302a 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -15,7 +15,7 @@ class FeatureViewProjection: def to_proto(self): feature_reference_proto = FeatureViewProjectionProto( - feature_view_name=self.name, + feature_view_name=self.name ) for feature in self.features: feature_reference_proto.feature_columns.append(feature.to_proto()) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index b6b2e7def7..d97da20781 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -10,6 +10,7 @@ from feast.errors import RegistryInferenceFailure from feast.feature import Feature from feast.feature_view import FeatureView +from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -44,6 +45,7 @@ class OnDemandFeatureView: features: List[Feature] inputs: Dict[str, Union[FeatureView, RequestDataSource]] udf: MethodType + projection: FeatureViewProjection @log_exceptions def __init__( @@ -61,6 +63,7 @@ def __init__( self.features = features self.inputs = inputs self.udf = udf + self.projection = FeatureViewProjection.from_definition(self) def __hash__(self) -> int: return hash((id(self), self.name)) @@ -134,6 +137,12 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ), ) + # FeatureViewProjections are not saved in the OnDemandFeatureView proto. + # Create the default projection. + on_demand_feature_view_obj.projection = FeatureViewProjection.from_definition( + on_demand_feature_view_obj + ) + return on_demand_feature_view_obj def get_transformed_features_df( @@ -171,9 +180,9 @@ def __getitem__(self, item): if feature.name in item: referenced_features.append(feature) - return OnDemandFeatureView( - self.name, referenced_features, self.inputs, self.udf - ) + self.projection.features = referenced_features + + return self def infer_features(self): """ @@ -238,6 +247,26 @@ def get_requested_odfvs(feature_refs, project, registry): break return requested_on_demand_feature_views + def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: + """ + Setter for the projection object held by this FeatureView. Performs checks to ensure + the projection is consistent with this FeatureView before doing the set. + + Args: + feature_view_projection: The FeatureViewProjection object to set this FeatureView's + 'projection' field to. + """ + assert feature_view_projection.name == self.name + + for feature in feature_view_projection.features: + if feature not in self.features: + raise ValueError( + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the" + "FeatureView doesn't have." + ) + + self.projection = feature_view_projection + def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]): """ diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 34ef6e92dc..3f124632e7 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -252,7 +252,9 @@ def assert_feature_service_correctness( feature_service_keys = feature_service_online_features_dict.keys() assert ( - len(feature_service_keys) == len(feature_service.features) + 3 + len(feature_service_keys) + == sum([len(projection.features) for projection in feature_service.features]) + + 3 ) # Add two for the driver id and the customer id entity keys and val_to_add request data tc = unittest.TestCase() From 659374a504cd484f72471664ae09e25550476415 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Mon, 4 Oct 2021 11:36:39 -0700 Subject: [PATCH 14/15] corrections based on integration test failures Signed-off-by: David Y Liu --- sdk/python/feast/cli.py | 2 +- sdk/python/feast/feature_service.py | 16 +++++++++++----- sdk/python/feast/feature_store.py | 2 +- .../online_store/test_universal_online.py | 7 ++++++- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 2701b6d4ed..12be1e356d 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -175,7 +175,7 @@ def feature_service_list(ctx: click.Context): feature_services = [] for feature_service in store.list_feature_services(): feature_names = [] - for projection in feature_service.features: + for projection in feature_service.feature_view_projections: feature_names.extend( [f"{projection.name}:{feature.name}" for feature in projection.features] ) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 003f3824dd..628c706263 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -89,7 +89,9 @@ def __eq__(self, other): if self.tags != other.tags or self.name != other.name: return False - if sorted(self.features) != sorted(other.features): + if sorted(self.feature_view_projections) != sorted( + other.feature_view_projections + ): return False return True @@ -98,15 +100,13 @@ def __eq__(self, other): def from_proto(feature_service_proto: FeatureServiceProto): """ Converts a FeatureServiceProto to a FeatureService object. + Args: feature_service_proto: A protobuf representation of a FeatureService. """ fs = FeatureService( name=feature_service_proto.spec.name, - features=[ - FeatureViewProjection.from_proto(projection) - for projection in feature_service_proto.spec.features - ], + features=[], tags=dict(feature_service_proto.spec.tags), description=( feature_service_proto.spec.description @@ -114,6 +114,12 @@ def from_proto(feature_service_proto: FeatureServiceProto): else None ), ) + fs.feature_view_projections.extend( + [ + FeatureViewProjection.from_proto(projection) + for projection in feature_service_proto.spec.features + ] + ) if feature_service_proto.meta.HasField("created_timestamp"): fs.created_timestamp = ( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c0051e5212..8c93a39767 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -325,7 +325,7 @@ def _get_features( "inconsistent with the version from Registry. Potentially a newer version" "of the FeatureService has been applied to the registry." ) - for projection in feature_service_from_registry.features: + for projection in feature_service_from_registry.feature_view_projections: _feature_refs.extend( [f"{projection.name}:{f.name}" for f in projection.features] ) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 3f124632e7..aeab91e5f8 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -253,7 +253,12 @@ def assert_feature_service_correctness( assert ( len(feature_service_keys) - == sum([len(projection.features) for projection in feature_service.features]) + == sum( + [ + len(projection.features) + for projection in feature_service.feature_view_projections + ] + ) + 3 ) # Add two for the driver id and the customer id entity keys and val_to_add request data From 31cd6a8bba48529a3d8cb41defa6de76b06f4d42 Mon Sep 17 00:00:00 2001 From: David Y Liu Date: Mon, 4 Oct 2021 15:59:21 -0700 Subject: [PATCH 15/15] corrections based on CR comments Signed-off-by: David Y Liu --- sdk/python/feast/feature_service.py | 5 ++++- sdk/python/feast/feature_view.py | 16 ++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 628c706263..9f8e9af1fb 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -64,7 +64,10 @@ def __init__( ): self.feature_view_projections.append(feature_grouping.projection) else: - raise ValueError(f"Unexpected type: {type(feature_grouping)}") + raise ValueError( + "The FeatureService {fs_name} has been provided with an invalid type" + f'{type(feature_grouping)} as part of the "features" argument.)' + ) self.tags = tags or {} self.description = description diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 65c727e979..9752449c59 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -391,19 +391,27 @@ def infer_features_from_batch_source(self, config: RepoConfig): def set_projection(self, feature_view_projection: FeatureViewProjection) -> None: """ - Setter for the projection object held by this FeatureView. Performs checks to ensure - the projection is consistent with this FeatureView before doing the set. + Setter for the projection object held by this FeatureView. A projection is an + object that stores the modifications to a FeatureView that is applied to the FeatureView + when the FeatureView is used such as during feature_store.get_historical_features. + This method also performs checks to ensure the projection is consistent with this + FeatureView before doing the set. Args: feature_view_projection: The FeatureViewProjection object to set this FeatureView's 'projection' field to. """ - assert feature_view_projection.name == self.name + if feature_view_projection.name != self.name: + raise ValueError( + f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. " + f"The projection is named {feature_view_projection.name} and the name indicates which " + "FeatureView the projection is for." + ) for feature in feature_view_projection.features: if feature not in self.features: raise ValueError( - f"The projection for {self.name} cannot be applied because it contains {feature.name} which the" + f"The projection for {self.name} cannot be applied because it contains {feature.name} which the " "FeatureView doesn't have." )