Skip to content

Commit

Permalink
Hide FeatureViewProjections from user interface & have FeatureViews c…
Browse files Browse the repository at this point in the history
…arry FVProjections that carries the modified info of the FeatureView (#1899)

* Revised FeatureService class and proto

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* wip transitioning feast off of FVProjections

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* corrected a lot of errors and tests failures

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* fix unit test failure and most lint issues

Signed-off-by: Cody Lin <codyl@twitter.com>

* remove debugging print statement

Signed-off-by: Cody Lin <codyl@twitter.com>

* simplified _group_feature_refs

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* deleted FeatureViewProjection files

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* code review fix and test fix

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* Correction in cli.py

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* Made fixes to still get objects from registry but potentially modify those objects after

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* fixed lint oopsie & added to docstring on feature reference string convention

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* Brought back FVProjections without exposing it ever to user-facing API

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* Updated OnDemandFV to this new paradigm. Other misc updates

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* corrections based on integration test failures

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

* corrections based on CR comments

Signed-off-by: David Y Liu <davidyliuliu@gmail.com>

Co-authored-by: Cody Lin <codyl@twitter.com>
  • Loading branch information
mavysavydav and Cody Lin authored Oct 5, 2021
1 parent 41535d0 commit b1ccf8d
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 84 deletions.
4 changes: 2 additions & 2 deletions protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ message FeatureServiceSpec {
// Name of Feast project that this Feature Service belongs to.
string project = 2;

// List of features that this feature service encapsulates.
// Stored as a list of references to other features views and the features from those views.
// Represents a projection that's to be applied on top of the FeatureView.
// Contains data such as the features to use from a FeatureView.
repeated FeatureViewProjection features = 3;

// User defined metadata
Expand Down
3 changes: 2 additions & 1 deletion protos/feast/core/FeatureViewProjection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ option java_package = "feast.proto.core";
import "feast/core/Feature.proto";


// A reference to features in a feature view
// A projection to be applied on top of a FeatureView.
// Contains the modifications to a FeatureView such as the features subset to use.
message FeatureViewProjection {
// The feature view name
string feature_view_name = 1;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def feature_service_list(ctx: click.Context):
feature_services = []
for feature_service in store.list_feature_services():
feature_names = []
for projection in feature_service.features:
for projection in feature_service.feature_view_projections:
feature_names.extend(
[f"{projection.name}:{feature.name}" for feature in projection.features]
)
Expand Down
65 changes: 33 additions & 32 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class FeatureService:
"""

name: str
features: List[FeatureViewProjection]
feature_view_projections: List[FeatureViewProjection]
tags: Dict[str, str]
description: Optional[str] = None
created_timestamp: Optional[datetime] = None
Expand All @@ -41,9 +41,7 @@ class FeatureService:
def __init__(
self,
name: str,
features: List[
Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection]
],
features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]],
tags: Optional[Dict[str, str]] = None,
description: Optional[str] = None,
):
Expand All @@ -54,18 +52,23 @@ def __init__(
ValueError: If one of the specified features is not a valid type.
"""
self.name = name
self.features = []
for feature in features:
if (
isinstance(feature, FeatureTable)
or isinstance(feature, FeatureView)
or isinstance(feature, OnDemandFeatureView)
self.feature_view_projections = []

for feature_grouping in features:
if isinstance(feature_grouping, FeatureTable):
self.feature_view_projections.append(
FeatureViewProjection.from_definition(feature_grouping)
)
elif isinstance(feature_grouping, FeatureView) or isinstance(
feature_grouping, OnDemandFeatureView
):
self.features.append(FeatureViewProjection.from_definition(feature))
elif isinstance(feature, FeatureViewProjection):
self.features.append(feature)
self.feature_view_projections.append(feature_grouping.projection)
else:
raise ValueError(f"Unexpected type: {type(feature)}")
raise ValueError(
"The FeatureService {fs_name} has been provided with an invalid type"
f'{type(feature_grouping)} as part of the "features" argument.)'
)

self.tags = tags or {}
self.description = description
self.created_timestamp = None
Expand All @@ -89,7 +92,9 @@ def __eq__(self, other):
if self.tags != other.tags or self.name != other.name:
return False

if sorted(self.features) != sorted(other.features):
if sorted(self.feature_view_projections) != sorted(
other.feature_view_projections
):
return False

return True
Expand All @@ -104,17 +109,20 @@ def from_proto(feature_service_proto: FeatureServiceProto):
"""
fs = FeatureService(
name=feature_service_proto.spec.name,
features=[
FeatureViewProjection.from_proto(fp)
for fp in feature_service_proto.spec.features
],
features=[],
tags=dict(feature_service_proto.spec.tags),
description=(
feature_service_proto.spec.description
if feature_service_proto.spec.description != ""
else None
),
)
fs.feature_view_projections.extend(
[
FeatureViewProjection.from_proto(projection)
for projection in feature_service_proto.spec.features
]
)

if feature_service_proto.meta.HasField("created_timestamp"):
fs.created_timestamp = (
Expand All @@ -138,19 +146,12 @@ def to_proto(self) -> FeatureServiceProto:
if self.created_timestamp:
meta.created_timestamp.FromDatetime(self.created_timestamp)

spec = FeatureServiceSpec()
spec.name = self.name
for definition in self.features:
if isinstance(definition, FeatureTable) or isinstance(
definition, FeatureView
):
feature_ref = FeatureViewProjection(
definition.name, definition.features
)
else:
feature_ref = definition

spec.features.append(feature_ref.to_proto())
spec = FeatureServiceSpec(
name=self.name,
features=[
projection.to_proto() for projection in self.feature_view_projections
],
)

if self.tags:
spec.tags.update(self.tags)
Expand Down
103 changes: 62 additions & 41 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,19 @@ def _get_features(
if not _features:
raise ValueError("No features specified for retrieval")

_feature_refs: List[str]
_feature_refs = []
if isinstance(_features, FeatureService):
# Get the latest value of the feature service, in case the object passed in has been updated underneath us.
_feature_refs = _get_feature_refs_from_feature_services(
self.get_feature_service(_features.name)
)
feature_service_from_registry = self.get_feature_service(_features.name)
if feature_service_from_registry != _features:
warnings.warn(
"The FeatureService object that has been passed in as an argument is"
"inconsistent with the version from Registry. Potentially a newer version"
"of the FeatureService has been applied to the registry."
)
for projection in feature_service_from_registry.feature_view_projections:
_feature_refs.extend(
[f"{projection.name}:{f.name}" for f in projection.features]
)
else:
assert isinstance(_features, list)
_feature_refs = _features
Expand Down Expand Up @@ -542,10 +549,8 @@ def get_historical_features(
)

_feature_refs = self._get_features(features, feature_refs)

all_feature_views = self.list_feature_views()
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
project=self.project
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
features
)

# TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider.
Expand Down Expand Up @@ -805,11 +810,8 @@ def get_online_features(
>>> online_response_dict = online_response.to_dict()
"""
_feature_refs = self._get_features(features, feature_refs)
all_feature_views = self._list_feature_views(
allow_cache=True, hide_dummy_entity=False
)
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
project=self.project, allow_cache=True
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
features=features, allow_cache=True, hide_dummy_entity=False
)

_validate_feature_refs(_feature_refs, full_feature_names)
Expand Down Expand Up @@ -1018,6 +1020,43 @@ def _augment_response_with_on_demand_transforms(
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

def _get_feature_views_to_use(
    self,
    features: Optional[Union[List[str], FeatureService]],
    allow_cache: bool = False,
    hide_dummy_entity: bool = True,
) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]:
    """
    Collect the feature views and on demand feature views to use for retrieval.

    All feature views and on demand feature views are listed and indexed by name.
    When ``features`` is a FeatureService, each of its feature view projections is
    applied (via ``set_projection``) to the listed view that has the same name.

    Args:
        features: Either a list of feature reference strings or a FeatureService.
            Projections are only applied when a FeatureService is given.
        allow_cache: Forwarded to both the feature view and the on demand
            feature view listing calls.
        hide_dummy_entity: Forwarded to ``_list_feature_views``.

    Returns:
        A tuple ``(feature_views, on_demand_feature_views)``.

    Raises:
        ValueError: If the FeatureService holds a projection whose name matches
            neither a listed feature view nor a listed on demand feature view.
    """
    fvs = {
        fv.name: fv
        for fv in self._list_feature_views(allow_cache, hide_dummy_entity)
    }

    od_fvs = {
        fv.name: fv
        for fv in self._registry.list_on_demand_feature_views(
            project=self.project, allow_cache=allow_cache
        )
    }

    if isinstance(features, FeatureService):
        # Index by name first so that, if several projections share a name,
        # only the last one is applied (same as the previous behaviour).
        projections_by_name = {
            projection.name: projection
            for projection in features.feature_view_projections
        }
        for fv_name, projection in projections_by_name.items():
            # NOTE(review): set_projection mutates the objects returned by the
            # listing calls above; if those are shared cached instances the
            # projection could leak into later calls -- confirm.
            if fv_name in fvs:
                fvs[fv_name].set_projection(projection)
            elif fv_name in od_fvs:
                od_fvs[fv_name].set_projection(projection)
            else:
                # Adjacent string literals are concatenated verbatim, so each
                # piece must carry its own trailing space.
                raise ValueError(
                    f"The provided feature service {features.name} contains a reference to a feature view "
                    f"{fv_name} which doesn't exist. Please make sure that you have created the feature view "
                    f'{fv_name} and that you have registered it by running "apply".'
                )

    return list(fvs.values()), list(od_fvs.values())

@log_exceptions_and_usage
def serve(self, port: int) -> None:
"""Start the feature consumption server locally on a given port."""
Expand Down Expand Up @@ -1070,7 +1109,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F


def _group_feature_refs(
features: Union[List[str], FeatureService],
features: List[str],
all_feature_views: List[FeatureView],
all_on_demand_feature_views: List[OnDemandFeatureView],
) -> Tuple[
Expand All @@ -1090,21 +1129,14 @@ def _group_feature_refs(
# on demand view name to feature names
on_demand_view_features = defaultdict(list)

if isinstance(features, list) and isinstance(features[0], str):
for ref in features:
view_name, feat_name = ref.split(":")
if view_name in view_index:
views_features[view_name].append(feat_name)
elif view_name in on_demand_view_index:
on_demand_view_features[view_name].append(feat_name)
else:
raise FeatureViewNotFoundException(view_name)
elif isinstance(features, FeatureService):
for feature_projection in features.features:
projected_features = feature_projection.features
views_features[feature_projection.name].extend(
[f.name for f in projected_features]
)
for ref in features:
view_name, feat_name = ref.split(":")
if view_name in view_index:
views_features[view_name].append(feat_name)
elif view_name in on_demand_view_index:
on_demand_view_features[view_name].append(feat_name)
else:
raise FeatureViewNotFoundException(view_name)

fvs_result: List[Tuple[FeatureView, List[str]]] = []
odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = []
Expand All @@ -1116,17 +1148,6 @@ def _group_feature_refs(
return fvs_result, odfvs_result


def _get_feature_refs_from_feature_services(
    feature_service: FeatureService,
) -> List[str]:
    """
    Flatten a feature service into feature reference strings.

    Every feature of every projection in ``feature_service.features`` yields one
    string of the form ``"<projection name>:<feature name>"``, in projection
    order and then feature order.
    """
    return [
        f"{view_projection.name}:{feat.name}"
        for view_projection in feature_service.features
        for feat in view_projection.features
    ]


def _get_table_entity_keys(
table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str],
) -> List[EntityKeyProto]:
Expand Down
41 changes: 39 additions & 2 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class FeatureView:
created_timestamp: Optional[datetime] = None
last_updated_timestamp: Optional[datetime] = None
materialization_intervals: List[Tuple[datetime, datetime]]
projection: FeatureViewProjection

@log_exceptions
def __init__(
Expand Down Expand Up @@ -141,6 +142,8 @@ def __init__(
self.created_timestamp: Optional[datetime] = None
self.last_updated_timestamp: Optional[datetime] = None

self.projection = FeatureViewProjection.from_definition(self)

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand All @@ -151,15 +154,17 @@ def __str__(self):
def __hash__(self):
return hash((id(self), self.name))

def __getitem__(self, item) -> FeatureViewProjection:
def __getitem__(self, item):
assert isinstance(item, list)

referenced_features = []
for feature in self.features:
if feature.name in item:
referenced_features.append(feature)

return FeatureViewProjection(self.name, referenced_features)
self.projection.features = referenced_features

return self

def __eq__(self, other):
if not isinstance(other, FeatureView):
Expand Down Expand Up @@ -283,6 +288,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
stream_source=stream_source,
)

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
feature_view.projection = FeatureViewProjection.from_definition(feature_view)

if feature_view_proto.meta.HasField("created_timestamp"):
feature_view.created_timestamp = (
feature_view_proto.meta.created_timestamp.ToDatetime()
Expand Down Expand Up @@ -379,3 +388,31 @@ def infer_features_from_batch_source(self, config: RepoConfig):
"FeatureView",
f"Could not infer Features for the FeatureView named {self.name}.",
)

def set_projection(self, feature_view_projection: FeatureViewProjection) -> None:
    """
    Validate a projection and store it as this FeatureView's ``projection``.

    A projection holds the modifications that are applied to a FeatureView when
    the FeatureView is used, for example during
    feature_store.get_historical_features. The projection is only stored when it
    carries the same name as this FeatureView and every feature it lists is
    present in this FeatureView's ``features``.

    Args:
        feature_view_projection: The FeatureViewProjection object to set this
            FeatureView's 'projection' field to.

    Raises:
        ValueError: If the projection's name differs from this FeatureView's
            name, or if it lists a feature this FeatureView does not have.
    """
    projection_name = feature_view_projection.name
    if projection_name != self.name:
        raise ValueError(
            f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. "
            f"The projection is named {projection_name} and the name indicates which "
            "FeatureView the projection is for."
        )

    for candidate in feature_view_projection.features:
        if candidate in self.features:
            continue
        # First projected feature that this FeatureView does not define.
        raise ValueError(
            f"The projection for {self.name} cannot be applied because it contains {candidate.name} which the "
            "FeatureView doesn't have."
        )

    self.projection = feature_view_projection
4 changes: 2 additions & 2 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def from_proto(proto: FeatureViewProjectionProto):
return ref

@staticmethod
def from_definition(feature_definition):
def from_definition(feature_grouping):
return FeatureViewProjection(
name=feature_definition.name, features=feature_definition.features
name=feature_grouping.name, features=feature_grouping.features
)
Loading

0 comments on commit b1ccf8d

Please sign in to comment.