Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix bugs in applying stream feature view and retrieving online features #2754

Merged
merged 6 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ option java_package = "feast.proto.core";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";
Expand Down Expand Up @@ -95,4 +96,7 @@ message StreamFeatureViewMeta {

// Time where this Feature View is last updated
google.protobuf.Timestamp last_updated_timestamp = 2;

// List of pairs (start_time, end_time) for which this feature view has been materialized.
repeated MaterializationInterval materialization_intervals = 3;
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 6 additions & 0 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
from feast.protos.feast.core.StreamFeatureView_pb2 import (
StreamFeatureView as StreamFeatureViewProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
Expand Down Expand Up @@ -106,6 +109,7 @@ def tag_objects_for_keep_delete_update_add(
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
StreamFeatureViewProto,
ValidationReferenceProto,
)

Expand Down Expand Up @@ -292,6 +296,7 @@ def apply_diff_to_registry(
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
FeastObjectType.STREAM_FEATURE_VIEW,
]:
feature_view_obj = cast(
BaseFeatureView, feast_object_diff.current_feast_object
Expand Down Expand Up @@ -331,6 +336,7 @@ def apply_diff_to_registry(
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
FeastObjectType.STREAM_FEATURE_VIEW,
]:
registry.apply_feature_view(
cast(BaseFeatureView, feast_object_diff.new_feast_object),
Expand Down
75 changes: 51 additions & 24 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,40 @@ def _get_feature_view(
feature_view.entities = []
return feature_view

@log_exceptions_and_usage
def get_stream_feature_view(
self, name: str, allow_registry_cache: bool = False
) -> StreamFeatureView:
"""
Retrieves a stream feature view.

Args:
name: Name of stream feature view.
allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry

Returns:
The specified stream feature view.

Raises:
FeatureViewNotFoundException: The feature view could not be found.
"""
return self._get_stream_feature_view(
name, allow_registry_cache=allow_registry_cache
)

def _get_stream_feature_view(
self,
name: str,
hide_dummy_entity: bool = True,
allow_registry_cache: bool = False,
) -> StreamFeatureView:
stream_feature_view = self._registry.get_stream_feature_view(
name, self.project, allow_cache=allow_registry_cache
)
if hide_dummy_entity and stream_feature_view.entities[0] == DUMMY_ENTITY_NAME:
stream_feature_view.entities = []
return stream_feature_view

@log_exceptions_and_usage
def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView:
"""
Expand Down Expand Up @@ -935,7 +969,6 @@ def get_historical_features(
all_feature_views,
all_request_feature_views,
all_on_demand_feature_views,
all_stream_feature_views,
) = self._get_feature_views_to_use(features)

if all_request_feature_views:
Expand Down Expand Up @@ -1321,9 +1354,14 @@ def write_to_online_store(
ingests data directly into the Online store
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
try:
feature_view = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
Comment on lines +1357 to +1364
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should try to build on top of #2733 to migrate these kinds of stray feature views to the right location.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I see, I'll add a todo for this.

entities = []
for entity_name in feature_view.entities:
entities.append(
Expand Down Expand Up @@ -1456,7 +1494,6 @@ def _get_online_features(
requested_feature_views,
requested_request_feature_views,
requested_on_demand_feature_views,
request_stream_feature_views,
) = self._get_feature_views_to_use(
features=features, allow_cache=True, hide_dummy_entity=False
)
Expand Down Expand Up @@ -1994,15 +2031,17 @@ def _get_feature_views_to_use(
allow_cache=False,
hide_dummy_entity: bool = True,
) -> Tuple[
List[FeatureView],
List[RequestFeatureView],
List[OnDemandFeatureView],
List[StreamFeatureView],
List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView],
]:

fvs = {
fv.name: fv
for fv in self._list_feature_views(allow_cache, hide_dummy_entity)
for fv in [
*self._list_feature_views(allow_cache, hide_dummy_entity),
*self._registry.list_stream_feature_views(
project=self.project, allow_cache=allow_cache
),
]
}

request_fvs = {
Expand All @@ -2019,15 +2058,8 @@ def _get_feature_views_to_use(
)
}

sfvs = {
fv.name: fv
for fv in self._registry.list_stream_feature_views(
project=self.project, allow_cache=allow_cache
)
}

if isinstance(features, FeatureService):
fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use = [], [], [], []
fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], []
for fv_name, projection in [
(projection.name, projection)
for projection in features.feature_view_projections
Expand All @@ -2048,23 +2080,18 @@ def _get_feature_views_to_use(
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
elif fv_name in sfvs:
sfvs_to_use.append(
sfvs[fv_name].with_projection(copy.copy(projection))
)
else:
raise ValueError(
f"The provided feature service {features.name} contains a reference to a feature view"
f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
f'{fv_name} and that you have registered it by running "apply".'
)
views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use)
views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use)
else:
views_to_use = (
[*fvs.values()],
[*request_fvs.values()],
[*od_fvs.values()],
[*sfvs.values()],
)

return views_to_use
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ def plan(
path=self._get_db_path(config),
name=_table_id(project, FeatureView.from_proto(view)),
)
for view in desired_registry_proto.feature_views
for view in [
*desired_registry_proto.feature_views,
*desired_registry_proto.stream_feature_views,
]
]
return infra_objects

Expand Down
29 changes: 29 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class FeastObjectType(Enum):
FEATURE_VIEW = "feature view"
ON_DEMAND_FEATURE_VIEW = "on demand feature view"
REQUEST_FEATURE_VIEW = "request feature view"
STREAM_FEATURE_VIEW = "stream feature view"
FEATURE_SERVICE = "feature service"

@staticmethod
Expand All @@ -93,6 +94,9 @@ def get_objects_from_registry(
FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views(
project=project
),
FeastObjectType.STREAM_FEATURE_VIEW: registry.list_stream_feature_views(
project=project,
),
FeastObjectType.FEATURE_SERVICE: registry.list_feature_services(
project=project
),
Expand All @@ -108,6 +112,7 @@ def get_objects_from_repo_contents(
FeastObjectType.FEATURE_VIEW: repo_contents.feature_views,
FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views,
FeastObjectType.REQUEST_FEATURE_VIEW: repo_contents.request_feature_views,
FeastObjectType.STREAM_FEATURE_VIEW: repo_contents.stream_feature_views,
FeastObjectType.FEATURE_SERVICE: repo_contents.feature_services,
}

Expand Down Expand Up @@ -717,6 +722,30 @@ def get_feature_view(
return FeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)

def get_stream_feature_view(
self, name: str, project: str, allow_cache: bool = False
) -> StreamFeatureView:
"""
Retrieves a stream feature view.

Args:
name: Name of stream feature view
project: Feast project that this stream feature view belongs to
allow_cache: Allow returning feature view from the cached registry

Returns:
Returns either the specified feature view, or raises an exception if
none is found
"""
registry_proto = self._get_registry_proto(allow_cache=allow_cache)
for feature_view_proto in registry_proto.stream_feature_views:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we moved this into a different field in the Registry proto? Could we have reused the existing FeatureView proto message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, the proto for stream feature view and feature view are different.

if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return StreamFeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)

def delete_feature_service(self, name: str, project: str, commit: bool = True):
"""
Deletes a feature service or raises an exception if not found.
Expand Down
38 changes: 33 additions & 5 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from click.exceptions import BadParameter

from feast import PushSource
from feast.data_source import DataSource
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource, KafkaSource
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
from feast.entity import Entity
from feast.feature_service import FeatureService
Expand All @@ -25,6 +26,7 @@
from feast.repo_config import RepoConfig
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.usage import log_exceptions_and_usage


Expand Down Expand Up @@ -122,8 +124,11 @@ def parse_repo(repo_root: Path) -> RepoContents:
):
res.data_sources.append(obj)
data_sources_set.add(obj)
if isinstance(obj, FeatureView) and not any(
(obj is fv) for fv in res.feature_views
if (
isinstance(obj, FeatureView)
and not any((obj is fv) for fv in res.feature_views)
and not isinstance(obj, StreamFeatureView)
and not isinstance(obj, BatchFeatureView)
):
res.feature_views.append(obj)
if isinstance(obj.stream_source, PushSource) and not any(
Expand All @@ -133,6 +138,19 @@ def parse_repo(repo_root: Path) -> RepoContents:
# Don't add if the push source's batch source is a duplicate of an existing batch source
if push_source_dep not in data_sources_set:
res.data_sources.append(push_source_dep)
elif isinstance(obj, StreamFeatureView) and not any(
(obj is sfv) for sfv in res.stream_feature_views
):
res.stream_feature_views.append(obj)
if (
isinstance(obj.stream_source, PushSource)
or isinstance(obj.stream_source, KafkaSource)
and not any((obj is ds) for ds in res.data_sources)
):
batch_source_dep = obj.stream_source.batch_source
# Don't add if the push source's batch source is a duplicate of an existing batch source
if batch_source_dep and batch_source_dep not in data_sources_set:
res.data_sources.append(batch_source_dep)
elif isinstance(obj, Entity) and not any(
(obj is entity) for entity in res.entities
):
Expand Down Expand Up @@ -196,7 +214,12 @@ def extract_objects_for_apply_delete(project, registry, repo):

all_to_apply: List[
Union[
Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService
Entity,
FeatureView,
RequestFeatureView,
OnDemandFeatureView,
StreamFeatureView,
FeatureService,
]
] = []
for object_type in FEAST_OBJECT_TYPES:
Expand All @@ -205,7 +228,12 @@ def extract_objects_for_apply_delete(project, registry, repo):

all_to_delete: List[
Union[
Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService
Entity,
FeatureView,
RequestFeatureView,
OnDemandFeatureView,
StreamFeatureView,
FeatureService,
]
] = []
for object_type in FEAST_OBJECT_TYPES:
Expand Down
Loading