Skip to content

Commit

Permalink
Modify feature_store.plan to produce an InfraDiff (#2211)
Browse files Browse the repository at this point in the history
* Implement InfraObject.from_proto for easier conversion

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Implement InfraDiff.update

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Modify feature_store.plan to produce an InfraDiff

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Stricter typing for FcoDiff and InfraObjectDiff

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Small fixes

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix typevar names

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Add comment

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix protos

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Jan 11, 2022
1 parent d5cb044 commit 2a95629
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 44 deletions.
41 changes: 28 additions & 13 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Set, Tuple, TypeVar
from typing import Generic, Iterable, List, Set, Tuple, TypeVar

from feast.base_feature_view import BaseFeatureView
from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)

FcoProto = TypeVar(
"FcoProto",
EntityProto,
FeatureViewProto,
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
)


@dataclass
class FcoDiff:
class FcoDiff(Generic[FcoProto]):
name: str
fco_type: str
current_fco: Any
new_fco: Any
current_fco: FcoProto
new_fco: FcoProto
fco_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand All @@ -30,12 +48,12 @@ def add_fco_diff(self, fco_diff: FcoDiff):
self.fco_diffs.append(fco_diff)


T = TypeVar("T", Entity, BaseFeatureView, FeatureService)
Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)


def tag_objects_for_keep_delete_add(
existing_objs: Iterable[T], desired_objs: Iterable[T]
) -> Tuple[Set[T], Set[T], Set[T]]:
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand All @@ -46,12 +64,9 @@ def tag_objects_for_keep_delete_add(
return objs_to_keep, objs_to_delete, objs_to_add


U = TypeVar("U", EntityProto, FeatureViewProto)


def tag_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
existing_obj_names = {e.spec.name for e in existing_objs}
desired_obj_names = {e.spec.name for e in desired_objs}

Expand All @@ -65,7 +80,7 @@ def tag_proto_objects_for_keep_delete_add(
FIELDS_TO_IGNORE = {"project"}


def diff_between(current: U, new: U, object_type: str) -> FcoDiff:
def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff:
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
property_diffs = []
transition: TransitionType = TransitionType.UNCHANGED
Expand Down
46 changes: 34 additions & 12 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple, TypeVar
from typing import Generic, Iterable, List, Tuple, TypeVar

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
Expand All @@ -17,13 +17,17 @@
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

InfraObjectProto = TypeVar(
"InfraObjectProto", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto
)


@dataclass
class InfraObjectDiff:
class InfraObjectDiff(Generic[InfraObjectProto]):
name: str
infra_object_type: str
current_infra_object: Any
new_infra_object: Any
current_infra_object: InfraObjectProto
new_infra_object: InfraObjectProto
infra_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand All @@ -36,18 +40,34 @@ def __init__(self):
self.infra_object_diffs = []

def update(self):
pass
"""Apply the infrastructure changes specified in this object."""
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type in [
TransitionType.DELETE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.current_infra_object
)
infra_object.teardown()
elif infra_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.new_infra_object
)
infra_object.update()

def to_string(self):
pass


U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)


def tag_infra_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto]
) -> Tuple[
Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto]
]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand Down Expand Up @@ -123,7 +143,7 @@ def diff_infra_protos(

def get_infra_object_protos_by_type(
infra_proto: InfraProto, infra_object_class_type: str
) -> List[U]:
) -> List[InfraObjectProto]:
return [
InfraObject.from_infra_object_proto(infra_object).to_proto()
for infra_object in infra_proto.infra_objects
Expand All @@ -134,7 +154,9 @@ def get_infra_object_protos_by_type(
FIELDS_TO_IGNORE = {"project"}


def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff:
def diff_between(
current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str
) -> InfraObjectDiff:
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
property_diffs = []
transition: TransitionType = TransitionType.UNCHANGED
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,8 @@ def __init__(self, actual_class: str, expected_class: str):
super().__init__(
f"The registry store class was expected to be {expected_class}, but was instead {actual_class}."
)


class FeastInvalidInfraObjectType(Exception):
def __init__(self):
super().__init__("Could not identify the type of the InfraObject.")
25 changes: 21 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand All @@ -63,6 +64,7 @@
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
Expand Down Expand Up @@ -405,7 +407,9 @@ def _get_features(
return _feature_refs

@log_exceptions_and_usage
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
def plan(
self, desired_repo_objects: RepoContents
) -> Tuple[RegistryDiff, InfraDiff]:
"""Dry-run registering objects to metadata store.
The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
Expand Down Expand Up @@ -440,7 +444,7 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
>>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""

current_registry_proto = (
Expand All @@ -450,8 +454,21 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
)

desired_registry_proto = desired_repo_objects.to_registry_proto()
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
return diffs
registry_diff = Registry.diff_between(
current_registry_proto, desired_registry_proto
)

current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
if self._registry.cached_registry_proto
else InfraProto()
)
new_infra_proto = self._provider.plan_infra(
self.config, desired_registry_proto
).to_proto()
infra_diff = diff_infra_protos(current_infra_proto, new_infra_proto)

return (registry_diff, infra_diff)

@log_exceptions_and_usage
def apply(
Expand Down
39 changes: 35 additions & 4 deletions sdk/python/feast/infra/infra_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
from dataclasses import dataclass, field
from typing import Any, List

from feast.errors import FeastInvalidInfraObjectType
from feast.importer import import_class
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"


class InfraObject(ABC):
Expand Down Expand Up @@ -49,15 +57,38 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
infra_object_proto: A protobuf representation of an InfraObject.
Raises:
ValueError: The type of InfraObject could not be identified.
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if infra_object_proto.infra_object_class_type:
cls = _get_infra_object_class_from_type(
infra_object_proto.infra_object_class_type
)
return cls.from_infra_object_proto(infra_object_proto)

raise ValueError("Could not identify the type of the InfraObject.")
raise FeastInvalidInfraObjectType()

@staticmethod
def from_proto(infra_object_proto: Any) -> Any:
"""
Converts a protobuf representation of a subclass to an object of that subclass.
Args:
infra_object_proto: A protobuf representation of an InfraObject.
Raises:
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if isinstance(infra_object_proto, DatastoreTableProto):
infra_object_class_type = DATASTORE_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, DynamoDBTableProto):
infra_object_class_type = DYNAMODB_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, SqliteTableProto):
infra_object_class_type = SQLITE_INFRA_OBJECT_CLASS_TYPE
else:
raise FeastInvalidInfraObjectType()

cls = _get_infra_object_class_from_type(infra_object_class_type)
return cls.from_proto(infra_object_proto)

@abstractmethod
def update(self):
Expand Down Expand Up @@ -94,7 +125,7 @@ def to_proto(self) -> InfraProto:
"""
infra_proto = InfraProto()
for infra_object in self.infra_objects:
infra_object_proto = infra_object.to_proto()
infra_object_proto = infra_object.to_infra_object_proto()
infra_proto.infra_objects.append(infra_object_proto)

return infra_proto
Expand Down
19 changes: 12 additions & 7 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import List

from feast.feature_view import FeatureView
from feast.infra.infra_object import Infra, InfraObject
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.repo_config import RegistryConfig, RepoConfig
from feast.usage import log_exceptions_and_usage


Expand All @@ -15,11 +16,15 @@ class LocalProvider(PassthroughProvider):
This class only exists for backwards compatibility.
"""

pass


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"
def plan_infra(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra = Infra()
infra.infra_objects += infra_objects
return infra


class LocalRegistryStore(RegistryStore):
Expand Down
15 changes: 15 additions & 0 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
name=infra_object_proto.datastore_table.name,
)

# Distinguish between null and empty string, since project_id and namespace are StringValues.
if infra_object_proto.datastore_table.HasField("project_id"):
datastore_table.project_id = (
infra_object_proto.datastore_table.project_id.value
Expand All @@ -387,6 +388,20 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:

return datastore_table

@staticmethod
def from_proto(datastore_table_proto: DatastoreTableProto) -> Any:
datastore_table = DatastoreTable(
project=datastore_table_proto.project, name=datastore_table_proto.name,
)

# Distinguish between null and empty string, since project_id and namespace are StringValues.
if datastore_table_proto.HasField("project_id"):
datastore_table.project_id = datastore_table_proto.project_id.value
if datastore_table_proto.HasField("namespace"):
datastore_table.namespace = datastore_table_proto.namespace.value

return datastore_table

def update(self):
client = _initialize_client(self.project_id, self.namespace)
key = client.key("Project", self.project, "Table", self.name)
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
region=infra_object_proto.dynamodb_table.region,
)

@staticmethod
def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
return DynamoDBTable(
name=dynamodb_table_proto.name, region=dynamodb_table_proto.region,
)

def update(self):
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
Expand Down
Loading

0 comments on commit 2a95629

Please sign in to comment.