From 2a95629e8c4a760b282da3ccce0897d6b9c528a0 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Tue, 11 Jan 2022 13:50:51 -0800 Subject: [PATCH] Modify feature_store.plan to produce an InfraDiff (#2211) * Implement InfraObject.from_proto for easier conversion Signed-off-by: Felix Wang * Implement InfraDiff.update Signed-off-by: Felix Wang * Modify feature_store.plan to produce an InfraDiff Signed-off-by: Felix Wang * Stricter typing for FcoDiff and InfraObjectDiff Signed-off-by: Felix Wang * Small fixes Signed-off-by: Felix Wang * Fix typevar names Signed-off-by: Felix Wang * Add comment Signed-off-by: Felix Wang * Fix protos Signed-off-by: Felix Wang --- sdk/python/feast/diff/FcoDiff.py | 41 +++++++++++------ sdk/python/feast/diff/infra_diff.py | 46 ++++++++++++++----- sdk/python/feast/errors.py | 5 ++ sdk/python/feast/feature_store.py | 25 ++++++++-- sdk/python/feast/infra/infra_object.py | 39 ++++++++++++++-- sdk/python/feast/infra/local.py | 19 +++++--- .../feast/infra/online_stores/datastore.py | 15 ++++++ .../feast/infra/online_stores/dynamodb.py | 6 +++ .../feast/infra/online_stores/online_store.py | 14 ++++++ .../feast/infra/online_stores/sqlite.py | 20 ++++++++ sdk/python/feast/infra/provider.py | 14 ++++++ sdk/python/feast/repo_operations.py | 8 ++-- 12 files changed, 208 insertions(+), 44 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index e4b044dcc4..b85897019f 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,20 +1,38 @@ from dataclasses import dataclass -from typing import Any, Iterable, List, Set, Tuple, TypeVar +from typing import Generic, Iterable, List, Set, Tuple, TypeVar from feast.base_feature_view import BaseFeatureView from feast.diff.property_diff import PropertyDiff, TransitionType from feast.entity import Entity from feast.feature_service import FeatureService from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureService as FeatureServiceProto, +) from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureView as OnDemandFeatureViewProto, +) +from feast.protos.feast.core.RequestFeatureView_pb2 import ( + RequestFeatureView as RequestFeatureViewProto, +) + +FcoProto = TypeVar( + "FcoProto", + EntityProto, + FeatureViewProto, + FeatureServiceProto, + OnDemandFeatureViewProto, + RequestFeatureViewProto, +) @dataclass -class FcoDiff: +class FcoDiff(Generic[FcoProto]): name: str fco_type: str - current_fco: Any - new_fco: Any + current_fco: FcoProto + new_fco: FcoProto fco_property_diffs: List[PropertyDiff] transition_type: TransitionType @@ -30,12 +48,12 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -T = TypeVar("T", Entity, BaseFeatureView, FeatureService) +Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) def tag_objects_for_keep_delete_add( - existing_objs: Iterable[T], desired_objs: Iterable[T] -) -> Tuple[Set[T], Set[T], Set[T]]: + existing_objs: Iterable[Fco], desired_objs: Iterable[Fco] +) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} @@ -46,12 +64,9 @@ def tag_objects_for_keep_delete_add( return objs_to_keep, objs_to_delete, objs_to_add -U = TypeVar("U", EntityProto, FeatureViewProto) - - def tag_proto_objects_for_keep_delete_add( - existing_objs: Iterable[U], desired_objs: Iterable[U] -) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: + existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto] +) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]: existing_obj_names = {e.spec.name for e in existing_objs} desired_obj_names = {e.spec.name for e in desired_objs} @@ -65,7 +80,7 @@ def tag_proto_objects_for_keep_delete_add( FIELDS_TO_IGNORE = {"project"} -def diff_between(current: U, new: U, object_type: str) -> FcoDiff: +def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff: assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED diff --git a/sdk/python/feast/diff/infra_diff.py b/sdk/python/feast/diff/infra_diff.py index d716422261..fc79a74f67 100644 --- a/sdk/python/feast/diff/infra_diff.py +++ b/sdk/python/feast/diff/infra_diff.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Iterable, List, Tuple, TypeVar +from typing import Generic, Iterable, List, Tuple, TypeVar from feast.diff.property_diff import PropertyDiff, TransitionType from feast.infra.infra_object import ( @@ -17,13 +17,17 @@ from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto +InfraObjectProto = TypeVar( + "InfraObjectProto", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto +) + @dataclass -class InfraObjectDiff: +class InfraObjectDiff(Generic[InfraObjectProto]): name: str infra_object_type: str - current_infra_object: Any - new_infra_object: Any + current_infra_object: InfraObjectProto + new_infra_object: InfraObjectProto infra_object_property_diffs: List[PropertyDiff] transition_type: TransitionType @@ -36,18 +40,34 @@ def __init__(self): self.infra_object_diffs = [] def update(self): - pass + """Apply the infrastructure changes specified in this object.""" + for infra_object_diff in self.infra_object_diffs: + if infra_object_diff.transition_type in [ + TransitionType.DELETE, + TransitionType.UPDATE, + ]: + infra_object = InfraObject.from_proto( + infra_object_diff.current_infra_object + ) + infra_object.teardown() + elif infra_object_diff.transition_type in [ + TransitionType.CREATE, + TransitionType.UPDATE, + ]: + infra_object = InfraObject.from_proto( + infra_object_diff.new_infra_object + ) + infra_object.update() def to_string(self): pass -U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto) - - def tag_infra_proto_objects_for_keep_delete_add( - existing_objs: Iterable[U], desired_objs: Iterable[U] -) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: + existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto] +) -> Tuple[ + Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto] +]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} @@ -123,7 +143,7 @@ def diff_infra_protos( def get_infra_object_protos_by_type( infra_proto: InfraProto, infra_object_class_type: str -) -> List[U]: +) -> List[InfraObjectProto]: return [ InfraObject.from_infra_object_proto(infra_object).to_proto() for infra_object in infra_proto.infra_objects @@ -134,7 +154,9 @@ def get_infra_object_protos_by_type( FIELDS_TO_IGNORE = {"project"} -def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff: +def diff_between( + current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str +) -> InfraObjectDiff: assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 615069e579..8592960acd 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -293,3 +293,8 @@ def __init__(self, actual_class: str, expected_class: str): super().__init__( f"The registry store class was expected to be {expected_class}, but was instead {actual_class}." ) + + +class FeastInvalidInfraObjectType(Exception): + def __init__(self): + super().__init__("Could not identify the type of the InfraObject.") diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f1fee70336..64bf23ebde 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -38,6 +38,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView from feast.diff.FcoDiff import RegistryDiff +from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -63,6 +64,7 @@ from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse +from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, @@ -405,7 +407,9 @@ def _get_features( return _feature_refs @log_exceptions_and_usage - def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: + def plan( + self, desired_repo_objects: RepoContents + ) -> Tuple[RegistryDiff, InfraDiff]: """Dry-run registering objects to metadata store. The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces @@ -440,7 +444,7 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + >>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ current_registry_proto = ( @@ -450,8 +454,21 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: ) desired_registry_proto = desired_repo_objects.to_registry_proto() - diffs = Registry.diff_between(current_registry_proto, desired_registry_proto) - return diffs + registry_diff = Registry.diff_between( + current_registry_proto, desired_registry_proto + ) + + current_infra_proto = ( + self._registry.cached_registry_proto.infra.__deepcopy__() + if self._registry.cached_registry_proto + else InfraProto() + ) + new_infra_proto = self._provider.plan_infra( + self.config, desired_registry_proto + ).to_proto() + infra_diff = diff_infra_protos(current_infra_proto, new_infra_proto) + + return (registry_diff, infra_diff) @log_exceptions_and_usage def apply( diff --git a/sdk/python/feast/infra/infra_object.py b/sdk/python/feast/infra/infra_object.py index 282b4bcfab..f21016dea5 100644 --- a/sdk/python/feast/infra/infra_object.py +++ b/sdk/python/feast/infra/infra_object.py @@ -15,13 +15,21 @@ from dataclasses import dataclass, field from typing import Any, List +from feast.errors import FeastInvalidInfraObjectType from feast.importer import import_class +from feast.protos.feast.core.DatastoreTable_pb2 import ( + DatastoreTable as DatastoreTableProto, +) +from feast.protos.feast.core.DynamoDBTable_pb2 import ( + DynamoDBTable as DynamoDBTableProto, +) from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto +from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable" DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable" -SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable" +SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable" class InfraObject(ABC): @@ -49,7 +57,7 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: infra_object_proto: A protobuf representation of an InfraObject. Raises: - ValueError: The type of InfraObject could not be identified. + FeastInvalidInfraObjectType: The type of InfraObject could not be identified. """ if infra_object_proto.infra_object_class_type: cls = _get_infra_object_class_from_type( @@ -57,7 +65,30 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: ) return cls.from_infra_object_proto(infra_object_proto) - raise ValueError("Could not identify the type of the InfraObject.") + raise FeastInvalidInfraObjectType() + + @staticmethod + def from_proto(infra_object_proto: Any) -> Any: + """ + Converts a protobuf representation of a subclass to an object of that subclass. + + Args: + infra_object_proto: A protobuf representation of an InfraObject. + + Raises: + FeastInvalidInfraObjectType: The type of InfraObject could not be identified. + """ + if isinstance(infra_object_proto, DatastoreTableProto): + infra_object_class_type = DATASTORE_INFRA_OBJECT_CLASS_TYPE + elif isinstance(infra_object_proto, DynamoDBTableProto): + infra_object_class_type = DYNAMODB_INFRA_OBJECT_CLASS_TYPE + elif isinstance(infra_object_proto, SqliteTableProto): + infra_object_class_type = SQLITE_INFRA_OBJECT_CLASS_TYPE + else: + raise FeastInvalidInfraObjectType() + + cls = _get_infra_object_class_from_type(infra_object_class_type) + return cls.from_proto(infra_object_proto) @abstractmethod def update(self): @@ -94,7 +125,7 @@ def to_proto(self) -> InfraProto: """ infra_proto = InfraProto() for infra_object in self.infra_objects: - infra_object_proto = infra_object.to_proto() + infra_object_proto = infra_object.to_infra_object_proto() infra_proto.infra_objects.append(infra_object_proto) return infra_proto diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 31c46cf282..060ac64d53 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,12 +1,13 @@ import uuid from datetime import datetime from pathlib import Path +from typing import List -from feast.feature_view import FeatureView +from feast.infra.infra_object import Infra, InfraObject from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.registry_store import RegistryStore -from feast.repo_config import RegistryConfig +from feast.repo_config import RegistryConfig, RepoConfig from feast.usage import log_exceptions_and_usage @@ -15,11 +16,15 @@ class LocalProvider(PassthroughProvider): This class only exists for backwards compatibility. """ - pass - - -def _table_id(project: str, table: FeatureView) -> str: - return f"{project}_{table.name}" + def plan_infra( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> Infra: + infra_objects: List[InfraObject] = self.online_store.plan( + config, desired_registry_proto + ) + infra = Infra() + infra.infra_objects += infra_objects + return infra class LocalRegistryStore(RegistryStore): diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 348583a202..5a8d4b7180 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -376,6 +376,7 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: name=infra_object_proto.datastore_table.name, ) + # Distinguish between null and empty string, since project_id and namespace are StringValues. if infra_object_proto.datastore_table.HasField("project_id"): datastore_table.project_id = ( infra_object_proto.datastore_table.project_id.value @@ -387,6 +388,20 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: return datastore_table + @staticmethod + def from_proto(datastore_table_proto: DatastoreTableProto) -> Any: + datastore_table = DatastoreTable( + project=datastore_table_proto.project, name=datastore_table_proto.name, + ) + + # Distinguish between null and empty string, since project_id and namespace are StringValues. + if datastore_table_proto.HasField("project_id"): + datastore_table.project_id = datastore_table_proto.project_id.value + if datastore_table_proto.HasField("namespace"): + datastore_table.namespace = datastore_table_proto.namespace.value + + return datastore_table + def update(self): client = _initialize_client(self.project_id, self.namespace) key = client.key("Project", self.project, "Table", self.name) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 202cfa54bb..b7f8680e1f 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -254,6 +254,12 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: region=infra_object_proto.dynamodb_table.region, ) + @staticmethod + def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any: + return DynamoDBTable( + name=dynamodb_table_proto.name, region=dynamodb_table_proto.region, + ) + def update(self): dynamodb_client = _initialize_dynamodb_client(region=self.region) dynamodb_resource = _initialize_dynamodb_resource(region=self.region) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index b2aa1e46d0..1f177996de 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -18,6 +18,8 @@ from feast import Entity from feast.feature_view import FeatureView +from feast.infra.infra_object import InfraObject +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig @@ -92,6 +94,18 @@ def update( ): ... + def plan( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> List[InfraObject]: + """ + Returns the set of InfraObjects required to support the desired registry. + + Args: + config: The RepoConfig for the current FeatureStore. + desired_registry_proto: The desired registry, in proto form. + """ + return [] + @abstractmethod def teardown( self, diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 2dcbf319c3..1e7ecf1024 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -27,6 +27,7 @@ from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -199,6 +200,21 @@ def update( for table in tables_to_delete: conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + @log_exceptions_and_usage(online_store="sqlite") + def plan( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> List[InfraObject]: + project = config.project + + infra_objects: List[InfraObject] = [ + SqliteTable( + path=self._get_db_path(config), + name=_table_id(project, FeatureView.from_proto(view)), + ) + for view in desired_registry_proto.feature_views + ] + return infra_objects + def teardown( self, config: RepoConfig, @@ -261,6 +277,10 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any: name=infra_object_proto.sqlite_table.name, ) + @staticmethod + def from_proto(sqlite_table_proto: SqliteTableProto) -> Any: + return SqliteTable(path=sqlite_table_proto.path, name=sqlite_table_proto.name,) + def update(self): self.conn.execute( f"CREATE TABLE IF NOT EXISTS {self.name} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))" diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 3c761f1195..8f9dda9351 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -12,8 +12,10 @@ from feast.entity import Entity from feast.feature_view import DUMMY_ENTITY_ID, FeatureView from feast.importer import import_class +from feast.infra.infra_object import Infra from feast.infra.offline_stores.offline_store import RetrievalJob from feast.on_demand_feature_view import OnDemandFeatureView +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry @@ -61,6 +63,18 @@ def update_infra( """ ... + def plan_infra( + self, config: RepoConfig, desired_registry_proto: RegistryProto + ) -> Infra: + """ + Returns the Infra required to support the desired registry. + + Args: + config: The RepoConfig for the current FeatureStore. + desired_registry_proto: The desired registry, in proto form. + """ + return Infra() + @abc.abstractmethod def teardown_infra( self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 9299a36123..3e9ddb6e30 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -127,20 +127,20 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) for data_source in data_sources: data_source.validate(store.config) - diff = store.plan(repo) + registry_diff, _ = store.plan(repo) views_to_delete = [ v - for v in diff.fco_diffs + for v in registry_diff.fco_diffs if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE ] views_to_keep = [ v - for v in diff.fco_diffs + for v in registry_diff.fco_diffs if v.fco_type == "feature view" and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED} ] - log_cli_output(diff, views_to_delete, views_to_keep) + log_cli_output(registry_diff, views_to_delete, views_to_keep) def _prepare_registry_and_repo(repo_config, repo_path):