Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hide FeatureViewProjections from user interface & have FeatureViews carry FVProjections that carries the modified info of the FeatureView #1899

Merged
merged 17 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ message FeatureServiceSpec {
// Name of Feast project that this Feature Service belongs to.
string project = 2;

// List of features that this feature service encapsulates.
// Stored as a list of references to other features views and the features from those views.
// Represents a projection that's to be applied on top of the FeatureView.
// Contains data such as the features to use from a FeatureView.
repeated FeatureViewProjection features = 3;
mavysavydav marked this conversation as resolved.
Show resolved Hide resolved

// User defined metadata
Expand Down
3 changes: 2 additions & 1 deletion protos/feast/core/FeatureViewProjection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ option java_package = "feast.proto.core";
import "feast/core/Feature.proto";


// A reference to features in a feature view
// A projection to be applied on top of a FeatureView.
// Contains the modifications to a FeatureView such as the features subset to use.
message FeatureViewProjection {
// The feature view name
string feature_view_name = 1;
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def feature_service_list(ctx: click.Context):
feature_services = []
for feature_service in store.list_feature_services():
feature_names = []
for projection in feature_service.features:
for projection in feature_service.feature_view_projections:
feature_names.extend(
[f"{projection.name}:{feature.name}" for feature in projection.features]
)
Expand Down
65 changes: 33 additions & 32 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class FeatureService:
"""

name: str
features: List[FeatureViewProjection]
feature_view_projections: List[FeatureViewProjection]
tags: Dict[str, str]
description: Optional[str] = None
created_timestamp: Optional[datetime] = None
Expand All @@ -41,9 +41,7 @@ class FeatureService:
def __init__(
self,
name: str,
features: List[
Union[FeatureTable, FeatureView, OnDemandFeatureView, FeatureViewProjection]
],
features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]],
tags: Optional[Dict[str, str]] = None,
description: Optional[str] = None,
):
Expand All @@ -54,18 +52,23 @@ def __init__(
ValueError: If one of the specified features is not a valid type.
"""
self.name = name
self.features = []
for feature in features:
if (
isinstance(feature, FeatureTable)
or isinstance(feature, FeatureView)
or isinstance(feature, OnDemandFeatureView)
self.feature_view_projections = []

for feature_grouping in features:
if isinstance(feature_grouping, FeatureTable):
self.feature_view_projections.append(
FeatureViewProjection.from_definition(feature_grouping)
)
elif isinstance(feature_grouping, FeatureView) or isinstance(
feature_grouping, OnDemandFeatureView
):
self.features.append(FeatureViewProjection.from_definition(feature))
elif isinstance(feature, FeatureViewProjection):
self.features.append(feature)
self.feature_view_projections.append(feature_grouping.projection)
else:
raise ValueError(f"Unexpected type: {type(feature)}")
raise ValueError(
"The FeatureService {fs_name} has been provided with an invalid type"
f'{type(feature_grouping)} as part of the "features" argument.)'
)

self.tags = tags or {}
self.description = description
self.created_timestamp = None
Expand All @@ -89,7 +92,9 @@ def __eq__(self, other):
if self.tags != other.tags or self.name != other.name:
return False

if sorted(self.features) != sorted(other.features):
if sorted(self.feature_view_projections) != sorted(
other.feature_view_projections
):
return False

return True
Expand All @@ -104,17 +109,20 @@ def from_proto(feature_service_proto: FeatureServiceProto):
"""
fs = FeatureService(
name=feature_service_proto.spec.name,
features=[
FeatureViewProjection.from_proto(fp)
for fp in feature_service_proto.spec.features
],
features=[],
tags=dict(feature_service_proto.spec.tags),
description=(
feature_service_proto.spec.description
if feature_service_proto.spec.description != ""
else None
),
)
fs.feature_view_projections.extend(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you moved this code out of the constructor. Don't feel strongly about it though.

[
FeatureViewProjection.from_proto(projection)
for projection in feature_service_proto.spec.features
]
)

if feature_service_proto.meta.HasField("created_timestamp"):
fs.created_timestamp = (
Expand All @@ -138,19 +146,12 @@ def to_proto(self) -> FeatureServiceProto:
if self.created_timestamp:
meta.created_timestamp.FromDatetime(self.created_timestamp)

spec = FeatureServiceSpec()
spec.name = self.name
for definition in self.features:
if isinstance(definition, FeatureTable) or isinstance(
definition, FeatureView
):
feature_ref = FeatureViewProjection(
definition.name, definition.features
)
else:
feature_ref = definition

spec.features.append(feature_ref.to_proto())
spec = FeatureServiceSpec(
name=self.name,
features=[
projection.to_proto() for projection in self.feature_view_projections
],
)

if self.tags:
spec.tags.update(self.tags)
Expand Down
103 changes: 62 additions & 41 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,19 @@ def _get_features(
if not _features:
raise ValueError("No features specified for retrieval")

_feature_refs: List[str]
_feature_refs = []
if isinstance(_features, FeatureService):
# Get the latest value of the feature service, in case the object passed in has been updated underneath us.
_feature_refs = _get_feature_refs_from_feature_services(
self.get_feature_service(_features.name)
)
feature_service_from_registry = self.get_feature_service(_features.name)
if feature_service_from_registry != _features:
warnings.warn(
"The FeatureService object that has been passed in as an argument is"
"inconsistent with the version from Registry. Potentially a newer version"
"of the FeatureService has been applied to the registry."
)
for projection in feature_service_from_registry.feature_view_projections:
_feature_refs.extend(
[f"{projection.name}:{f.name}" for f in projection.features]
)
else:
assert isinstance(_features, list)
_feature_refs = _features
Expand Down Expand Up @@ -542,10 +549,8 @@ def get_historical_features(
)

_feature_refs = self._get_features(features, feature_refs)

all_feature_views = self.list_feature_views()
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
project=self.project
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
features
)

# TODO(achal): _group_feature_refs returns the on demand feature views, but it's no passed into the provider.
Expand Down Expand Up @@ -805,11 +810,8 @@ def get_online_features(
>>> online_response_dict = online_response.to_dict()
"""
_feature_refs = self._get_features(features, feature_refs)
all_feature_views = self._list_feature_views(
allow_cache=True, hide_dummy_entity=False
)
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
project=self.project, allow_cache=True
all_feature_views, all_on_demand_feature_views = self._get_feature_views_to_use(
features=features, allow_cache=True, hide_dummy_entity=False
)

_validate_feature_refs(_feature_refs, full_feature_names)
Expand Down Expand Up @@ -1018,6 +1020,43 @@ def _augment_response_with_on_demand_transforms(
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

def _get_feature_views_to_use(
self,
features: Optional[Union[List[str], FeatureService]],
woop marked this conversation as resolved.
Show resolved Hide resolved
allow_cache=False,
hide_dummy_entity: bool = True,
) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]:

fvs = {
fv.name: fv
for fv in self._list_feature_views(allow_cache, hide_dummy_entity)
}

od_fvs = {
fv.name: fv
for fv in self._registry.list_on_demand_feature_views(
project=self.project, allow_cache=allow_cache
)
}

if isinstance(features, FeatureService):
for fv_name, projection in {
projection.name: projection
for projection in features.feature_view_projections
}.items():
if fv_name in fvs:
fvs[fv_name].set_projection(projection)
elif fv_name in od_fvs:
od_fvs[fv_name].set_projection(projection)
else:
raise ValueError(
f"The provided feature service {features.name} contains a reference to a feature view"
f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
f'{fv_name} and that you have registered it by running "apply".'
)

return [*fvs.values()], [*od_fvs.values()]

@log_exceptions_and_usage
def serve(self, port: int) -> None:
"""Start the feature consumption server locally on a given port."""
Expand Down Expand Up @@ -1070,7 +1109,7 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F


def _group_feature_refs(
features: Union[List[str], FeatureService],
features: List[str],
all_feature_views: List[FeatureView],
all_on_demand_feature_views: List[OnDemandFeatureView],
) -> Tuple[
Expand All @@ -1090,21 +1129,14 @@ def _group_feature_refs(
# on demand view name to feature names
on_demand_view_features = defaultdict(list)

if isinstance(features, list) and isinstance(features[0], str):
for ref in features:
view_name, feat_name = ref.split(":")
if view_name in view_index:
views_features[view_name].append(feat_name)
elif view_name in on_demand_view_index:
on_demand_view_features[view_name].append(feat_name)
else:
raise FeatureViewNotFoundException(view_name)
elif isinstance(features, FeatureService):
for feature_projection in features.features:
projected_features = feature_projection.features
views_features[feature_projection.name].extend(
[f.name for f in projected_features]
)
for ref in features:
view_name, feat_name = ref.split(":")
if view_name in view_index:
views_features[view_name].append(feat_name)
elif view_name in on_demand_view_index:
on_demand_view_features[view_name].append(feat_name)
else:
raise FeatureViewNotFoundException(view_name)

fvs_result: List[Tuple[FeatureView, List[str]]] = []
odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = []
Expand All @@ -1116,17 +1148,6 @@ def _group_feature_refs(
return fvs_result, odfvs_result


def _get_feature_refs_from_feature_services(
feature_service: FeatureService,
) -> List[str]:
feature_refs = []
for projection in feature_service.features:
feature_refs.extend(
[f"{projection.name}:{f.name}" for f in projection.features]
)
return feature_refs


def _get_table_entity_keys(
table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str],
) -> List[EntityKeyProto]:
Expand Down
41 changes: 39 additions & 2 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class FeatureView:
created_timestamp: Optional[datetime] = None
last_updated_timestamp: Optional[datetime] = None
materialization_intervals: List[Tuple[datetime, datetime]]
projection: FeatureViewProjection

@log_exceptions
def __init__(
Expand Down Expand Up @@ -141,6 +142,8 @@ def __init__(
self.created_timestamp: Optional[datetime] = None
self.last_updated_timestamp: Optional[datetime] = None

self.projection = FeatureViewProjection.from_definition(self)

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand All @@ -151,15 +154,17 @@ def __str__(self):
def __hash__(self):
return hash((id(self), self.name))

def __getitem__(self, item) -> FeatureViewProjection:
def __getitem__(self, item):
assert isinstance(item, list)

referenced_features = []
for feature in self.features:
if feature.name in item:
referenced_features.append(feature)

return FeatureViewProjection(self.name, referenced_features)
self.projection.features = referenced_features

return self

def __eq__(self, other):
if not isinstance(other, FeatureView):
Expand Down Expand Up @@ -283,6 +288,10 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
stream_source=stream_source,
)

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
feature_view.projection = FeatureViewProjection.from_definition(feature_view)

if feature_view_proto.meta.HasField("created_timestamp"):
feature_view.created_timestamp = (
feature_view_proto.meta.created_timestamp.ToDatetime()
Expand Down Expand Up @@ -379,3 +388,31 @@ def infer_features_from_batch_source(self, config: RepoConfig):
"FeatureView",
f"Could not infer Features for the FeatureView named {self.name}.",
)

def set_projection(self, feature_view_projection: FeatureViewProjection) -> None:
"""
Setter for the projection object held by this FeatureView. A projection is an
object that stores the modifications to a FeatureView that is applied to the FeatureView
when the FeatureView is used such as during feature_store.get_historical_features.
This method also performs checks to ensure the projection is consistent with this
FeatureView before doing the set.
Args:
feature_view_projection: The FeatureViewProjection object to set this FeatureView's
'projection' field to.
"""
if feature_view_projection.name != self.name:
raise ValueError(
f"The projection for the {self.name} FeatureView cannot be applied because it differs in name. "
f"The projection is named {feature_view_projection.name} and the name indicates which "
"FeatureView the projection is for."
)

for feature in feature_view_projection.features:
if feature not in self.features:
raise ValueError(
f"The projection for {self.name} cannot be applied because it contains {feature.name} which the "
"FeatureView doesn't have."
)

self.projection = feature_view_projection
4 changes: 2 additions & 2 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def from_proto(proto: FeatureViewProjectionProto):
return ref

@staticmethod
def from_definition(feature_definition):
def from_definition(feature_grouping):
return FeatureViewProjection(
name=feature_definition.name, features=feature_definition.features
name=feature_grouping.name, features=feature_grouping.features
)
Loading