From 9b851f14a17075473b59b8c731261a8aa2a78fe7 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 12 Mar 2021 10:06:39 -0800 Subject: [PATCH 1/3] add featureviews Signed-off-by: Oleg Avdeev --- protos/feast/core/FeatureView.proto | 70 ++++++++++ protos/feast/core/Registry.proto | 2 + sdk/python/feast/__init__.py | 2 + sdk/python/feast/big_query_source.py | 57 -------- sdk/python/feast/data_source.py | 4 + sdk/python/feast/feature_view.py | 125 +++++++++++++++--- sdk/python/feast/infra/gcp.py | 19 ++- sdk/python/feast/infra/local_sqlite.py | 21 +-- sdk/python/feast/infra/provider.py | 19 ++- sdk/python/feast/offline_store.py | 20 +-- sdk/python/feast/registry.py | 93 ++++++++++++- sdk/python/feast/repo_operations.py | 41 +++++- .../tests/cli/example_feature_repo_1.py | 10 +- .../tests/cli/online_read_write_test.py | 2 +- 14 files changed, 371 insertions(+), 114 deletions(-) create mode 100644 protos/feast/core/FeatureView.proto delete mode 100644 sdk/python/feast/big_query_source.py diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto new file mode 100644 index 0000000000..d37f3f729d --- /dev/null +++ b/protos/feast/core/FeatureView.proto @@ -0,0 +1,70 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + + +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "FeatureViewProto"; +option java_package = "feast.proto.core"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "feast/core/DataSource.proto"; +import "feast/core/Feature.proto"; + +message FeatureView { + // User-specified specifications of this feature table. + FeatureViewSpec spec = 1; + + // System-populated metadata for this feature table. + FeatureViewMeta meta = 2; + string project = 3; +} + +message FeatureViewSpec { + // Name of the feature table. Must be unique. Not updated. + string name = 1; + + // List names of entities to associate with the Features defined in this + // Feature View. Not updatable. + repeated string entities = 3; + + // List of features specifications for each feature defined with this feature table. + repeated FeatureSpecV2 features = 4; + + // User defined metadata + map tags = 5; + + // Features in this feature table can only be retrieved from online serving + // younger than ttl. Ttl is measured as the duration of time between + // the feature's event timestamp and when the feature is retrieved + // Feature values outside ttl will be returned as unset values and indicated to end user + google.protobuf.Duration ttl = 6; + + DataSource input = 7; + + bool online = 8; +} + +message FeatureViewMeta { + // Time where this Feature View is created + google.protobuf.Timestamp created_timestamp = 1; + + // Time where this Feature View is last updated + google.protobuf.Timestamp last_updated_timestamp = 2; +} diff --git a/protos/feast/core/Registry.proto b/protos/feast/core/Registry.proto index 89145c156a..bb32d6fc3c 100644 --- a/protos/feast/core/Registry.proto +++ b/protos/feast/core/Registry.proto @@ -23,11 +23,13 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; import "feast/core/Entity.proto"; import "feast/core/FeatureTable.proto"; +import "feast/core/FeatureView.proto"; import "google/protobuf/timestamp.proto"; message Registry { repeated Entity entities = 1; repeated FeatureTable feature_tables = 2; + repeated FeatureView feature_views = 6; string registry_schema_version = 3; // to support migrations; incremented when schema is changed string version_id = 4; // version id, random string generated on each update of the data; now used only for debugging purposes diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index 5ac3658d18..948a551b2f 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -11,6 +11,7 @@ from .entity import Entity from .feature import Feature from .feature_table import FeatureTable +from .feature_view import FeatureView from .value_type import ValueType try: @@ -28,6 +29,7 @@ "KinesisSource", "Feature", "FeatureTable", + "FeatureView", "SourceType", "ValueType", ] diff --git a/sdk/python/feast/big_query_source.py b/sdk/python/feast/big_query_source.py deleted file mode 100644 index 53c3c842c5..0000000000 --- a/sdk/python/feast/big_query_source.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2019 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Dict, Optional - - -class BigQuerySource: - """ - Represents a BigQuery table reference or BigQuery query that returns a set of features. - """ - - def __init__( - self, - event_timestamp_column: str, - table_ref: Optional[str] = None, - created_timestamp_column: Optional[str] = None, - field_mapping: Optional[Dict[str, str]] = None, - query: Optional[str] = None, - ): - if (table_ref is None) == (query is None): - raise ValueError("Exactly one of table_ref and query should be specified") - if field_mapping is not None: - for value in field_mapping.values(): - if list(field_mapping.values()).count(value) > 1: - raise ValueError( - f"Two fields cannot be mapped to the same name {value}" - ) - - if event_timestamp_column in field_mapping.keys(): - raise ValueError( - f"The field {event_timestamp_column} is mapped to {field_mapping[event_timestamp_column]}. Please either remove this field mapping or use {field_mapping[event_timestamp_column]} as the event_timestamp_column." - ) - - if ( - created_timestamp_column is not None - and created_timestamp_column in field_mapping.keys() - ): - raise ValueError( - f"The field {created_timestamp_column} is mapped to {field_mapping[created_timestamp_column]}. Please either remove this field mapping or use {field_mapping[created_timestamp_column]} as the _timestamp_column." - ) - - self.table_ref = table_ref - self.event_timestamp_column = event_timestamp_column - self.created_timestamp_column = created_timestamp_column - self.field_mapping = field_mapping - self.query = query - return diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 4d80a11457..a8a7d71871 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -579,6 +579,10 @@ def __eq__(self, other): return True + @property + def table_ref(self): + return self._bigquery_options.table_ref + @property def bigquery_options(self): """ diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 44f0106a04..8d938cd011 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -11,12 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime -from typing import Dict, List +from datetime import timedelta +from typing import Dict, List, Optional, Union -from feast.big_query_source import BigQuerySource -from feast.entity import Entity +from google.protobuf.duration_pb2 import Duration +from google.protobuf.timestamp_pb2 import Timestamp + +from feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.core.FeatureView_pb2 import FeatureViewMeta as FeatureViewMetaProto +from feast.core.FeatureView_pb2 import FeatureViewSpec as FeatureViewSpecProto +from feast.data_source import BigQuerySource, DataSource from feast.feature import Feature +from feast.value_type import ValueType class FeatureView: @@ -24,30 +30,119 @@ class FeatureView: A FeatureView defines a logical grouping of servable features. """ + name: str + entities: List[str] + features: List[Feature] + tags: Dict[str, str] + ttl: Optional[Duration] + online: bool + input: BigQuerySource + + created_timestamp: Optional[Timestamp] = None + last_updated_timestamp: Optional[Timestamp] = None + def __init__( self, name: str, - entities: List[Entity], + entities: List[str], features: List[Feature], tags: Dict[str, str], - ttl: str, + ttl: Optional[Union[Duration, timedelta]], online: bool, - inputs: BigQuerySource, - feature_start_time: datetime, + input: BigQuerySource, ): - cols = [entity.name for entity in entities] + [feat.name for feat in features] + cols = [entity for entity in entities] + [feat.name for feat in features] for col in cols: - if inputs.field_mapping is not None and col in inputs.field_mapping.keys(): + if input.field_mapping is not None and col in input.field_mapping.keys(): raise ValueError( - f"The field {col} is mapped to {inputs.field_mapping[col]} for this data source. Please either remove this field mapping or use {inputs.field_mapping[col]} as the Entity or Feature name." + f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name." ) self.name = name self.entities = entities self.features = features self.tags = tags - self.ttl = ttl + if isinstance(ttl, timedelta): + proto_ttl = Duration() + proto_ttl.FromTimedelta(ttl) + self.ttl = proto_ttl + else: + self.ttl = ttl + self.online = online - self.inputs = inputs - self.feature_start_time = feature_start_time - return + self.input = input + + def is_valid(self): + """ + Validates the state of a feature view locally. Raises an exception + if feature view is invalid. + """ + + if not self.name: + raise ValueError("Feature view needs a name") + + if not self.entities: + raise ValueError("Feature view has no entities") + + def to_proto(self) -> FeatureViewProto: + """ + Converts an feature view object to its protobuf representation + + Returns: + FeatureViewProto protobuf + """ + + meta = FeatureViewMetaProto( + created_timestamp=self.created_timestamp, + last_updated_timestamp=self.last_updated_timestamp, + ) + + spec = FeatureViewSpecProto( + name=self.name, + entities=self.entities, + features=[feature.to_proto() for feature in self.features], + tags=self.tags, + ttl=self.ttl, + online=self.online, + input=self.input.to_proto(), + ) + + return FeatureViewProto(spec=spec, meta=meta) + + @classmethod + def from_proto(cls, feature_view_proto: FeatureViewProto): + """ + Creates a feature view from a protobuf representation of a feature view + + Args: + feature_view_proto: A protobuf representation of a feature view + + Returns: + Returns a FeatureViewProto object based on the feature view protobuf + """ + + feature_view = cls( + name=feature_view_proto.spec.name, + entities=[entity for entity in feature_view_proto.spec.entities], + features=[ + Feature( + name=feature.name, + dtype=ValueType(feature.value_type), + labels=feature.labels, + ) + for feature in feature_view_proto.spec.features + ], + tags=dict(feature_view_proto.spec.tags), + online=feature_view_proto.spec.online, + ttl=( + None + if feature_view_proto.spec.ttl.seconds == 0 + and feature_view_proto.spec.ttl.nanos == 0 + else feature_view_proto.spec.ttl + ), + input=DataSource.from_proto(feature_view_proto.spec.input), + ) + + feature_view.created_timestamp = feature_view_proto.meta.created_timestamp + + return feature_view diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 70a78ee814..ff0e40f644 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,10 +1,10 @@ from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union import mmh3 from pytz import utc -from feast import FeatureTable +from feast import FeatureTable, FeatureView from feast.infra.provider import Provider from feast.repo_config import DatastoreOnlineStoreConfig from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -66,8 +66,8 @@ def _initialize_client(self): def update_infra( self, project: str, - tables_to_delete: List[FeatureTable], - tables_to_keep: List[FeatureTable], + tables_to_delete: List[Union[FeatureTable, FeatureView]], + tables_to_keep: List[Union[FeatureTable, FeatureView]], ): from google.cloud import datastore @@ -88,7 +88,9 @@ def update_infra( key = client.key("Project", project, "Table", table.name) client.delete(key) - def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + def teardown_infra( + self, project: str, tables: List[Union[FeatureTable, FeatureView]] + ) -> None: client = self._initialize_client() for table in tables: @@ -103,7 +105,7 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: def online_write_batch( self, project: str, - table: FeatureTable, + table: Union[FeatureTable, FeatureView], data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]], created_ts: datetime, ) -> None: @@ -143,7 +145,10 @@ def online_write_batch( client.put(entity) def online_read( - self, project: str, table: FeatureTable, entity_key: EntityKeyProto + self, + project: str, + table: Union[FeatureTable, FeatureView], + entity_key: EntityKeyProto, ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: client = self._initialize_client() diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local_sqlite.py index 6f4dbaeba8..e333472f71 100644 --- a/sdk/python/feast/infra/local_sqlite.py +++ b/sdk/python/feast/infra/local_sqlite.py @@ -1,9 +1,9 @@ import os import sqlite3 from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union -from feast import FeatureTable +from feast import FeatureTable, FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.provider import Provider from feast.repo_config import LocalOnlineStoreConfig @@ -11,7 +11,7 @@ from feast.types.Value_pb2 import Value as ValueProto -def _table_id(project: str, table: FeatureTable) -> str: +def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: return f"{project}_{table.name}" @@ -29,8 +29,8 @@ def _get_conn(self): def update_infra( self, project: str, - tables_to_delete: List[FeatureTable], - tables_to_keep: List[FeatureTable], + tables_to_delete: List[Union[FeatureTable, FeatureView]], + tables_to_keep: List[Union[FeatureTable, FeatureView]], ): conn = self._get_conn() for table in tables_to_keep: @@ -44,13 +44,15 @@ def update_infra( for table in tables_to_delete: conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") - def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + def teardown_infra( + self, project: str, tables: List[Union[FeatureTable, FeatureView]] + ) -> None: os.unlink(self._db_path) def online_write_batch( self, project: str, - table: FeatureTable, + table: Union[FeatureTable, FeatureView], data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]], created_ts: datetime, ) -> None: @@ -96,7 +98,10 @@ def online_write_batch( ) def online_read( - self, project: str, table: FeatureTable, entity_key: EntityKeyProto + self, + project: str, + table: Union[FeatureTable, FeatureView], + entity_key: EntityKeyProto, ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: entity_key_bin = serialize_entity_key(entity_key) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 4bb2cc383b..cd18178845 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,8 +1,8 @@ import abc from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union -from feast import FeatureTable +from feast import FeatureTable, FeatureView from feast.repo_config import RepoConfig from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.types.Value_pb2 import Value as ValueProto @@ -13,8 +13,8 @@ class Provider(abc.ABC): def update_infra( self, project: str, - tables_to_delete: List[FeatureTable], - tables_to_keep: List[FeatureTable], + tables_to_delete: List[Union[FeatureTable, FeatureView]], + tables_to_keep: List[Union[FeatureTable, FeatureView]], ): """ Reconcile cloud resources with the objects declared in the feature repo. @@ -28,7 +28,9 @@ def update_infra( ... @abc.abstractmethod - def teardown_infra(self, project: str, tables: List[FeatureTable]): + def teardown_infra( + self, project: str, tables: List[Union[FeatureTable, FeatureView]] + ): """ Tear down all cloud resources for a repo. @@ -41,7 +43,7 @@ def teardown_infra(self, project: str, tables: List[FeatureTable]): def online_write_batch( self, project: str, - table: FeatureTable, + table: Union[FeatureTable, FeatureView], data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]], created_ts: datetime, ) -> None: @@ -63,7 +65,10 @@ def online_write_batch( @abc.abstractmethod def online_read( - self, project: str, table: FeatureTable, entity_key: EntityKeyProto + self, + project: str, + table: Union[FeatureTable, FeatureView], + entity_key: EntityKeyProto, ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]: """ Read feature values given an Entity Key. This is a low level interface, not diff --git a/sdk/python/feast/offline_store.py b/sdk/python/feast/offline_store.py index 54e9cbf08b..b66cf06696 100644 --- a/sdk/python/feast/offline_store.py +++ b/sdk/python/feast/offline_store.py @@ -38,7 +38,7 @@ class BigQueryOfflineStore(OfflineStore): def pull_latest_from_table( feature_view: FeatureView, start_date: datetime, end_date: datetime, ) -> pyarrow.Table: - if feature_view.inputs.table_ref is None: + if feature_view.input.table_ref is None: raise ValueError( "This function can only be called on a FeatureView with a table_ref" ) @@ -64,7 +64,7 @@ def pull_latest_from_table( FROM ( SELECT {field_string}, ROW_NUMBER() OVER({partition_by_entity_string} ORDER BY {timestamp_desc_string}) AS _feast_row - FROM `{feature_view.inputs.table_ref}` + FROM `{feature_view.input.table_ref}` WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}') ) WHERE _feast_row = 1 @@ -95,13 +95,13 @@ def run_reverse_field_mapping( Tuple containing the list of reverse-mapped entity names, reverse-mapped feature names, reverse-mapped event timestamp column, and reverse-mapped created timestamp column that will be passed into the query to the offline store. """ # if we have mapped fields, use the original field names in the call to the offline store - event_timestamp_column = feature_view.inputs.event_timestamp_column - entity_names = [entity.name for entity in feature_view.entities] + event_timestamp_column = feature_view.input.event_timestamp_column + entity_names = [entity for entity in feature_view.entities] feature_names = [feature.name for feature in feature_view.features] - created_timestamp_column = feature_view.inputs.created_timestamp_column - if feature_view.inputs.field_mapping is not None: + created_timestamp_column = feature_view.input.created_timestamp_column + if feature_view.input.field_mapping is not None: reverse_field_mapping = { - v: k for k, v in feature_view.inputs.field_mapping.items() + v: k for k, v in feature_view.input.field_mapping.items() } event_timestamp_column = ( reverse_field_mapping[event_timestamp_column] @@ -134,11 +134,11 @@ def run_forward_field_mapping( table: pyarrow.Table, feature_view: FeatureView ) -> pyarrow.Table: # run field mapping in the forward direction - if table is not None and feature_view.inputs.field_mapping is not None: + if table is not None and feature_view.input.field_mapping is not None: cols = table.column_names mapped_cols = [ - feature_view.inputs.field_mapping[col] - if col in feature_view.inputs.field_mapping.keys() + feature_view.input.field_mapping[col] + if col in feature_view.input.field_mapping.keys() else col for col in cols ] diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 2ba1ab6f3d..4e921ed9f1 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -25,6 +25,7 @@ from feast.core.Registry_pb2 import Registry as RegistryProto from feast.entity import Entity from feast.feature_table import FeatureTable +from feast.feature_view import FeatureView REGISTRY_SCHEMA_VERSION = "1" @@ -117,7 +118,7 @@ def get_entity(self, name: str, project: str) -> Entity: def apply_feature_table(self, feature_table: FeatureTable, project: str): """ - Registers a single feature table with Feast + Registers a single feature table or feature view with Feast Args: feature_table: Feature table that will be registered @@ -145,6 +146,35 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry(updater) return + def apply_feature_view(self, feature_view: FeatureView, project: str): + """ + Registers a single feature view or feature view with Feast + + Args: + feature_view: Feature view that will be registered + project: Feast project that this feature view belongs to + """ + feature_view.is_valid() + feature_view_proto = feature_view.to_proto() + feature_view_proto.project = project + + def updater(registry_proto: RegistryProto): + for idx, existing_feature_view_proto in enumerate( + registry_proto.feature_views + ): + if ( + existing_feature_view_proto.spec.name + == feature_view_proto.spec.name + and existing_feature_view_proto.project == project + ): + del registry_proto.feature_views[idx] + registry_proto.feature_views.append(feature_view_proto) + return registry_proto + registry_proto.feature_views.append(feature_view_proto) + return registry_proto + + self._registry_store.update_registry(updater) + def list_feature_tables(self, project: str) -> List[FeatureTable]: """ Retrieve a list of feature tables from the registry @@ -162,6 +192,23 @@ def list_feature_tables(self, project: str) -> List[FeatureTable]: feature_tables.append(FeatureTable.from_proto(feature_table_proto)) return feature_tables + def list_feature_views(self, project: str) -> List[FeatureView]: + """ + Retrieve a list of feature views from the registry + + Args: + project: Filter feature tables based on project name + + Returns: + List of feature tables + """ + registry_proto = self._registry_store.get_registry() + feature_views = [] + for feature_view_proto in registry_proto.feature_views: + if feature_view_proto.project == project: + feature_views.append(FeatureView.from_proto(feature_view_proto)) + return feature_views + def get_feature_table(self, name: str, project: str) -> FeatureTable: """ Retrieves a feature table. @@ -183,6 +230,27 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable: return FeatureTable.from_proto(feature_table_proto) raise Exception(f"Feature table {name} does not exist in project {project}") + def get_feature_view(self, name: str, project: str) -> FeatureView: + """ + Retrieves a feature view. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + registry_proto = self._registry_store.get_registry() + for feature_view_proto in registry_proto.feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.project == project + ): + return FeatureView.from_proto(feature_view_proto) + raise Exception(f"Feature view {name} does not exist in project {project}") + def delete_feature_table(self, name: str, project: str): """ Deletes a feature table or raises an exception if not found. @@ -207,6 +275,29 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry(updater) return + def delete_feature_view(self, name: str, project: str): + """ + Deletes a feature view or raises an exception if not found. + + Args: + name: Name of feature view + project: Feast project that this feature view belongs to + """ + + def updater(registry_proto: RegistryProto): + for idx, existing_feature_view_proto in enumerate( + registry_proto.feature_views + ): + if ( + existing_feature_view_proto.spec.name == name + and existing_feature_view_proto.project == project + ): + del registry_proto.feature_views[idx] + return registry_proto + raise Exception(f"Feature view {name} does not exist in project {project}") + + self._registry_store.update_registry(updater) + class RegistryStore(ABC): """ diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 706458e6c6..bd584d74c9 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -2,9 +2,10 @@ import os import sys from pathlib import Path -from typing import List, NamedTuple +from typing import List, NamedTuple, Union from feast import Entity, FeatureTable +from feast.feature_view import FeatureView from feast.infra.provider import get_provider from feast.registry import Registry from feast.repo_config import RepoConfig @@ -20,12 +21,13 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: class ParsedRepo(NamedTuple): feature_tables: List[FeatureTable] + feature_views: List[FeatureView] entities: List[Entity] def parse_repo(repo_root: Path) -> ParsedRepo: """ Collect feature table definitions from feature repo """ - res = ParsedRepo(feature_tables=[], entities=[]) + res = ParsedRepo(feature_tables=[], entities=[], feature_views=[]) # FIXME: process subdirs but exclude hidden ones repo_files = [p.resolve() for p in repo_root.glob("*.py")] @@ -41,6 +43,8 @@ def parse_repo(repo_root: Path) -> ParsedRepo: obj = getattr(module, attr_name) if isinstance(obj, FeatureTable): res.feature_tables.append(obj) + if isinstance(obj, FeatureView): + res.feature_views.append(obj) elif isinstance(obj, Entity): res.entities.append(obj) return res @@ -58,21 +62,48 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): registry.apply_entity(entity, project=project) repo_table_names = set(t.name for t in repo.feature_tables) + + for t in repo.feature_views: + repo_table_names.add(t.name) + tables_to_delete = [] for registry_table in registry.list_feature_tables(project=project): if registry_table.name not in repo_table_names: tables_to_delete.append(registry_table) + views_to_delete = [] + for registry_view in registry.list_feature_views(project=project): + if registry_view.name not in repo_table_names: + views_to_delete.append(registry_view) + # Delete tables that should not exist for registry_table in tables_to_delete: registry.delete_feature_table(registry_table.name, project=project) + # Create tables that should for table in repo.feature_tables: registry.apply_feature_table(table, project) + # Delete views that should not exist + for registry_view in views_to_delete: + registry.delete_feature_view(registry_view.name, project=project) + + # Create views that should + for view in repo.feature_views: + registry.apply_feature_view(view, project) + infra_provider = get_provider(repo_config) + + all_to_delete: List[Union[FeatureTable, FeatureView]] = [] + all_to_delete.extend(tables_to_delete) + all_to_delete.extend(views_to_delete) + + all_to_keep: List[Union[FeatureTable, FeatureView]] = [] + all_to_keep.extend(repo.feature_tables) + all_to_keep.extend(repo.feature_views) + infra_provider.update_infra( - project, tables_to_delete=tables_to_delete, tables_to_keep=repo.feature_tables + project, tables_to_delete=all_to_delete, tables_to_keep=all_to_keep ) print("Done!") @@ -81,7 +112,9 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): def teardown(repo_config: RepoConfig, repo_path: Path): registry = Registry(repo_config.metadata_store) project = repo_config.project - registry_tables = registry.list_feature_tables(project=project) + registry_tables: List[Union[FeatureTable, FeatureView]] = [] + registry_tables.extend(registry.list_feature_tables(project=project)) + registry_tables.extend(registry.list_feature_views(project=project)) infra_provider = get_provider(repo_config) infra_provider.teardown_infra(project, tables=registry_tables) diff --git a/sdk/python/tests/cli/example_feature_repo_1.py b/sdk/python/tests/cli/example_feature_repo_1.py index 4a32700f5e..82bffc3d25 100644 --- a/sdk/python/tests/cli/example_feature_repo_1.py +++ b/sdk/python/tests/cli/example_feature_repo_1.py @@ -1,6 +1,6 @@ from google.protobuf.duration_pb2 import Duration -from feast import BigQuerySource, Entity, Feature, FeatureTable, ValueType +from feast import BigQuerySource, Entity, Feature, FeatureView, ValueType driver_locations_source = BigQuerySource( table_ref="rh_prod.ride_hailing_co.drivers", @@ -16,13 +16,15 @@ ) -driver_locations = FeatureTable( +driver_locations = FeatureView( name="driver_locations", entities=["driver"], - max_age=Duration(seconds=86400 * 1), + ttl=Duration(seconds=86400 * 1), features=[ Feature(name="lat", dtype=ValueType.FLOAT), Feature(name="lon", dtype=ValueType.STRING), ], - batch_source=driver_locations_source, + online=True, + input=driver_locations_source, + tags={}, ) diff --git a/sdk/python/tests/cli/online_read_write_test.py b/sdk/python/tests/cli/online_read_write_test.py index a78dc331e1..b9f8bf74f1 100644 --- a/sdk/python/tests/cli/online_read_write_test.py +++ b/sdk/python/tests/cli/online_read_write_test.py @@ -13,7 +13,7 @@ def basic_rw_test(repo_path: Path, project_name: str) -> None: """ store = FeatureStore(repo_path=repo_path, config=None) registry = store._get_registry() - table = registry.get_feature_table(project=project_name, name="driver_locations") + table = registry.get_feature_view(project=project_name, name="driver_locations") provider = store._get_provider() From 1e18a915c7eab85ffdeba2131851987d76e5821b Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 12 Mar 2021 10:28:48 -0800 Subject: [PATCH 2/3] fix comment Signed-off-by: Oleg Avdeev --- sdk/python/feast/feature_view.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 8d938cd011..9b05ceb612 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -86,7 +86,7 @@ def is_valid(self): def to_proto(self) -> FeatureViewProto: """ - Converts an feature view object to its protobuf representation + Converts an feature view object to its protobuf representation. Returns: FeatureViewProto protobuf From dda5555c863ea97abb170c14dd23c7d6900dcd02 Mon Sep 17 00:00:00 2001 From: Oleg Avdeev Date: Fri, 12 Mar 2021 18:33:21 -0800 Subject: [PATCH 3/3] fix comment Signed-off-by: Oleg Avdeev --- sdk/python/feast/registry.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 4e921ed9f1..5e4b935407 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -118,7 +118,7 @@ def get_entity(self, name: str, project: str) -> Entity: def apply_feature_table(self, feature_table: FeatureTable, project: str): """ - Registers a single feature table or feature view with Feast + Registers a single feature table with Feast Args: feature_table: Feature table that will be registered @@ -148,7 +148,7 @@ def updater(registry_proto: RegistryProto): def apply_feature_view(self, feature_view: FeatureView, project: str): """ - Registers a single feature view or feature view with Feast + Registers a single feature view with Feast Args: feature_view: Feature view that will be registered