feat: Adding write capability to online store to on demand feature views #4585

Merged
merged 66 commits into from
Oct 10, 2024
508bad2
merged changes
franciscojavierarceo Sep 24, 2024
899f7f4
saving progress
franciscojavierarceo Aug 17, 2024
5b69c71
merged changes to odfv
franciscojavierarceo Sep 24, 2024
d107354
linted
franciscojavierarceo Aug 18, 2024
a34ec4d
adding the test needed to show the expected behavior
franciscojavierarceo Aug 18, 2024
b95d2a2
updated test case
franciscojavierarceo Aug 21, 2024
47974c2
saving progress
franciscojavierarceo Aug 21, 2024
ceb75a2
merging
franciscojavierarceo Sep 24, 2024
6688933
merged
franciscojavierarceo Sep 24, 2024
4c28acc
merged
franciscojavierarceo Sep 24, 2024
fd577dc
merging
franciscojavierarceo Sep 24, 2024
167fe6c
adding the entity keys for now to do retrieval
franciscojavierarceo Aug 29, 2024
54811d7
adding entity to odfv
franciscojavierarceo Aug 29, 2024
f0d87fc
checking in progress...getting closer
franciscojavierarceo Aug 29, 2024
b7091ca
may have to revert some of this...looks like the challenge is getting…
franciscojavierarceo Aug 31, 2024
d2a12f8
moving things around to make it easier to debug
franciscojavierarceo Sep 1, 2024
9d496ba
debugging
franciscojavierarceo Sep 1, 2024
58280aa
merged
franciscojavierarceo Sep 24, 2024
c42be75
merging
franciscojavierarceo Sep 24, 2024
fb3b315
Rebasing and merging changes from other PR
franciscojavierarceo Sep 6, 2024
82f3f8b
Merging changes continued
franciscojavierarceo Sep 7, 2024
172693d
update the _make_inference to include odfvs with writes in the update…
franciscojavierarceo Sep 7, 2024
66c5b57
have the table being written now...the create table happens in the Sq…
franciscojavierarceo Sep 8, 2024
ea3b98a
checking in progress
franciscojavierarceo Sep 9, 2024
905912b
adding logs
franciscojavierarceo Sep 10, 2024
25d42dd
updating permissions
franciscojavierarceo Sep 10, 2024
03d6116
going to error out on purpose
franciscojavierarceo Sep 10, 2024
9b16615
adding unit test and merging changes
franciscojavierarceo Sep 18, 2024
adbaeb6
almost got everything working and type validation behaving
franciscojavierarceo Sep 18, 2024
11d2914
cleaned up and have tests behaving
franciscojavierarceo Sep 18, 2024
64375ee
adding print
franciscojavierarceo Sep 21, 2024
3e6912a
removing print
franciscojavierarceo Sep 21, 2024
5751a72
checking in progress
franciscojavierarceo Sep 23, 2024
b0208e1
updating test
franciscojavierarceo Sep 25, 2024
463d8bb
adding test
franciscojavierarceo Sep 25, 2024
2981817
linted and updated
franciscojavierarceo Sep 25, 2024
3a33368
removed print
franciscojavierarceo Sep 25, 2024
22bf637
updated tests to test actual behavior
franciscojavierarceo Sep 25, 2024
0d0d117
checking in progress
franciscojavierarceo Sep 28, 2024
5bff836
changing typo
franciscojavierarceo Sep 28, 2024
271f814
updating test
franciscojavierarceo Sep 28, 2024
754b0e8
testing changes
franciscojavierarceo Sep 28, 2024
25c7181
checking to see if thing still working
franciscojavierarceo Sep 29, 2024
1d4023f
removed print
franciscojavierarceo Sep 29, 2024
3662102
undo change for odfv file
franciscojavierarceo Sep 29, 2024
74e7ede
updated tests
franciscojavierarceo Sep 30, 2024
59940cf
okay well have the unit test working
franciscojavierarceo Oct 1, 2024
b223feb
type changes, hope i dont regret them
franciscojavierarceo Oct 1, 2024
01770e2
updated stream feature view piece
franciscojavierarceo Oct 2, 2024
7606481
updated sfv ifelse
franciscojavierarceo Oct 2, 2024
c4ebf18
removing print
franciscojavierarceo Oct 2, 2024
72add32
formatted and updated test
franciscojavierarceo Oct 2, 2024
24e0a84
resolving some linter errors
franciscojavierarceo Oct 3, 2024
b92bf32
fixed linter and formatting
franciscojavierarceo Oct 3, 2024
934c1e9
okay think it is working
franciscojavierarceo Oct 3, 2024
0ce93f2
linter
franciscojavierarceo Oct 3, 2024
bf31d59
updated type map for integration tests
franciscojavierarceo Oct 4, 2024
9aff889
Merge branch 'master' into podfv2
franciscojavierarceo Oct 5, 2024
acc7b7f
updated local feature store test
franciscojavierarceo Oct 5, 2024
12d9100
fixed local fs test
franciscojavierarceo Oct 5, 2024
3c94614
chore: Updated snowflake test to be more explicit about post apply en…
franciscojavierarceo Oct 5, 2024
f01b691
merging
franciscojavierarceo Oct 5, 2024
c26ae75
fixed test to entity_rows_to_read
franciscojavierarceo Oct 5, 2024
c6d55c7
resolved inf conflicts
franciscojavierarceo Oct 5, 2024
52b8c4d
lint
franciscojavierarceo Oct 5, 2024
ca0971a
Updated tests and lint, think I have everything working
franciscojavierarceo Oct 6, 2024
13 changes: 9 additions & 4 deletions sdk/python/feast/errors.py
@@ -467,10 +467,15 @@ def __init__(self):


 class DataFrameSerializationError(FeastError):
-    def __init__(self, input_dict: dict):
-        super().__init__(
-            f"Failed to serialize the provided dictionary into a pandas DataFrame: {input_dict.keys()}"
-        )
+    def __init__(self, input: Any):
+        if isinstance(input, dict):
+            super().__init__(
+                f"Failed to serialize the provided dictionary into a pandas DataFrame: {input.keys()}"
+            )
+        else:
+            super().__init__(
+                "Failed to serialize the provided input into a pandas DataFrame"
+            )


 class PermissionNotFoundException(FeastError):
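The behavior of the widened `DataFrameSerializationError` can be sketched in isolation. This is a minimal illustration, not the Feast module itself: `FeastError` below is a stand-in base class defined only for this example, and the sample inputs are made up.

```python
from typing import Any


class FeastError(Exception):
    """Stand-in for feast.errors.FeastError, defined here only for illustration."""


class DataFrameSerializationError(FeastError):
    def __init__(self, input: Any):
        if isinstance(input, dict):
            # Dict inputs: include the keys so the caller can see which columns were passed.
            super().__init__(
                "Failed to serialize the provided dictionary into a pandas DataFrame: "
                f"{input.keys()}"
            )
        else:
            # Any other input (for example a list of rows) gets a generic message.
            super().__init__(
                "Failed to serialize the provided input into a pandas DataFrame"
            )


dict_error = DataFrameSerializationError({"driver_id": [1001]})
list_error = DataFrameSerializationError([{"driver_id": 1001}])
```

With the old signature, the list case would have failed on `input_dict.keys()` before the error message could be built.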
4 changes: 3 additions & 1 deletion sdk/python/feast/feature_service.py
@@ -84,7 +84,9 @@ def __init__(
             if isinstance(feature_grouping, BaseFeatureView):
                 self.feature_view_projections.append(feature_grouping.projection)

-    def infer_features(self, fvs_to_update: Dict[str, FeatureView]):
+    def infer_features(
+        self, fvs_to_update: Dict[str, Union[FeatureView, BaseFeatureView]]
+    ):
         """
         Infers the features for the projections of this feature service, and updates this feature
         service in place.
70 changes: 54 additions & 16 deletions sdk/python/feast/feature_store.py
@@ -608,7 +608,12 @@ def _make_inferences(
         update_feature_views_with_inferred_features_and_entities(
             sfvs_to_update, entities + entities_to_update, self.config
         )
-        # TODO(kevjumba): Update schema inferrence
+        # We need to attach the time stamp fields to the underlying data sources
+        # and cascade the dependencies
+        update_feature_views_with_inferred_features_and_entities(
+            odfvs_to_update, entities + entities_to_update, self.config
+        )
+        # TODO(kevjumba): Update schema inference
         for sfv in sfvs_to_update:
             if not sfv.schema:
                 raise ValueError(
@@ -618,8 +623,13 @@ def _make_inferences(
         for odfv in odfvs_to_update:
             odfv.infer_features()

+        odfvs_to_write = [
+            odfv for odfv in odfvs_to_update if odfv.write_to_online_store
+        ]
+        # Update to include ODFVs with write to online store
         fvs_to_update_map = {
-            view.name: view for view in [*views_to_update, *sfvs_to_update]
+            view.name: view
+            for view in [*views_to_update, *sfvs_to_update, *odfvs_to_write]
         }
         for feature_service in feature_services_to_update:
             feature_service.infer_features(fvs_to_update=fvs_to_update_map)
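The map construction in the hunk above can be sketched on its own. This is a rough illustration under stated assumptions: `View` is a hypothetical stand-in for the Feast view classes, and `build_fvs_to_update_map` is a helper name invented for this example.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class View:
    """Hypothetical stand-in for a Feast feature view; not the real class."""

    name: str
    write_to_online_store: bool = False


def build_fvs_to_update_map(
    views: List[View], sfvs: List[View], odfvs: List[View]
) -> Dict[str, View]:
    # Only on demand feature views that write to the online store join the map.
    odfvs_to_write = [odfv for odfv in odfvs if odfv.write_to_online_store]
    return {view.name: view for view in [*views, *sfvs, *odfvs_to_write]}


fv_map = build_fvs_to_update_map(
    views=[View("driver_stats")],
    sfvs=[View("driver_stream")],
    odfvs=[
        View("odfv_read_time"),
        View("odfv_write_time", write_to_online_store=True),
    ],
)
```

In this sketch, `odfv_read_time` stays out of the map because it is computed at read time only, while `odfv_write_time` is included alongside the regular and stream views.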
@@ -847,6 +857,11 @@ def apply(
         ]
         sfvs_to_update = [ob for ob in objects if isinstance(ob, StreamFeatureView)]
         odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
+        odfvs_with_writes_to_update = [
+            ob
+            for ob in objects
+            if isinstance(ob, OnDemandFeatureView) and ob.write_to_online_store
+        ]
         services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
         data_sources_set_to_update = {
             ob for ob in objects if isinstance(ob, DataSource)
@@ -868,10 +883,23 @@ def apply(
         for batch_source in batch_sources_to_add:
             data_sources_set_to_update.add(batch_source)

-        for fv in itertools.chain(views_to_update, sfvs_to_update):
-            data_sources_set_to_update.add(fv.batch_source)
-            if fv.stream_source:
-                data_sources_set_to_update.add(fv.stream_source)
+        for fv in itertools.chain(
+            views_to_update, sfvs_to_update, odfvs_with_writes_to_update
+        ):
+            if isinstance(fv, FeatureView):
+                data_sources_set_to_update.add(fv.batch_source)
+            if hasattr(fv, "stream_source"):
+                if fv.stream_source:
+                    data_sources_set_to_update.add(fv.stream_source)
+            if isinstance(fv, OnDemandFeatureView):
+                for source_fvp in fv.source_feature_view_projections:
+                    odfv_batch_source: Optional[DataSource] = (
+                        fv.source_feature_view_projections[source_fvp].batch_source
+                    )
+                    if odfv_batch_source is not None:
+                        data_sources_set_to_update.add(odfv_batch_source)
+            else:
+                pass

         for odfv in odfvs_to_update:
             for v in odfv.source_request_sources.values():
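The data source collection in this hunk can be sketched as a standalone function. The classes below are simplified stand-ins written for this example (the real Feast classes carry far more state), sources are represented as plain strings, and `collect_data_sources` is a hypothetical helper name.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Union


@dataclass
class Projection:
    """Stand-in for a feature view projection; only batch_source is modeled."""

    batch_source: Optional[str] = None


@dataclass
class FeatureView:
    """Stand-in for a batch or stream feature view."""

    name: str
    batch_source: str
    stream_source: Optional[str] = None


@dataclass
class OnDemandFeatureView:
    """Stand-in for an on demand feature view that writes to the online store."""

    name: str
    source_feature_view_projections: Dict[str, Projection] = field(default_factory=dict)


def collect_data_sources(
    feature_views: List[Union[FeatureView, OnDemandFeatureView]],
) -> Set[str]:
    sources: Set[str] = set()
    for fv in feature_views:
        if isinstance(fv, FeatureView):
            sources.add(fv.batch_source)
        # Only views that define a non-empty stream_source contribute one.
        if getattr(fv, "stream_source", None):
            sources.add(fv.stream_source)
        if isinstance(fv, OnDemandFeatureView):
            # An ODFV has no batch source of its own; it borrows them from its projections.
            for projection in fv.source_feature_view_projections.values():
                if projection.batch_source is not None:
                    sources.add(projection.batch_source)
    return sources


sources = collect_data_sources(
    [
        FeatureView("driver_stats", batch_source="driver_batch", stream_source="driver_stream"),
        OnDemandFeatureView(
            "odfv_write_time",
            {"orders": Projection("orders_batch"), "request": Projection(None)},
        ),
    ]
)
```

The sketch iterates over `.values()` rather than indexing by key as the diff does; the two are equivalent for this purpose.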
@@ -884,7 +912,9 @@ def apply(

         # Validate all feature views and make inferences.
         self._validate_all_feature_views(
-            views_to_update, odfvs_to_update, sfvs_to_update
+            views_to_update,
+            odfvs_to_update,
+            sfvs_to_update,
         )
         self._make_inferences(
             data_sources_to_update,
@@ -989,7 +1019,9 @@ def apply(
             tables_to_delete: List[FeatureView] = (
                 views_to_delete + sfvs_to_delete if not partial else []  # type: ignore
             )
-            tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update  # type: ignore
+            tables_to_keep: List[
+                Union[FeatureView, StreamFeatureView, OnDemandFeatureView]
+            ] = views_to_update + sfvs_to_update + odfvs_with_writes_to_update  # type: ignore

             self._get_provider().update_infra(
                 project=self.project,
@@ -1444,19 +1476,18 @@ def write_to_online_store(
             inputs: Optional the dictionary object to be written
             allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
         """
-        # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
+        feature_view_dict = {
+            fv_proto.name: fv_proto
+            for fv_proto in self.list_all_feature_views(allow_registry_cache)
+        }
         try:
-            feature_view: FeatureView = self.get_stream_feature_view(
-                feature_view_name, allow_registry_cache=allow_registry_cache
-            )
+            feature_view = feature_view_dict[feature_view_name]
         except FeatureViewNotFoundException:
-            feature_view = self.get_feature_view(
-                feature_view_name, allow_registry_cache=allow_registry_cache
-            )
+            raise FeatureViewNotFoundException(feature_view_name, self.project)
         if df is not None and inputs is not None:
             raise ValueError("Both df and inputs cannot be provided at the same time.")
         if df is None and inputs is not None:
-            if isinstance(inputs, dict):
+            if isinstance(inputs, dict) or isinstance(inputs, List):
                 try:
                     df = pd.DataFrame(inputs)
                 except Exception as _:
@@ -1465,6 +1496,13 @@ def write_to_online_store(
                 pass
             else:
                 raise ValueError("inputs must be a dictionary or a pandas DataFrame.")
+        if df is not None and inputs is None:
+            if isinstance(df, dict) or isinstance(df, List):
+                try:
+                    df = pd.DataFrame(df)
+                except Exception as _:
+                    raise DataFrameSerializationError(df)
+
         provider = self._get_provider()
         provider.ingest_df(feature_view, df)
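
One detail worth illustrating about the name lookup above: subscripting a plain Python dict with a missing key raises `KeyError`. A lookup that turns a missing name into a domain-specific error might therefore look like the sketch below. This is not the code from this PR; the exception class is a stand-in with an invented message, and `lookup_feature_view` is a hypothetical helper.

```python
from typing import Any, Dict


class FeatureViewNotFoundException(Exception):
    """Stand-in for the Feast exception of the same name (illustration only)."""

    def __init__(self, name: str, project: str):
        super().__init__(f"Feature view {name} does not exist in project {project}")


def lookup_feature_view(feature_views: Dict[str, Any], name: str, project: str) -> Any:
    try:
        return feature_views[name]
    except KeyError:
        # A missing dict key surfaces as KeyError, so that is what gets translated.
        raise FeatureViewNotFoundException(name, project) from None


views = {"driver_stats": object()}
found = lookup_feature_view(views, "driver_stats", "demo_project")

try:
    lookup_feature_view(views, "missing_view", "demo_project")
    missing_error = None
except FeatureViewNotFoundException as exc:
    missing_error = exc
```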

39 changes: 23 additions & 16 deletions sdk/python/feast/inference.py
@@ -183,10 +183,13 @@ def update_feature_views_with_inferred_features_and_entities(
             )

         if not fv.features:
-            raise RegistryInferenceFailure(
-                "FeatureView",
-                f"Could not infer Features for the FeatureView named {fv.name}.",
-            )
+            if isinstance(fv, OnDemandFeatureView):
+                return None
+            else:
+                raise RegistryInferenceFailure(
+                    "FeatureView",
+                    f"Could not infer Features for the FeatureView named {fv.name}.",
+                )


 def _infer_features_and_entities(
@@ -209,6 +212,7 @@ def _infer_features_and_entities(
         fv, join_keys, run_inference_for_features, config
     )

+    entity_columns: List[Field] = fv.entity_columns if fv.entity_columns else []
     columns_to_exclude = {
         fv.batch_source.timestamp_field,
         fv.batch_source.created_timestamp_column,
@@ -235,7 +239,7 @@ def _infer_features_and_entities(
             if field.name not in [
                 entity_column.name for entity_column in fv.entity_columns
             ]:
-                fv.entity_columns.append(field)
+                entity_columns.append(field)
         elif not re.match(
             "^__|__$", col_name
         ):  # double underscores often signal an internal-use column
@@ -256,6 +260,8 @@ def _infer_features_and_entities(
                 if field.name not in [feature.name for feature in fv.features]:
                     fv.features.append(field)

+    fv.entity_columns = entity_columns
+

 def _infer_on_demand_features_and_entities(
     fv: OnDemandFeatureView,
@@ -282,18 +288,19 @@ def _infer_on_demand_features_and_entities(

         batch_source = getattr(source_feature_view, "batch_source")
         batch_field_mapping = getattr(batch_source or None, "field_mapping")
-        if batch_field_mapping:
-            for (
-                original_col,
-                mapped_col,
-            ) in batch_field_mapping.items():
-                if mapped_col in columns_to_exclude:
-                    columns_to_exclude.remove(mapped_col)
-                    columns_to_exclude.add(original_col)
+        for (
+            original_col,
+            mapped_col,
+        ) in batch_field_mapping.items():
+            if mapped_col in columns_to_exclude:
+                columns_to_exclude.remove(mapped_col)
+                columns_to_exclude.add(original_col)

-        table_column_names_and_types = batch_source.get_table_column_names_and_types(
-            config
-        )
+        batch_field_mapping = getattr(batch_source, "field_mapping", {})
+
+        table_column_names_and_types = (
+            batch_source.get_table_column_names_and_types(config)
+        )
         for col_name, col_datatype in table_column_names_and_types:
             if col_name in columns_to_exclude:
                 continue
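The column remapping loop in this hunk swaps mapped column names back to their original names, so that exclusions match the raw table columns. A minimal standalone sketch follows. `remap_excluded_columns` is a hypothetical helper name, the column names are made up, and unlike the diff it returns a copy instead of mutating the set in place.

```python
from typing import Dict, Set


def remap_excluded_columns(
    columns_to_exclude: Set[str], field_mapping: Dict[str, str]
) -> Set[str]:
    """Replace mapped column names with the original names from field_mapping."""
    remapped = set(columns_to_exclude)
    for original_col, mapped_col in field_mapping.items():
        if mapped_col in remapped:
            # The source table still uses the original name, so exclude that one instead.
            remapped.remove(mapped_col)
            remapped.add(original_col)
    return remapped


excluded = remap_excluded_columns(
    {"event_timestamp", "driver_id"},
    {"ts": "event_timestamp"},
)
unchanged = remap_excluded_columns({"driver_id"}, {})
```

An empty mapping leaves the set untouched, which is why a `{}` default for `field_mapping` is safe to iterate.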
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _get_column_names
@@ -80,10 +81,10 @@ def update(
         self,
         project: str,
         views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import RepoConfig
 from feast.stream_feature_view import StreamFeatureView

@@ -89,7 +90,7 @@ def update(
             Union[BatchFeatureView, StreamFeatureView, FeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.passthrough_provider import PassthroughProvider
 from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.stream_feature_view import StreamFeatureView
@@ -77,10 +78,10 @@ def update(
         self,
         project: str,
         views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import FeastConfigBaseModel
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _get_column_names
@@ -122,10 +123,10 @@ def update(
         self,
         project: str,
         views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/materialization/local_engine.py
@@ -10,6 +10,7 @@
 from feast.infra.offline_stores.offline_store import OfflineStore
 from feast.infra.online_stores.online_store import OnlineStore
 from feast.infra.registry.base_registry import BaseRegistry
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import (
@@ -69,10 +70,10 @@ def update(
         self,
         project: str,
         views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
5 changes: 3 additions & 2 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
@@ -31,6 +31,7 @@
     get_snowflake_online_store_path,
     package_snowpark_zip,
 )
+from feast.on_demand_feature_view import OnDemandFeatureView
 from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
@@ -120,10 +121,10 @@ def update(
         self,
         project: str,
         views_to_delete: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         views_to_keep: Sequence[
-            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
         ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
6 changes: 5 additions & 1 deletion sdk/python/feast/infra/online_stores/online_store.py
@@ -17,6 +17,7 @@
 from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union

 from feast import Entity, utils
+from feast.batch_feature_view import BatchFeatureView
 from feast.feature_service import FeatureService
 from feast.feature_view import FeatureView
 from feast.infra.infra_object import InfraObject
@@ -27,6 +28,7 @@
 from feast.protos.feast.types.Value_pb2 import RepeatedValue
 from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.repo_config import RepoConfig
+from feast.stream_feature_view import StreamFeatureView


 class OnlineStore(ABC):
@@ -288,7 +290,9 @@ def update(
         self,
         config: RepoConfig,
         tables_to_delete: Sequence[FeatureView],
-        tables_to_keep: Sequence[FeatureView],
+        tables_to_keep: Sequence[
+            Union[BatchFeatureView, StreamFeatureView, FeatureView]
+        ],
         entities_to_delete: Sequence[Entity],
         entities_to_keep: Sequence[Entity],
         partial: bool,