Skip to content

Commit

Permalink
Feast plan clean up (#2256)
Browse files Browse the repository at this point in the history
* Run validation and inference on views and entities during plan

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Do not log objects that are unchanged

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Rename Fco to FeastObject

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Remove useless method

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Lint

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Always initialize registry during feature store initialization

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix usage test

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Remove print statements

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 authored Jan 30, 2022
1 parent 08d6881 commit 895589a
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 171 deletions.
8 changes: 8 additions & 0 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ def to_string(self):
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
}
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type == TransitionType.UNCHANGED:
continue
action, color = message_action_map[infra_object_diff.transition_type]
log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n"
if infra_object_diff.transition_type == TransitionType.UPDATE:
for _p in infra_object_diff.infra_object_property_diffs:
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"

log_string = (
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure"
if not log_string
else log_string
)

return log_string


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,28 @@
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.repo_contents import RepoContents

Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)
FeastObject = TypeVar("FeastObject", Entity, BaseFeatureView, FeatureService)


@dataclass
class FcoDiff(Generic[Fco]):
class FeastObjectDiff(Generic[FeastObject]):
name: str
fco_type: FeastObjectType
current_fco: Fco
new_fco: Fco
fco_property_diffs: List[PropertyDiff]
feast_object_type: FeastObjectType
current_feast_object: FeastObject
new_feast_object: FeastObject
feast_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType


@dataclass
class RegistryDiff:
fco_diffs: List[FcoDiff]
feast_object_diffs: List[FeastObjectDiff]

def __init__(self):
self.fco_diffs = []
self.feast_object_diffs = []

def add_fco_diff(self, fco_diff: FcoDiff):
self.fco_diffs.append(fco_diff)
def add_feast_object_diff(self, feast_object_diff: FeastObjectDiff):
self.feast_object_diffs.append(feast_object_diff)

def to_string(self):
from colorama import Fore, Style
Expand All @@ -54,21 +54,29 @@ def to_string(self):
TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX),
TransitionType.UPDATE: ("Updated", Fore.YELLOW),
}
for fco_diff in self.fco_diffs:
if fco_diff.name == DUMMY_ENTITY_NAME:
for feast_object_diff in self.feast_object_diffs:
if feast_object_diff.name == DUMMY_ENTITY_NAME:
continue
action, color = message_action_map[fco_diff.transition_type]
log_string += f"{action} {fco_diff.fco_type.value} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}\n"
if fco_diff.transition_type == TransitionType.UPDATE:
for _p in fco_diff.fco_property_diffs:
if feast_object_diff.transition_type == TransitionType.UNCHANGED:
continue
action, color = message_action_map[feast_object_diff.transition_type]
log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n"
if feast_object_diff.transition_type == TransitionType.UPDATE:
for _p in feast_object_diff.feast_object_property_diffs:
log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n"

log_string = (
f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to registry"
if not log_string
else log_string
)

return log_string


def tag_objects_for_keep_delete_update_add(
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]:
existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject]
) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand All @@ -80,8 +88,8 @@ def tag_objects_for_keep_delete_update_add(
return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add


FcoProto = TypeVar(
"FcoProto",
FeastObjectProto = TypeVar(
"FeastObjectProto",
EntityProto,
FeatureViewProto,
FeatureServiceProto,
Expand All @@ -90,25 +98,12 @@ def tag_objects_for_keep_delete_update_add(
)


def tag_proto_objects_for_keep_delete_add(
existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
existing_obj_names = {e.spec.name for e in existing_objs}
desired_obj_names = {e.spec.name for e in desired_objs}

objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names]
objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names]
objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names]

return objs_to_keep, objs_to_delete, objs_to_add


FIELDS_TO_IGNORE = {"project"}


def diff_registry_objects(
current: Fco, new: Fco, object_type: FeastObjectType
) -> FcoDiff:
current: FeastObject, new: FeastObject, object_type: FeastObjectType
) -> FeastObjectDiff:
current_proto = current.to_proto()
new_proto = new.to_proto()
assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name
Expand All @@ -129,23 +124,23 @@ def diff_registry_objects(
getattr(new_proto.spec, _field.name),
)
)
return FcoDiff(
return FeastObjectDiff(
name=new_proto.spec.name,
fco_type=object_type,
current_fco=current,
new_fco=new,
fco_property_diffs=property_diffs,
feast_object_type=object_type,
current_feast_object=current,
new_feast_object=new,
feast_object_property_diffs=property_diffs,
transition_type=transition,
)


def extract_objects_for_keep_delete_update_add(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
) -> Tuple[
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[Fco]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
]:
"""
Returns the objects in the registry that must be modified to achieve the desired repo state.
Expand Down Expand Up @@ -215,30 +210,32 @@ def diff_between(
objects_to_add = objs_to_add[object_type]

for e in objects_to_add:
diff.add_fco_diff(
FcoDiff(
diff.add_feast_object_diff(
FeastObjectDiff(
name=e.name,
fco_type=object_type,
current_fco=None,
new_fco=e,
fco_property_diffs=[],
feast_object_type=object_type,
current_feast_object=None,
new_feast_object=e,
feast_object_property_diffs=[],
transition_type=TransitionType.CREATE,
)
)
for e in objects_to_delete:
diff.add_fco_diff(
FcoDiff(
diff.add_feast_object_diff(
FeastObjectDiff(
name=e.name,
fco_type=object_type,
current_fco=e,
new_fco=None,
fco_property_diffs=[],
feast_object_type=object_type,
current_feast_object=e,
new_feast_object=None,
feast_object_property_diffs=[],
transition_type=TransitionType.DELETE,
)
)
for e in objects_to_update:
current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0]
diff.add_fco_diff(diff_registry_objects(current_obj, e, object_type))
diff.add_feast_object_diff(
diff_registry_objects(current_obj, e, object_type)
)

return diff

Expand All @@ -255,39 +252,47 @@ def apply_diff_to_registry(
project: Feast project to be updated.
commit: Whether the change should be persisted immediately
"""
for fco_diff in registry_diff.fco_diffs:
# There is no need to delete the FCO on an update, since applying the new FCO
# will automatically delete the existing FCO.
if fco_diff.transition_type == TransitionType.DELETE:
if fco_diff.fco_type == FeastObjectType.ENTITY:
registry.delete_entity(fco_diff.current_fco.name, project, commit=False)
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
for feast_object_diff in registry_diff.feast_object_diffs:
# There is no need to delete the object on an update, since applying the new object
# will automatically delete the existing object.
if feast_object_diff.transition_type == TransitionType.DELETE:
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
registry.delete_entity(
feast_object_diff.current_feast_object.name, project, commit=False
)
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
registry.delete_feature_service(
fco_diff.current_fco.name, project, commit=False
feast_object_diff.current_feast_object.name, project, commit=False
)
elif fco_diff.fco_type in [
elif feast_object_diff.feast_object_type in [
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
]:
registry.delete_feature_view(
fco_diff.current_fco.name, project, commit=False,
feast_object_diff.current_feast_object.name, project, commit=False,
)

if fco_diff.transition_type in [
if feast_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
if fco_diff.fco_type == FeastObjectType.ENTITY:
registry.apply_entity(fco_diff.new_fco, project, commit=False)
elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE:
registry.apply_feature_service(fco_diff.new_fco, project, commit=False)
elif fco_diff.fco_type in [
if feast_object_diff.feast_object_type == FeastObjectType.ENTITY:
registry.apply_entity(
feast_object_diff.new_feast_object, project, commit=False
)
elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE:
registry.apply_feature_service(
feast_object_diff.new_feast_object, project, commit=False
)
elif feast_object_diff.feast_object_type in [
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.REQUEST_FEATURE_VIEW,
]:
registry.apply_feature_view(fco_diff.new_fco, project, commit=False)
registry.apply_feature_view(
feast_object_diff.new_feast_object, project, commit=False
)

if commit:
registry.commit()
63 changes: 22 additions & 41 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@

from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.property_diff import TransitionType
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand Down Expand Up @@ -75,7 +74,7 @@
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
from feast.registry import FeastObjectType, Registry
from feast.registry import Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
Expand Down Expand Up @@ -126,6 +125,7 @@ def __init__(

registry_config = self.config.get_registry_config()
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)

@log_exceptions
Expand Down Expand Up @@ -429,8 +429,10 @@ def _make_inferences(
[view.batch_source for view in views_to_update], self.config
)

# New feature views may reference previously applied entities.
entities = self._list_entities()
update_feature_views_with_inferred_features(
views_to_update, entities_to_update, self.config
views_to_update, entities + entities_to_update, self.config
)

for odfv in odfvs_to_update:
Expand Down Expand Up @@ -476,10 +478,26 @@ def _plan(
... )
>>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""
# Validate and run inference on all the objects to be registered.
self._validate_all_feature_views(
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
list(desired_repo_contents.request_feature_views),
)
self._make_inferences(
list(desired_repo_contents.entities),
list(desired_repo_contents.feature_views),
list(desired_repo_contents.on_demand_feature_views),
)

# Compute the desired difference between the current objects in the registry and
# the desired repo state.
registry_diff = diff_between(
self._registry, self.project, desired_repo_contents
)

# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh()
current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
Expand All @@ -504,43 +522,6 @@ def _apply_diffs(
infra_diff: The diff between the current infra and the desired infra.
new_infra: The desired infra.
"""
entities_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ENTITY
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
odfvs_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.ON_DEMAND_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]
request_views_to_update = [
fco_diff.new_fco
for fco_diff in registry_diff.fco_diffs
if fco_diff.fco_type == FeastObjectType.REQUEST_FEATURE_VIEW
and fco_diff.transition_type
in [TransitionType.CREATE, TransitionType.UPDATE]
]

# TODO(felixwang9817): move validation logic into _plan.
# Validate all feature views and make inferences.
self._validate_all_feature_views(
views_to_update, odfvs_to_update, request_views_to_update
)
self._make_inferences(entities_to_update, views_to_update, odfvs_to_update)

# Apply infra and registry changes.
infra_diff.update()
apply_diff_to_registry(
self._registry, registry_diff, self.project, commit=False
Expand Down
Loading

0 comments on commit 895589a

Please sign in to comment.