Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Substrait-based ODFV transformation #3969

Merged
merged 9 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message OnDemandFeatureViewSpec {

oneof transformation {
UserDefinedFunction user_defined_function = 5;
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9;
}

// Description of the on demand feature view.
Expand Down Expand Up @@ -89,3 +90,7 @@ message UserDefinedFunction {
// The string representation of the udf
string body_text = 3;
}

// Transformation for an on demand feature view expressed as a Substrait plan
// rather than as a Python user defined function.
message OnDemandSubstraitTransformation {
// The serialized Substrait plan, stored as opaque bytes.
bytes substrait_plan = 1;
}
55 changes: 52 additions & 3 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import functools
import inspect
import warnings
from datetime import datetime
from types import FunctionType
Expand All @@ -17,6 +18,7 @@
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.on_demand_substrait_transformation import OnDemandSubstraitTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand Down Expand Up @@ -210,6 +212,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore
if type(self.transformation) == OnDemandSubstraitTransformation
else None,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -255,6 +260,13 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
== "on_demand_substrait_transformation"
):
transformation = OnDemandSubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.on_demand_substrait_transformation
)
else:
raise Exception("At least one transformation type needs to be provided")

Expand Down Expand Up @@ -460,10 +472,47 @@ def mainify(obj) -> None:
obj.__module__ = "__main__"

def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)
return_annotation = inspect.signature(user_function).return_annotation
if (
return_annotation
and return_annotation.__module__ == "ibis.expr.types.relations"
and return_annotation.__name__ == "Table"
):
import ibis
import ibis.expr.datatypes as dt
from ibis_substrait.compiler.core import SubstraitCompiler

compiler = SubstraitCompiler()

input_fields: Field = []

for s in sources:
if type(s) == FeatureView:
fields = s.projection.features
else:
fields = s.features

input_fields.extend(
[
(
f.name,
dt.dtype(
feast_value_type_to_pandas_type(f.dtype.to_value_type())
),
)
for f in fields
]
)

expr = user_function(ibis.table(input_fields, "t"))

transformation = OnDemandPandasTransformation(user_function, udf_string)
transformation = OnDemandSubstraitTransformation(
substrait_plan=compiler.compile(expr).SerializeToString()
)
else:
udf_string = dill.source.getsource(user_function)
mainify(user_function)
transformation = OnDemandPandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
Expand Down
50 changes: 50 additions & 0 deletions sdk/python/feast/on_demand_substrait_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pandas as pd
import pyarrow
import pyarrow.substrait as substrait # type: ignore # noqa

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto,
)


class OnDemandSubstraitTransformation:
    """On demand feature transformation backed by a serialized Substrait plan.

    The plan is kept as raw bytes and is executed with
    ``pyarrow.substrait.run_query`` against the pandas DataFrame handed to
    :meth:`transform`.
    """

    def __init__(self, substrait_plan: bytes):
        """
        Creates an OnDemandSubstraitTransformation object.

        Args:
            substrait_plan: The user-provided substrait plan.
        """
        self.substrait_plan = substrait_plan

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Runs the stored substrait plan over the given DataFrame.

        Args:
            df: The input data. It must contain every column named in the
                schema that the plan requests from the table provider.

        Returns:
            The query result converted back to a pandas DataFrame.
        """

        def table_provider(names, schema: pyarrow.Schema):
            # The requested table name(s) are ignored: whatever table the plan
            # asks for is served from ``df``, narrowed to the schema's columns.
            return pyarrow.Table.from_pandas(df[schema.names])

        table: pyarrow.Table = pyarrow.substrait.run_query(
            self.substrait_plan, table_provider=table_provider
        ).read_all()
        return table.to_pandas()

    def __eq__(self, other):
        """
        Compares two transformations by their serialized plan bytes.

        Raises:
            TypeError: If ``other`` is not an OnDemandSubstraitTransformation.
        """
        if not isinstance(other, OnDemandSubstraitTransformation):
            raise TypeError(
                "Comparisons should only involve OnDemandSubstraitTransformation class objects."
            )

        # NOTE: there is deliberately no ``super().__eq__(other)`` check here.
        # ``object.__eq__`` returns ``NotImplemented`` for distinct objects, and
        # truth-testing ``NotImplemented`` is deprecated (and an error on newer
        # Python versions). Equality is fully determined by the plan bytes.
        return self.substrait_plan == other.substrait_plan

    def to_proto(self) -> "OnDemandSubstraitTransformationProto":
        """Converts this transformation to its protobuf representation."""
        return OnDemandSubstraitTransformationProto(substrait_plan=self.substrait_plan)

    @classmethod
    def from_proto(
        cls,
        on_demand_substrait_transformation_proto: "OnDemandSubstraitTransformationProto",
    ):
        """
        Creates a transformation from its protobuf representation.

        Args:
            on_demand_substrait_transformation_proto: The protobuf message whose
                ``substrait_plan`` field carries the serialized plan.

        Returns:
            A new instance of ``cls`` holding the plan bytes from the proto.
        """
        return cls(
            substrait_plan=on_demand_substrait_transformation_proto.substrait_plan
        )
Loading
Loading