Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Updating protos to separate transformation #4018

Merged
merged 18 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import "feast/core/FeatureView.proto";
import "feast/core/FeatureViewProjection.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Transformation.proto";

message OnDemandFeatureView {
// User-specified specifications of this feature view.
Expand All @@ -49,9 +50,11 @@ message OnDemandFeatureViewSpec {
map<string, OnDemandSource> sources = 4;

oneof transformation {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This hasn't made it to an official release yet, so can't we remove transformation oneof here? just leave UserDefinedFunction user_defined_function = 5 [deprecated = true]. It would also enable you to call the new field transformation which sounds less awkward (FeatureTransformationV2 transformation = 10;) and cut down on overall code size.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to do that in a separate PR as a follow up. I'll probably end up with 4 by the end. I want to leave this one as is though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to have separate clean PRs than one massive one imo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tokoko lmk if you're good with this. I have the next PR coming soon where I'll rename things and the subsequent will do the PythonTransformation type.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franciscojavierarceo yeah, sounds good to me. Let's just not make a new release until we get all these ironed out.

UserDefinedFunction user_defined_function = 5;
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9;
UserDefinedFunction user_defined_function = 5 [deprecated = true];
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9 [deprecated = true];
}
// Oneof with {user_defined_function, on_demand_substrait_transformation}
FeatureTransformationV2 feature_transformation = 10;

// Description of the on demand feature view.
string description = 6;
Expand All @@ -61,6 +64,7 @@ message OnDemandFeatureViewSpec {

// Owner of the on demand feature view.
string owner = 8;
string mode = 11;
}

message OnDemandFeatureViewMeta {
Expand All @@ -81,6 +85,8 @@ message OnDemandSource {

// Serialized representation of python function.
message UserDefinedFunction {
option deprecated = true;

// The function name
string name = 1;

Expand All @@ -92,5 +98,7 @@ message UserDefinedFunction {
}

message OnDemandSubstraitTransformation {
option deprecated = true;

bytes substrait_plan = 1;
}
}
7 changes: 6 additions & 1 deletion protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";
import "feast/core/Transformation.proto";

message StreamFeatureView {
// User-specified specifications of this feature view.
Expand Down Expand Up @@ -77,7 +78,8 @@ message StreamFeatureViewSpec {
bool online = 12;

// Serialized function that is encoded in the streamfeatureview
UserDefinedFunction user_defined_function = 13;
UserDefinedFunction user_defined_function = 13 [deprecated = true];


// Mode of execution
string mode = 14;
Expand All @@ -87,5 +89,8 @@ message StreamFeatureViewSpec {

// Timestamp field for aggregation
string timestamp_field = 16;

// Oneof with {user_defined_function, on_demand_substrait_transformation}
FeatureTransformationV2 feature_transformation = 17;
}

33 changes: 33 additions & 0 deletions protos/feast/core/Transformation.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureTransformationProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";

// Serialized representation of python function.
message UserDefinedFunctionV2 {
// The function name
string name = 1;

// The python-syntax function body (serialized by dill)
bytes body = 2;

// The string representation of the udf
string body_text = 3;
}

// A feature transformation executed as a user-defined function
message FeatureTransformationV2 {
// Note this Transformation starts at 5 for backwards compatibility
oneof transformation {
UserDefinedFunctionV2 user_defined_function = 1;
OnDemandSubstraitTransformationV2 on_demand_substrait_transformation = 2;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a good time to rethink the naming here. My first suggestion was to rename the field (not message type) to on_demand_pandas_transformation instead of user_defined_function. But on second thought, since we are also aiming to reuse this in StreamFeatureViews, I think protos should no longer be called OnDemand... What do you think? I'm thinking of something like this:

UserDefinedFunctionV2 pandas_transformation = 1;
SubstraitTransformationV2 substrait_transformation = 2;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning on doing that in a follow up PR to not add too much complexity here.

}

message OnDemandSubstraitTransformationV2 {
bytes substrait_plan = 1;
}
21 changes: 18 additions & 3 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, cast

Expand Down Expand Up @@ -144,11 +145,25 @@ def diff_registry_objects(
if _field.name in FIELDS_TO_IGNORE:
continue
elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
if _field.name == "user_defined_function":
# TODO: Delete "transformation" after we've safely deprecated it from the proto
if _field.name in ["transformation", "feature_transformation"]:
warnings.warn(
"transformation will be deprecated in the future please use feature_transformation instead.",
DeprecationWarning,
)
current_spec = cast(OnDemandFeatureViewSpec, current_spec)
new_spec = cast(OnDemandFeatureViewSpec, new_spec)
current_udf = current_spec.user_defined_function
new_udf = new_spec.user_defined_function
# Check if the old proto is populated and use that if it is
deprecated_udf = current_spec.user_defined_function
feature_transformation_udf = (
current_spec.feature_transformation.user_defined_function
)
current_udf = (
deprecated_udf
if deprecated_udf.body_text != ""
else feature_transformation_udf
)
new_udf = new_spec.feature_transformation.user_defined_function
for _udf_field in current_udf.DESCRIPTOR.fields:
if _udf_field.name == "body":
continue
Expand Down
14 changes: 11 additions & 3 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
Expand Down Expand Up @@ -662,10 +663,16 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
key=lambda on_demand_feature_view: on_demand_feature_view.name,
):
odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())

odfv_dict["spec"]["userDefinedFunction"][
# We are logging a warning because the registry object may be read from a proto that is not updated
# i.e., we have to submit dual writes but in order to ensure the read behavior succeeds we have to load
# both objects to compare any changes in the registry
warnings.warn(
"We will be deprecating the usage of spec.userDefinedFunction in a future release please upgrade cautiously.",
DeprecationWarning,
)
odfv_dict["spec"]["featureTransformation"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.transformation.udf_string
] = on_demand_feature_view.feature_transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand All @@ -684,6 +691,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
"body"
] = stream_feature_view.udf_string
registry_dict["streamFeatureViews"].append(sfv_dict)

for saved_dataset in sorted(
self.list_saved_datasets(project=project), key=lambda item: item.name
):
Expand Down
54 changes: 44 additions & 10 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
OnDemandFeatureViewSpec,
OnDemandSource,
)
from feast.protos.feast.core.Transformation_pb2 import (
FeatureTransformationV2 as FeatureTransformationProto,
)
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand Down Expand Up @@ -63,6 +69,7 @@ class OnDemandFeatureView(BaseFeatureView):
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
transformation: Union[OnDemandPandasTransformation]
feature_transformation: Union[OnDemandPandasTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -83,6 +90,7 @@ def __init__( # noqa: C901
udf: Optional[FunctionType] = None,
udf_string: str = "",
transformation: Optional[Union[OnDemandPandasTransformation]] = None,
feature_transformation: Optional[Union[OnDemandPandasTransformation]] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -101,6 +109,7 @@ def __init__( # noqa: C901
dataframes as inputs.
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
feature_transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
Expand Down Expand Up @@ -139,6 +148,7 @@ def __init__( # noqa: C901
] = odfv_source.projection

self.transformation = transformation
self.feature_transformation = self.transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -151,6 +161,7 @@ def __copy__(self):
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
transformation=self.transformation,
feature_transformation=self.transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -172,6 +183,7 @@ def __eq__(self, other):
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.transformation != other.transformation
or self.feature_transformation != other.feature_transformation
):
return False

Expand Down Expand Up @@ -205,16 +217,19 @@ def to_proto(self) -> OnDemandFeatureViewProto:
request_data_source=request_sources.to_proto()
)

spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
feature_transformation = FeatureTransformationProto(
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore
on_demand_substrait_transformation=self.transformation.to_proto()
if type(self.transformation) == OnDemandSubstraitTransformation
else None,
else None, # type: ignore
)
spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
feature_transformation=feature_transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -254,18 +269,37 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
)

if (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "user_defined_function"
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
!= ""
):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
on_demand_feature_view_proto.spec.feature_transformation.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
on_demand_feature_view_proto.spec.feature_transformation.WhichOneof(
"transformation"
)
== "on_demand_substrait_transformation"
):
transformation = OnDemandSubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.on_demand_substrait_transformation
on_demand_feature_view_proto.spec.feature_transformation.on_demand_substrait_transformation
)
elif (
hasattr(on_demand_feature_view_proto.spec, "user_defined_function")
and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text
== ""
):
backwards_compatible_udf = UserDefinedFunctionProto(
name=on_demand_feature_view_proto.spec.user_defined_function.name,
body=on_demand_feature_view_proto.spec.user_defined_function.body,
body_text=on_demand_feature_view_proto.spec.user_defined_function.body_text,
)
transformation = OnDemandPandasTransformation.from_proto(
user_defined_function_proto=backwards_compatible_udf,
)
else:
raise Exception("At least one transformation type needs to be provided")
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/on_demand_pandas_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import dill
import pandas as pd

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)


Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/on_demand_substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import pyarrow
import pyarrow.substrait as substrait # type: ignore # noqa

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto,
from feast.protos.feast.core.Transformation_pb2 import (
OnDemandSubstraitTransformationV2 as OnDemandSubstraitTransformationProto,
)


Expand Down
Loading
Loading