Skip to content

Commit

Permalink
fix: Fix bugs in applying stream feature view and retrieving online f…
Browse files Browse the repository at this point in the history
…eatures (#2754)

* Fix apply workflow

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix issues

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Reformat

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba authored Jun 2, 2022
1 parent 44a3f05 commit d024e5e
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 34 deletions.
4 changes: 4 additions & 0 deletions protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ option java_package = "feast.proto.core";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";
Expand Down Expand Up @@ -95,4 +96,7 @@ message StreamFeatureViewMeta {

// Time where this Feature View is last updated
google.protobuf.Timestamp last_updated_timestamp = 2;

// List of pairs (start_time, end_time) for which this feature view has been materialized.
repeated MaterializationInterval materialization_intervals = 3;
}
6 changes: 6 additions & 0 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
from feast.protos.feast.core.StreamFeatureView_pb2 import (
StreamFeatureView as StreamFeatureViewProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
Expand Down Expand Up @@ -106,6 +109,7 @@ def tag_objects_for_keep_delete_update_add(
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
StreamFeatureViewProto,
ValidationReferenceProto,
)

Expand Down Expand Up @@ -292,6 +296,7 @@ def apply_diff_to_registry(
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
FeastObjectType.STREAM_FEATURE_VIEW,
]:
feature_view_obj = cast(
BaseFeatureView, feast_object_diff.current_feast_object
Expand Down Expand Up @@ -331,6 +336,7 @@ def apply_diff_to_registry(
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
FeastObjectType.STREAM_FEATURE_VIEW,
]:
registry.apply_feature_view(
cast(BaseFeatureView, feast_object_diff.new_feast_object),
Expand Down
75 changes: 51 additions & 24 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,40 @@ def _get_feature_view(
feature_view.entities = []
return feature_view

@log_exceptions_and_usage
def get_stream_feature_view(
self, name: str, allow_registry_cache: bool = False
) -> StreamFeatureView:
"""
Retrieves a stream feature view.
Args:
name: Name of stream feature view.
allow_registry_cache: (Optional) Whether to allow returning this entity from a cached registry
Returns:
The specified stream feature view.
Raises:
FeatureViewNotFoundException: The feature view could not be found.
"""
return self._get_stream_feature_view(
name, allow_registry_cache=allow_registry_cache
)

def _get_stream_feature_view(
self,
name: str,
hide_dummy_entity: bool = True,
allow_registry_cache: bool = False,
) -> StreamFeatureView:
stream_feature_view = self._registry.get_stream_feature_view(
name, self.project, allow_cache=allow_registry_cache
)
if hide_dummy_entity and stream_feature_view.entities[0] == DUMMY_ENTITY_NAME:
stream_feature_view.entities = []
return stream_feature_view

@log_exceptions_and_usage
def get_on_demand_feature_view(self, name: str) -> OnDemandFeatureView:
"""
Expand Down Expand Up @@ -935,7 +969,6 @@ def get_historical_features(
all_feature_views,
all_request_feature_views,
all_on_demand_feature_views,
all_stream_feature_views,
) = self._get_feature_views_to_use(features)

if all_request_feature_views:
Expand Down Expand Up @@ -1321,9 +1354,14 @@ def write_to_online_store(
ingests data directly into the Online store
"""
# TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
try:
feature_view = self.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = self.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
entities = []
for entity_name in feature_view.entities:
entities.append(
Expand Down Expand Up @@ -1456,7 +1494,6 @@ def _get_online_features(
requested_feature_views,
requested_request_feature_views,
requested_on_demand_feature_views,
request_stream_feature_views,
) = self._get_feature_views_to_use(
features=features, allow_cache=True, hide_dummy_entity=False
)
Expand Down Expand Up @@ -1994,15 +2031,17 @@ def _get_feature_views_to_use(
allow_cache=False,
hide_dummy_entity: bool = True,
) -> Tuple[
List[FeatureView],
List[RequestFeatureView],
List[OnDemandFeatureView],
List[StreamFeatureView],
List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView],
]:

fvs = {
fv.name: fv
for fv in self._list_feature_views(allow_cache, hide_dummy_entity)
for fv in [
*self._list_feature_views(allow_cache, hide_dummy_entity),
*self._registry.list_stream_feature_views(
project=self.project, allow_cache=allow_cache
),
]
}

request_fvs = {
Expand All @@ -2019,15 +2058,8 @@ def _get_feature_views_to_use(
)
}

sfvs = {
fv.name: fv
for fv in self._registry.list_stream_feature_views(
project=self.project, allow_cache=allow_cache
)
}

if isinstance(features, FeatureService):
fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use = [], [], [], []
fvs_to_use, request_fvs_to_use, od_fvs_to_use = [], [], []
for fv_name, projection in [
(projection.name, projection)
for projection in features.feature_view_projections
Expand All @@ -2048,23 +2080,18 @@ def _get_feature_views_to_use(
fv = fvs[projection.name].with_projection(copy.copy(projection))
if fv not in fvs_to_use:
fvs_to_use.append(fv)
elif fv_name in sfvs:
sfvs_to_use.append(
sfvs[fv_name].with_projection(copy.copy(projection))
)
else:
raise ValueError(
f"The provided feature service {features.name} contains a reference to a feature view"
f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
f'{fv_name} and that you have registered it by running "apply".'
)
views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use, sfvs_to_use)
views_to_use = (fvs_to_use, request_fvs_to_use, od_fvs_to_use)
else:
views_to_use = (
[*fvs.values()],
[*request_fvs.values()],
[*od_fvs.values()],
[*sfvs.values()],
)

return views_to_use
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ def plan(
path=self._get_db_path(config),
name=_table_id(project, FeatureView.from_proto(view)),
)
for view in desired_registry_proto.feature_views
for view in [
*desired_registry_proto.feature_views,
*desired_registry_proto.stream_feature_views,
]
]
return infra_objects

Expand Down
29 changes: 29 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class FeastObjectType(Enum):
FEATURE_VIEW = "feature view"
ON_DEMAND_FEATURE_VIEW = "on demand feature view"
REQUEST_FEATURE_VIEW = "request feature view"
STREAM_FEATURE_VIEW = "stream feature view"
FEATURE_SERVICE = "feature service"

@staticmethod
Expand All @@ -93,6 +94,9 @@ def get_objects_from_registry(
FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views(
project=project
),
FeastObjectType.STREAM_FEATURE_VIEW: registry.list_stream_feature_views(
project=project,
),
FeastObjectType.FEATURE_SERVICE: registry.list_feature_services(
project=project
),
Expand All @@ -108,6 +112,7 @@ def get_objects_from_repo_contents(
FeastObjectType.FEATURE_VIEW: repo_contents.feature_views,
FeastObjectType.ON_DEMAND_FEATURE_VIEW: repo_contents.on_demand_feature_views,
FeastObjectType.REQUEST_FEATURE_VIEW: repo_contents.request_feature_views,
FeastObjectType.STREAM_FEATURE_VIEW: repo_contents.stream_feature_views,
FeastObjectType.FEATURE_SERVICE: repo_contents.feature_services,
}

Expand Down Expand Up @@ -717,6 +722,30 @@ def get_feature_view(
return FeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)

def get_stream_feature_view(
self, name: str, project: str, allow_cache: bool = False
) -> StreamFeatureView:
"""
Retrieves a stream feature view.
Args:
name: Name of stream feature view
project: Feast project that this stream feature view belongs to
allow_cache: Allow returning feature view from the cached registry
Returns:
Returns either the specified feature view, or raises an exception if
none is found
"""
registry_proto = self._get_registry_proto(allow_cache=allow_cache)
for feature_view_proto in registry_proto.stream_feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return StreamFeatureView.from_proto(feature_view_proto)
raise FeatureViewNotFoundException(name, project)

def delete_feature_service(self, name: str, project: str, commit: bool = True):
"""
Deletes a feature service or raises an exception if not found.
Expand Down
38 changes: 33 additions & 5 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from click.exceptions import BadParameter

from feast import PushSource
from feast.data_source import DataSource
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource, KafkaSource
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
from feast.entity import Entity
from feast.feature_service import FeatureService
Expand All @@ -25,6 +26,7 @@
from feast.repo_config import RepoConfig
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.usage import log_exceptions_and_usage


Expand Down Expand Up @@ -122,8 +124,11 @@ def parse_repo(repo_root: Path) -> RepoContents:
):
res.data_sources.append(obj)
data_sources_set.add(obj)
if isinstance(obj, FeatureView) and not any(
(obj is fv) for fv in res.feature_views
if (
isinstance(obj, FeatureView)
and not any((obj is fv) for fv in res.feature_views)
and not isinstance(obj, StreamFeatureView)
and not isinstance(obj, BatchFeatureView)
):
res.feature_views.append(obj)
if isinstance(obj.stream_source, PushSource) and not any(
Expand All @@ -133,6 +138,19 @@ def parse_repo(repo_root: Path) -> RepoContents:
# Don't add if the push source's batch source is a duplicate of an existing batch source
if push_source_dep not in data_sources_set:
res.data_sources.append(push_source_dep)
elif isinstance(obj, StreamFeatureView) and not any(
(obj is sfv) for sfv in res.stream_feature_views
):
res.stream_feature_views.append(obj)
if (
isinstance(obj.stream_source, PushSource)
or isinstance(obj.stream_source, KafkaSource)
and not any((obj is ds) for ds in res.data_sources)
):
batch_source_dep = obj.stream_source.batch_source
# Don't add if the push source's batch source is a duplicate of an existing batch source
if batch_source_dep and batch_source_dep not in data_sources_set:
res.data_sources.append(batch_source_dep)
elif isinstance(obj, Entity) and not any(
(obj is entity) for entity in res.entities
):
Expand Down Expand Up @@ -196,7 +214,12 @@ def extract_objects_for_apply_delete(project, registry, repo):

all_to_apply: List[
Union[
Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService
Entity,
FeatureView,
RequestFeatureView,
OnDemandFeatureView,
StreamFeatureView,
FeatureService,
]
] = []
for object_type in FEAST_OBJECT_TYPES:
Expand All @@ -205,7 +228,12 @@ def extract_objects_for_apply_delete(project, registry, repo):

all_to_delete: List[
Union[
Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService
Entity,
FeatureView,
RequestFeatureView,
OnDemandFeatureView,
StreamFeatureView,
FeatureService,
]
] = []
for object_type in FEAST_OBJECT_TYPES:
Expand Down
Loading

0 comments on commit d024e5e

Please sign in to comment.