Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial scaffolding for on demand feature view #1803

Merged
merged 6 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message FeatureView {
FeatureViewMeta meta = 2;
}

// TODO(adchia): refactor common fields from this and ODFV into separate metadata proto
message FeatureViewSpec {
// Name of the feature view. Must be unique. Not updated.
string name = 1;
Expand Down
57 changes: 57 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Copyright 2020 The Feast Authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, 2021?

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
option java_outer_classname = "OnDemandFeatureViewProto";
option java_package = "feast.proto.core";

import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";

// Top-level representation of an on demand feature view.
// Currently it carries only the spec; no meta field is defined on this message.
message OnDemandFeatureView {
// User-specified specification of this on demand feature view.
OnDemandFeatureViewSpec spec = 1;
}

// Specification of an on demand feature view: its name, project, output
// features, input feature views, and the user defined transformation.
message OnDemandFeatureViewSpec {
// Name of the feature view. Must be unique. Not updated.
string name = 1;

// Name of Feast project that this feature view belongs to.
string project = 2;

// List of feature specifications for each feature defined with this feature view.
repeated FeatureSpecV2 features = 3;

adchia marked this conversation as resolved.
Show resolved Hide resolved
// Input feature views passed into the transformation, keyed by a string name.
// NOTE(review): the key looks like the input feature view's name - confirm against the SDK.
// TODO(adchia): add support for request data
map<string, FeatureView> inputs = 4;
adchia marked this conversation as resolved.
Show resolved Hide resolved

// User defined transformation function applied to the inputs.
UserDefinedFunction user_defined_function = 5;
}

// Serialized representation of a Python function.
message UserDefinedFunction {
// The function name.
string name = 1;

// The Python function serialized by dill. These are opaque pickled bytes,
// not Python source text.
// NOTE(review): dill/pickle payloads can execute arbitrary code when loaded,
// so this field should only be deserialized when it comes from a trusted source.
bytes body = 2;
}
2 changes: 2 additions & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import "feast/core/Entity.proto";
import "feast/core/FeatureService.proto";
import "feast/core/FeatureTable.proto";
import "feast/core/FeatureView.proto";
import "feast/core/OnDemandFeatureView.proto";
import "google/protobuf/timestamp.proto";

message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated FeatureService feature_services = 7;

string registry_schema_version = 3; // to support migrations; incremented when schema is changed
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .feature_store import FeatureStore
from .feature_table import FeatureTable
from .feature_view import FeatureView
from .on_demand_feature_view import OnDemandFeatureView
from .repo_config import RepoConfig
from .value_type import ValueType

Expand All @@ -37,6 +38,7 @@
"FeatureStore",
"FeatureTable",
"FeatureView",
"OnDemandFeatureView",
"RepoConfig",
"SourceType",
"ValueType",
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,10 @@ def __init__(self, entity_type: type):
f"The entity dataframe you have provided must be a Pandas DataFrame or a SQL query, "
f"but we found: {entity_type} "
)


class ConflictingFeatureViewNames(Exception):
    """Error signalling that one name refers to both an on-demand feature view and a feature view."""

    def __init__(self, feature_view_name: str):
        # Build the message first so the base-class call stays on one short line.
        message = f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view"
        super().__init__(message)
61 changes: 57 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
update_entities_with_inferred_types_from_feature_views,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequestV2,
Expand All @@ -45,6 +46,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.registry import Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.type_map import python_value_to_proto_value
from feast.usage import log_exceptions, log_exceptions_and_usage
from feast.version import get_version

Expand Down Expand Up @@ -267,8 +269,9 @@ def apply(
objects: Union[
Entity,
FeatureView,
OnDemandFeatureView,
FeatureService,
List[Union[FeatureView, Entity, FeatureService]],
List[Union[FeatureView, OnDemandFeatureView, Entity, FeatureService]],
],
commit: bool = True,
):
Expand Down Expand Up @@ -314,6 +317,7 @@ def apply(
assert isinstance(objects, list)

views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
_validate_feature_views(views_to_update)
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
Expand All @@ -332,11 +336,15 @@ def apply(

if len(views_to_update) + len(entities_to_update) + len(
services_to_update
) != len(objects):
) + len(odfvs_to_update) != len(objects):
raise ValueError("Unknown object type provided as part of apply() call")

for view in views_to_update:
self._registry.apply_feature_view(view, project=self.project, commit=False)
for odfv in odfvs_to_update:
self._registry.apply_on_demand_feature_view(
odfv, project=self.project, commit=False
)
for ent in entities_to_update:
self._registry.apply_entity(ent, project=self.project, commit=False)
for feature_service in services_to_update:
Expand Down Expand Up @@ -717,7 +725,6 @@ def get_online_features(
all_feature_views = self._registry.list_feature_views(
project=self.project, allow_cache=True
)

_validate_feature_refs(_feature_refs, full_feature_names)
grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
for table, requested_features in grouped_refs:
Expand Down Expand Up @@ -759,6 +766,47 @@ def get_online_features(
feature_ref
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT

initial_response = OnlineResponse(
GetOnlineFeaturesResponse(field_values=result_rows)
)
return self._augment_response_with_on_demand_transforms(
_feature_refs, full_feature_names, initial_response, result_rows
)

def _augment_response_with_on_demand_transforms(
    self,
    feature_refs: List[str],
    full_feature_names: bool,
    initial_response: OnlineResponse,
    result_rows: List[GetOnlineFeaturesResponse.FieldValues],
) -> OnlineResponse:
    """
    Adds on demand feature view outputs to an online response.

    Every on demand feature view registered for this project whose name appears in
    ``feature_refs`` is evaluated against the dataframe form of ``initial_response``.
    The value of its first declared output feature is written into each entry of
    ``result_rows`` under the view's name, and that field is marked PRESENT.

    Args:
        feature_refs: The feature references requested by the caller.
        full_feature_names: Passed through to the on demand feature view transform.
        initial_response: Response holding the already-retrieved feature values.
        result_rows: Per-row field values; updated in place.

    Returns:
        ``initial_response`` unchanged when no on demand feature views are registered,
        otherwise a new OnlineResponse built from the updated ``result_rows``.
    """
    registered_odfvs = self._registry.list_on_demand_feature_views(
        project=self.project, allow_cache=True
    )
    if len(registered_odfvs) == 0:
        return initial_response

    retrieved_df = initial_response.to_df()
    for odfv in registered_odfvs:
        requested_name = odfv.name
        # Only views that were explicitly requested are transformed.
        if requested_name not in feature_refs:
            continue

        transformed_df = odfv.get_transformed_features_df(
            full_feature_names, retrieved_df
        )
        for row_idx, result_row in enumerate(result_rows):
            # TODO(adchia): support multiple output features in an ODFV, which requires different naming
            # conventions
            output_column = transformed_df[odfv.features[0].name].values
            proto_value = python_value_to_proto_value(output_column[row_idx])
            result_row.fields[odfv.name].CopyFrom(proto_value)
            result_row.statuses[
                requested_name
            ] = GetOnlineFeaturesResponse.FieldStatus.PRESENT

    return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

@log_exceptions_and_usage
Expand Down Expand Up @@ -791,7 +839,9 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
]
else:
feature_names = [ref.split(":")[1] for ref in feature_refs]
feature_names = [
ref.split(":")[1] if ":" in ref else ref for ref in feature_refs
]
collided_feature_names = [
ref
for ref, occurrences in Counter(feature_names).items()
Expand Down Expand Up @@ -820,6 +870,9 @@ def _group_feature_refs(

if isinstance(features, list) and isinstance(features[0], str):
for ref in features:
if ":" not in ref:
# This is an on demand feature view ref
continue
view_name, feat_name = ref.split(":")
if view_name not in view_index:
raise FeatureViewNotFoundException(view_name)
Expand Down
150 changes: 150 additions & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import functools
from types import MethodType
from typing import Dict, List

import dill
import pandas as pd

from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.usage import log_exceptions
from feast.value_type import ValueType


class OnDemandFeatureView:
    """
    An OnDemandFeatureView defines on demand transformations on existing feature view values and request data.

    Args:
        name: Name of the group of features.
        features: Output schema of transformation with feature names
        inputs: The input feature views passed into the transform.
        udf: User defined transformation function that takes as input pandas dataframes
    """

    name: str
    features: List[Feature]
    inputs: Dict[str, FeatureView]
    udf: MethodType

    @log_exceptions
    def __init__(
        self,
        name: str,
        features: List[Feature],
        inputs: Dict[str, FeatureView],
        udf: MethodType,
    ):
        """
        Creates an OnDemandFeatureView object.

        Args:
            name: Name of the group of features.
            features: Output schema of the transformation.
            inputs: The input feature views passed into the transform, keyed by name.
            udf: User defined transformation function; it is called with a pandas dataframe.
        """

        self.name = name
        self.features = features
        self.inputs = inputs
        self.udf = udf

    def to_proto(self) -> OnDemandFeatureViewProto:
        """
        Converts an on demand feature view object to its protobuf representation.

        The UDF is stored under its ``__name__`` together with its dill-serialized
        bytes. The spec's ``project`` field is not populated here.

        Returns:
            An OnDemandFeatureViewProto protobuf.
        """
        spec = OnDemandFeatureViewSpec(
            name=self.name,
            features=[feature.to_proto() for feature in self.features],
            inputs={k: fv.to_proto() for k, fv in self.inputs.items()},
            user_defined_function=UserDefinedFunctionProto(
                name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True),
            ),
        )

        return OnDemandFeatureViewProto(spec=spec)

    @classmethod
    def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
        """
        Creates an on demand feature view from a protobuf representation.

        Args:
            on_demand_feature_view_proto: A protobuf representation of an on-demand feature view.

        Returns:
            An OnDemandFeatureView object based on the on-demand feature view protobuf.
        """
        spec = on_demand_feature_view_proto.spec
        on_demand_feature_view_obj = cls(
            name=spec.name,
            features=[
                Feature(
                    name=feature.name,
                    dtype=ValueType(feature.value_type),
                    labels=dict(feature.labels),
                )
                for feature in spec.features
            ],
            inputs={
                feature_view_name: FeatureView.from_proto(feature_view_proto)
                for feature_view_name, feature_view_proto in spec.inputs.items()
            },
            # SECURITY: dill.loads (like pickle.loads) can execute arbitrary code
            # embedded in the payload. Only deserialize protos that come from a
            # trusted registry.
            udf=dill.loads(spec.user_defined_function.body),
        )

        return on_demand_feature_view_obj

    def get_transformed_features_df(
        self, full_feature_names: bool, df_with_features: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Applies the user defined transformation to a dataframe of feature values.

        When ``full_feature_names`` is true, every ``<view>__<feature>`` column that
        belongs to an input feature view is temporarily aliased under its un-prefixed
        feature name so the UDF can refer to it by that short name. The aliases are
        removed from ``df_with_features`` again before this method returns, including
        when the UDF raises.

        Args:
            full_feature_names: Whether the dataframe columns use the prefixed naming.
            df_with_features: Dataframe of retrieved feature values; temporarily mutated.

        Returns:
            The dataframe returned by the UDF.
        """
        # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
        # Copy over un-prefixed features even if not requested since transform may need it
        columns_to_cleanup = []
        try:
            if full_feature_names:
                for input_fv in self.inputs.values():
                    for feature in input_fv.features:
                        full_feature_ref = f"{input_fv.name}__{feature.name}"
                        if full_feature_ref in df_with_features.keys():
                            df_with_features[feature.name] = df_with_features[
                                full_feature_ref
                            ]
                            columns_to_cleanup.append(feature.name)

            # Compute transformed values and apply to each result row
            return self.udf(df_with_features)
        finally:
            # Cleanup extra columns used for transformation. Running this in
            # `finally` keeps the caller's dataframe free of alias columns even
            # when the UDF fails.
            df_with_features.drop(columns=columns_to_cleanup, inplace=True)


def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]):
    """
    Decorator factory that turns a transformation function into an on demand feature view.

    The view is named after the decorated function, and the function itself becomes
    the view's UDF. The decorated name is bound to the resulting OnDemandFeatureView
    object rather than to the original function.

    Args:
        features: Output schema with feature names.
        inputs: The inputs passed into the transform.

    Returns:
        A decorator that produces an OnDemandFeatureView.
    """

    def build_view(user_function):
        view = OnDemandFeatureView(
            name=user_function.__name__,
            features=features,
            inputs=inputs,
            udf=user_function,
        )
        # update_wrapper copies the function's metadata onto the view object and
        # returns that same object.
        return functools.update_wrapper(view, user_function)

    return build_view
Loading