-
Notifications
You must be signed in to change notification settings - Fork 996
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Update on demand feature view api #2587
Changes from 17 commits
3235ce3
27b7b6e
6f01fd3
d77f9a1
fb49cf3
14fe923
46eec61
3742cf1
f1f8f27
7ef6462
4c147ee
e0078b1
0b9d4fd
390772c
f6ab5da
2047033
4a2125f
4b9dd34
99e0cf5
e232ef2
05617db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,15 +7,15 @@ | |
from google.protobuf.duration_pb2 import Duration | ||
from feast.field import Field | ||
|
||
from feast import Entity, Feature, FeatureView, FileSource, ValueType | ||
from feast import Entity, Feature, BaseFeatureView, FileSource, ValueType | ||
|
||
driver_hourly_stats = FileSource( | ||
path="data/driver_stats_with_string.parquet", | ||
timestamp_field="event_timestamp", | ||
created_timestamp_column="created", | ||
) | ||
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) | ||
driver_hourly_stats_view = FeatureView( | ||
driver_hourly_stats_view = BaseFeatureView( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
name="driver_hourly_stats", | ||
entities=["driver_id"], | ||
ttl=Duration(seconds=86400000), | ||
|
@@ -43,10 +43,10 @@ | |
# Define an on demand feature view which can generate new features based on | ||
# existing feature views and RequestSource features | ||
@on_demand_feature_view( | ||
inputs={ | ||
"driver_hourly_stats": driver_hourly_stats_view, | ||
"vals_to_add": input_request, | ||
}, | ||
inputs=[ | ||
driver_hourly_stats_view, | ||
input_request, | ||
], | ||
schema=[ | ||
Field(name="conv_rate_plus_val1", dtype=Float64), | ||
Field(name="conv_rate_plus_val2", dtype=Float64), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
import pandas as pd | ||
|
||
from feast.base_feature_view import BaseFeatureView | ||
from feast.batch_feature_view import BatchFeatureView | ||
from feast.data_source import RequestSource | ||
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError | ||
from feast.feature import Feature | ||
|
@@ -25,6 +26,7 @@ | |
from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( | ||
UserDefinedFunction as UserDefinedFunctionProto, | ||
) | ||
from feast.stream_feature_view import StreamFeatureView | ||
from feast.type_map import ( | ||
feast_value_type_to_pandas_type, | ||
python_type_to_feast_value_type, | ||
|
@@ -67,13 +69,13 @@ class OnDemandFeatureView(BaseFeatureView): | |
owner: str | ||
|
||
@log_exceptions | ||
def __init__( | ||
def __init__( # noqa: C901 | ||
achals marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self, | ||
*args, | ||
name: Optional[str] = None, | ||
features: Optional[List[Feature]] = None, | ||
sources: Optional[ | ||
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]] | ||
List[Union[BatchFeatureView, StreamFeatureView, RequestSource]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably continue to support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
] = None, | ||
udf: Optional[MethodType] = None, | ||
inputs: Optional[ | ||
|
@@ -92,11 +94,11 @@ def __init__( | |
features (deprecated): The list of features in the output of the on demand | ||
feature view, after the transformation has been applied. | ||
sources (optional): A map from input source names to the actual input sources, | ||
which may be feature views, feature view projections, or request data sources. | ||
which may be feature views, or request data sources. | ||
These sources serve as inputs to the udf, which will refer to them by name. | ||
udf (optional): The user defined transformation function, which must take pandas | ||
dataframes as inputs. | ||
inputs (optional): A map from input source names to the actual input sources, | ||
inputs (optional): (Deprecated) A map from input source names to the actual input sources, | ||
which may be feature views, feature view projections, or request data sources. | ||
These sources serve as inputs to the udf, which will refer to them by name. | ||
schema (optional): The list of features in the output of the on demand feature | ||
|
@@ -123,8 +125,7 @@ def __init__( | |
), | ||
DeprecationWarning, | ||
) | ||
|
||
_sources = sources or inputs | ||
_sources = sources or [] | ||
if inputs and sources: | ||
raise ValueError("At most one of `sources` or `inputs` can be specified.") | ||
elif inputs: | ||
|
@@ -135,7 +136,19 @@ def __init__( | |
), | ||
DeprecationWarning, | ||
) | ||
|
||
for _, source in inputs.items(): | ||
if isinstance(source, FeatureView): | ||
_sources.append(feature_view_to_batch_feature_view(source)) | ||
elif isinstance(source, FeatureViewProjection): | ||
_sources.append( | ||
BatchFeatureView(name=source.name, schema=source.features,) | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmmm, I'm not sure exactly how projections are being used today, but I think it makes sense to continue to support them. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, I didn't push my changes I think; I updated this. |
||
elif isinstance(source, RequestSource): | ||
_sources.append(source) | ||
else: | ||
raise ValueError( | ||
"input can only accept FeatureView, FeatureViewProjection, or RequestSource" | ||
) | ||
_udf = udf | ||
|
||
if args: | ||
|
@@ -169,7 +182,20 @@ def __init__( | |
DeprecationWarning, | ||
) | ||
if len(args) >= 3: | ||
_sources = args[2] | ||
_inputs = args[2] | ||
for _, source in _inputs.items(): | ||
if isinstance(source, FeatureView): | ||
_sources.append(feature_view_to_batch_feature_view(source)) | ||
elif isinstance(source, FeatureViewProjection): | ||
_sources.append( | ||
BatchFeatureView(name=source.name, schema=source.features,) | ||
) | ||
elif isinstance(source, RequestSource): | ||
_sources.append(source) | ||
else: | ||
raise ValueError( | ||
"input can only accept FeatureView, FeatureViewProjection, or RequestSource" | ||
) | ||
warnings.warn( | ||
( | ||
"The `inputs` parameter is being deprecated. Please use `sources` instead. " | ||
|
@@ -195,18 +221,17 @@ def __init__( | |
tags=tags, | ||
owner=owner, | ||
) | ||
|
||
assert _sources is not None | ||
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} | ||
self.source_request_sources: Dict[str, RequestSource] = {} | ||
for source_name, odfv_source in _sources.items(): | ||
for odfv_source in _sources: | ||
if isinstance(odfv_source, RequestSource): | ||
self.source_request_sources[source_name] = odfv_source | ||
self.source_request_sources[odfv_source.name] = odfv_source | ||
elif isinstance(odfv_source, FeatureViewProjection): | ||
self.source_feature_view_projections[source_name] = odfv_source | ||
self.source_feature_view_projections[odfv_source.name] = odfv_source | ||
else: | ||
self.source_feature_view_projections[ | ||
source_name | ||
odfv_source.name | ||
] = odfv_source.projection | ||
|
||
if _udf is None: | ||
|
@@ -219,12 +244,12 @@ def proto_class(self) -> Type[OnDemandFeatureViewProto]: | |
return OnDemandFeatureViewProto | ||
|
||
def __copy__(self): | ||
|
||
fv = OnDemandFeatureView( | ||
name=self.name, | ||
schema=self.features, | ||
sources=dict( | ||
**self.source_feature_view_projections, **self.source_request_sources, | ||
), | ||
sources=list(self.source_feature_view_projections.values()) | ||
+ list(self.source_request_sources.values()), | ||
udf=self.udf, | ||
description=self.description, | ||
tags=self.tags, | ||
|
@@ -302,22 +327,21 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): | |
Returns: | ||
A OnDemandFeatureView object based on the on-demand feature view protobuf. | ||
""" | ||
sources = {} | ||
for ( | ||
source_name, | ||
on_demand_source, | ||
) in on_demand_feature_view_proto.spec.sources.items(): | ||
sources = [] | ||
for (_, on_demand_source,) in on_demand_feature_view_proto.spec.sources.items(): | ||
if on_demand_source.WhichOneof("source") == "feature_view": | ||
sources[source_name] = FeatureView.from_proto( | ||
on_demand_source.feature_view | ||
).projection | ||
sources.append( | ||
FeatureView.from_proto(on_demand_source.feature_view).projection | ||
) | ||
elif on_demand_source.WhichOneof("source") == "feature_view_projection": | ||
sources[source_name] = FeatureViewProjection.from_proto( | ||
on_demand_source.feature_view_projection | ||
sources.append( | ||
FeatureViewProjection.from_proto( | ||
on_demand_source.feature_view_projection | ||
) | ||
) | ||
else: | ||
sources[source_name] = RequestSource.from_proto( | ||
on_demand_source.request_data_source | ||
sources.append( | ||
RequestSource.from_proto(on_demand_source.request_data_source) | ||
) | ||
on_demand_feature_view_obj = cls( | ||
name=on_demand_feature_view_proto.spec.name, | ||
|
@@ -476,7 +500,9 @@ def get_requested_odfvs(feature_refs, project, registry): | |
def on_demand_feature_view( | ||
*args, | ||
features: Optional[List[Feature]] = None, | ||
sources: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, | ||
sources: Optional[ | ||
List[Union[BatchFeatureView, StreamFeatureView, RequestSource]] | ||
] = None, | ||
inputs: Optional[Dict[str, Union[FeatureView, RequestSource]]] = None, | ||
schema: Optional[List[Field]] = None, | ||
description: str = "", | ||
|
@@ -490,7 +516,7 @@ def on_demand_feature_view( | |
features (deprecated): The list of features in the output of the on demand | ||
feature view, after the transformation has been applied. | ||
sources (optional): A map from input source names to the actual input sources, | ||
which may be feature views, feature view projections, or request data sources. | ||
which may be feature views, or request data sources. | ||
These sources serve as inputs to the udf, which will refer to them by name. | ||
inputs (optional): A map from input source names to the actual input sources, | ||
which may be feature views, feature view projections, or request data sources. | ||
|
@@ -517,8 +543,7 @@ def on_demand_feature_view( | |
), | ||
DeprecationWarning, | ||
) | ||
|
||
_sources = sources or inputs | ||
_sources = sources or [] | ||
if inputs and sources: | ||
raise ValueError("At most one of `sources` or `inputs` can be specified.") | ||
elif inputs: | ||
|
@@ -529,6 +554,19 @@ def on_demand_feature_view( | |
), | ||
DeprecationWarning, | ||
) | ||
for _, source in inputs.items(): | ||
if isinstance(source, FeatureView): | ||
_sources.append(feature_view_to_batch_feature_view(source)) | ||
elif isinstance(source, FeatureViewProjection): | ||
_sources.append( | ||
BatchFeatureView(name=source.name, schema=source.features,) | ||
) | ||
elif isinstance(source, RequestSource): | ||
_sources.append(source) | ||
else: | ||
raise ValueError( | ||
"input can only accept FeatureView, FeatureViewProjection, or RequestSource" | ||
) | ||
|
||
if args: | ||
warnings.warn( | ||
|
@@ -559,14 +597,30 @@ def on_demand_feature_view( | |
DeprecationWarning, | ||
) | ||
if len(args) >= 2: | ||
_sources = args[1] | ||
warnings.warn( | ||
( | ||
"The `inputs` parameter is being deprecated. Please use `sources` instead. " | ||
"Feast 0.21 and onwards will not support the `inputs` parameter." | ||
), | ||
DeprecationWarning, | ||
) | ||
_inputs = args[1] | ||
for _, source in _inputs.items(): | ||
if isinstance(source, FeatureView): | ||
_sources.append(feature_view_to_batch_feature_view(source)) | ||
elif isinstance(source, FeatureViewProjection): | ||
_sources.append( | ||
BatchFeatureView( | ||
name=source.name, # type: ignore | ||
schema=source.features, # type: ignore | ||
) | ||
) | ||
elif isinstance(source, RequestSource): | ||
_sources.append(source) | ||
else: | ||
raise ValueError( | ||
"input can only accept FeatureView, FeatureViewProjection, or RequestSource" | ||
) | ||
warnings.warn( | ||
( | ||
"The `inputs` parameter is being deprecated. Please use `sources` instead. " | ||
"Feast 0.21 and onwards will not support the `inputs` parameter." | ||
), | ||
DeprecationWarning, | ||
) | ||
|
||
if not _sources: | ||
raise ValueError("The `sources` parameter must be specified.") | ||
|
@@ -587,3 +641,16 @@ def decorator(user_function): | |
return on_demand_feature_view_obj | ||
|
||
return decorator | ||
|
||
|
||
def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: | ||
return BatchFeatureView( | ||
name=fv.name, | ||
entities=fv.entities, | ||
ttl=fv.ttl, | ||
tags=fv.tags, | ||
online=fv.online, | ||
owner=fv.owner, | ||
schema=fv.schema, | ||
source=fv.source, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is wrong; docs and user-facing material should never refer to `BaseFeatureView`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah thanks for the catch