Skip to content

Commit

Permalink
feat: Updating FeatureViewProjection and OnDemandFeatureView to add b…
Browse files Browse the repository at this point in the history
…atch_source and entities (#4530)

* feat: Updating protos for Projections to include more info

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* adding unit test

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* adding type checking where batch source is already serialized into protobuf

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* almost got everything working and type validation behaving

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* cleaned up and have tests behaving

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* removed comment

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* updated FeatureViewProjection batch_source serialization

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* trying to debug a test

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* handling snowflake issue, cant confirm why it is happening so just going to put a workaround

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* linter

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* trying to handle it correctly

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* handling the else case for from_feature_view_definition

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* adding print

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* adding test of issue

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* think i got everything working now

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

* removing print

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>

---------

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo authored Sep 23, 2024
1 parent d5ef57e commit 0795496
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 34 deletions.
10 changes: 10 additions & 0 deletions protos/feast/core/FeatureViewProjection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto";
option java_package = "feast.proto.core";

import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";


// A projection to be applied on top of a FeatureView.
Expand All @@ -22,4 +23,13 @@ message FeatureViewProjection {

// Map for entity join_key overrides of feature data entity join_key to entity data join_key
map<string,string> join_key_map = 4;

string timestamp_field = 5;
string date_partition_column = 6;
string created_timestamp_column = 7;
// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource batch_source = 8;
// Streaming DataSource from where this view can consume "online" feature data.
DataSource stream_source = 9;

}
6 changes: 6 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec {
// Owner of the on demand feature view.
string owner = 8;
string mode = 11;
bool write_to_online_store = 12;

// List of names of entities associated with this feature view.
repeated string entities = 13;
// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 14;
}

message OnDemandFeatureViewMeta {
Expand Down
12 changes: 11 additions & 1 deletion sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message

from feast.data_source import DataSource
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
source: Optional[DataSource] = None,
):
"""
Creates a BaseFeatureView object.
Expand All @@ -76,7 +78,8 @@ def __init__(
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the base feature view, typically the email of the
primary maintainer.
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
Raises:
ValueError: A field mapping conflicts with an Entity or a Feature.
"""
Expand All @@ -90,6 +93,9 @@ def __init__(
self.created_timestamp = None
self.last_updated_timestamp = None

if source:
self.source = source

@property
@abstractmethod
def proto_class(self) -> Type[Message]:
Expand Down Expand Up @@ -156,6 +162,10 @@ def __eq__(self, other):
or self.tags != other.tags
or self.owner != other.owner
):
# This is meant to ignore the File Source change to Push Source
if isinstance(type(self.source), type(other.source)):
if self.source != other.source:
return False
return False

return True
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def __init__(
description=description,
tags=tags,
owner=owner,
source=source,
)
self.online = online
self.materialization_intervals = []
Expand Down Expand Up @@ -429,7 +430,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):

# FeatureViewProjections are not saved in the FeatureView proto.
# Create the default projection.
feature_view.projection = FeatureViewProjection.from_definition(feature_view)
feature_view.projection = FeatureViewProjection.from_feature_view_definition(
feature_view
)

if feature_view_proto.meta.HasField("created_timestamp"):
feature_view.created_timestamp = (
Expand Down
86 changes: 79 additions & 7 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from attr import dataclass

from feast.data_source import DataSource
from feast.field import Field
from feast.protos.feast.core.FeatureViewProjection_pb2 import (
FeatureViewProjection as FeatureViewProjectionProto,
)

if TYPE_CHECKING:
from feast.base_feature_view import BaseFeatureView
from feast.feature_view import FeatureView


@dataclass
Expand All @@ -27,50 +29,120 @@ class FeatureViewProjection:
is not ready to be projected, i.e. still needs to go through feature inference.
join_key_map: A map to modify join key columns during retrieval of this feature
view projection.
timestamp_field: The timestamp field of the feature view projection.
date_partition_column: The date partition column of the feature view projection.
created_timestamp_column: The created timestamp column of the feature view projection.
batch_source: The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.
"""

name: str
name_alias: Optional[str]
desired_features: List[str]
features: List[Field]
join_key_map: Dict[str, str] = {}
timestamp_field: Optional[str] = None
date_partition_column: Optional[str] = None
created_timestamp_column: Optional[str] = None
batch_source: Optional[DataSource] = None

def name_to_use(self):
return self.name_alias or self.name

def to_proto(self) -> FeatureViewProjectionProto:
batch_source = None
if getattr(self, "batch_source", None):
if isinstance(self.batch_source, DataSource):
batch_source = self.batch_source.to_proto()
else:
batch_source = self.batch_source
feature_reference_proto = FeatureViewProjectionProto(
feature_view_name=self.name,
feature_view_name_alias=self.name_alias or "",
join_key_map=self.join_key_map,
timestamp_field=self.timestamp_field or "",
date_partition_column=self.date_partition_column or "",
created_timestamp_column=self.created_timestamp_column or "",
batch_source=batch_source,
)
for feature in self.features:
feature_reference_proto.feature_columns.append(feature.to_proto())

return feature_reference_proto

@staticmethod
def from_proto(proto: FeatureViewProjectionProto):
def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection":
batch_source = (
DataSource.from_proto(proto.batch_source)
if str(getattr(proto, "batch_source"))
else None
)
feature_view_projection = FeatureViewProjection(
name=proto.feature_view_name,
name_alias=proto.feature_view_name_alias or None,
features=[],
join_key_map=dict(proto.join_key_map),
desired_features=[],
timestamp_field=proto.timestamp_field or None,
date_partition_column=proto.date_partition_column or None,
created_timestamp_column=proto.created_timestamp_column or None,
batch_source=batch_source,
)
for feature_column in proto.feature_columns:
feature_view_projection.features.append(Field.from_proto(feature_column))

return feature_view_projection

@staticmethod
def from_feature_view_definition(feature_view: "FeatureView"):
# TODO need to implement this for StreamFeatureViews
if getattr(feature_view, "batch_source", None):
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
timestamp_field=feature_view.batch_source.created_timestamp_column
or None,
created_timestamp_column=feature_view.batch_source.created_timestamp_column
or None,
date_partition_column=feature_view.batch_source.date_partition_column
or None,
batch_source=feature_view.batch_source or None,
)
else:
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
)

@staticmethod
def from_definition(base_feature_view: "BaseFeatureView"):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
)
if getattr(base_feature_view, "batch_source", None):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
timestamp_field=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined]
or None,
created_timestamp_column=base_feature_view.batch_source.created_timestamp_column # type:ignore[attr-defined]
or None,
date_partition_column=base_feature_view.batch_source.date_partition_column # type:ignore[attr-defined]
or None,
batch_source=base_feature_view.batch_source or None, # type:ignore[attr-defined]
)
else:
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
features=base_feature_view.features,
desired_features=[],
)

def get_feature(self, feature_name: str) -> Field:
try:
Expand Down
Loading

0 comments on commit 0795496

Please sign in to comment.