diff --git a/sdk/python/feast/diff/infra_diff.py b/sdk/python/feast/diff/infra_diff.py index d5bcbbc44a..a09eaf39eb 100644 --- a/sdk/python/feast/diff/infra_diff.py +++ b/sdk/python/feast/diff/infra_diff.py @@ -71,12 +71,20 @@ def to_string(self): TransitionType.UPDATE: ("Updated", Fore.YELLOW), } for infra_object_diff in self.infra_object_diffs: + if infra_object_diff.transition_type == TransitionType.UNCHANGED: + continue action, color = message_action_map[infra_object_diff.transition_type] log_string += f"{action} {infra_object_diff.infra_object_type} {Style.BRIGHT + color}{infra_object_diff.name}{Style.RESET_ALL}\n" if infra_object_diff.transition_type == TransitionType.UPDATE: for _p in infra_object_diff.infra_object_property_diffs: log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n" + log_string = ( + f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to infrastructure" + if not log_string + else log_string + ) + return log_string diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/registry_diff.py similarity index 63% rename from sdk/python/feast/diff/FcoDiff.py rename to sdk/python/feast/diff/registry_diff.py index 1ea66ec659..1f68d3ff65 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/registry_diff.py @@ -20,28 +20,28 @@ from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry from feast.repo_contents import RepoContents -Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) +FeastObject = TypeVar("FeastObject", Entity, BaseFeatureView, FeatureService) @dataclass -class FcoDiff(Generic[Fco]): +class FeastObjectDiff(Generic[FeastObject]): name: str - fco_type: FeastObjectType - current_fco: Fco - new_fco: Fco - fco_property_diffs: List[PropertyDiff] + feast_object_type: FeastObjectType + current_feast_object: FeastObject + new_feast_object: FeastObject + feast_object_property_diffs: List[PropertyDiff] transition_type: TransitionType @dataclass class RegistryDiff: - fco_diffs: List[FcoDiff] + feast_object_diffs: List[FeastObjectDiff] def __init__(self): - self.fco_diffs = [] + self.feast_object_diffs = [] - def add_fco_diff(self, fco_diff: FcoDiff): - self.fco_diffs.append(fco_diff) + def add_feast_object_diff(self, feast_object_diff: FeastObjectDiff): + self.feast_object_diffs.append(feast_object_diff) def to_string(self): from colorama import Fore, Style @@ -54,21 +54,29 @@ def to_string(self): TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), TransitionType.UPDATE: ("Updated", Fore.YELLOW), } - for fco_diff in self.fco_diffs: - if fco_diff.name == DUMMY_ENTITY_NAME: + for feast_object_diff in self.feast_object_diffs: + if feast_object_diff.name == DUMMY_ENTITY_NAME: continue - action, color = message_action_map[fco_diff.transition_type] - log_string += f"{action} {fco_diff.fco_type.value} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}\n" - if fco_diff.transition_type == TransitionType.UPDATE: - for _p in fco_diff.fco_property_diffs: + if feast_object_diff.transition_type == TransitionType.UNCHANGED: + continue + action, color = message_action_map[feast_object_diff.transition_type] + log_string += f"{action} {feast_object_diff.feast_object_type.value} {Style.BRIGHT + color}{feast_object_diff.name}{Style.RESET_ALL}\n" + if feast_object_diff.transition_type == TransitionType.UPDATE: + for _p in feast_object_diff.feast_object_property_diffs: log_string += f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}\n" + log_string = ( + f"{Style.BRIGHT + Fore.LIGHTBLUE_EX}No changes to registry" + if not log_string + else log_string + ) + return log_string def tag_objects_for_keep_delete_update_add( - existing_objs: Iterable[Fco], desired_objs: Iterable[Fco] -) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]: + existing_objs: Iterable[FeastObject], desired_objs: Iterable[FeastObject] +) -> Tuple[Set[FeastObject], Set[FeastObject], Set[FeastObject], Set[FeastObject]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} @@ -80,8 +88,8 @@ def tag_objects_for_keep_delete_update_add( return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add -FcoProto = TypeVar( - "FcoProto", +FeastObjectProto = TypeVar( + "FeastObjectProto", EntityProto, FeatureViewProto, FeatureServiceProto, @@ -90,25 +98,12 @@ def tag_objects_for_keep_delete_update_add( ) -def tag_proto_objects_for_keep_delete_add( - existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto] -) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]: - existing_obj_names = {e.spec.name for e in existing_objs} - desired_obj_names = {e.spec.name for e in desired_objs} - - objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names] - objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names] - objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names] - - return objs_to_keep, objs_to_delete, objs_to_add - - FIELDS_TO_IGNORE = {"project"} def diff_registry_objects( - current: Fco, new: Fco, object_type: FeastObjectType -) -> FcoDiff: + current: FeastObject, new: FeastObject, object_type: FeastObjectType +) -> FeastObjectDiff: current_proto = current.to_proto() new_proto = new.to_proto() assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name @@ -129,12 +124,12 @@ def diff_registry_objects( getattr(new_proto.spec, _field.name), ) ) - return FcoDiff( + return FeastObjectDiff( name=new_proto.spec.name, - fco_type=object_type, - current_fco=current, - new_fco=new, - fco_property_diffs=property_diffs, + feast_object_type=object_type, + current_feast_object=current, + new_feast_object=new, + feast_object_property_diffs=property_diffs, transition_type=transition, ) @@ -142,10 +137,10 @@ def diff_registry_objects( def extract_objects_for_keep_delete_update_add( registry: Registry, current_project: str, desired_repo_contents: RepoContents, ) -> Tuple[ - Dict[FeastObjectType, Set[Fco]], - Dict[FeastObjectType, Set[Fco]], - Dict[FeastObjectType, Set[Fco]], - Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[FeastObject]], + Dict[FeastObjectType, Set[FeastObject]], + Dict[FeastObjectType, Set[FeastObject]], + Dict[FeastObjectType, Set[FeastObject]], ]: """ Returns the objects in the registry that must be modified to achieve the desired repo state. @@ -215,30 +210,32 @@ def diff_between( objects_to_add = objs_to_add[object_type] for e in objects_to_add: - diff.add_fco_diff( - FcoDiff( + diff.add_feast_object_diff( + FeastObjectDiff( name=e.name, - fco_type=object_type, - current_fco=None, - new_fco=e, - fco_property_diffs=[], + feast_object_type=object_type, + current_feast_object=None, + new_feast_object=e, + feast_object_property_diffs=[], transition_type=TransitionType.CREATE, ) ) for e in objects_to_delete: - diff.add_fco_diff( - FcoDiff( + diff.add_feast_object_diff( + FeastObjectDiff( name=e.name, - fco_type=object_type, - current_fco=e, - new_fco=None, - fco_property_diffs=[], + feast_object_type=object_type, + current_feast_object=e, + new_feast_object=None, + feast_object_property_diffs=[], transition_type=TransitionType.DELETE, ) ) for e in objects_to_update: current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] - diff.add_fco_diff(diff_registry_objects(current_obj, e, object_type)) + diff.add_feast_object_diff( + diff_registry_objects(current_obj, e, object_type) + ) return diff @@ -255,39 +252,47 @@ def apply_diff_to_registry( project: Feast project to be updated. commit: Whether the change should be persisted immediately """ - for fco_diff in registry_diff.fco_diffs: - # There is no need to delete the FCO on an update, since applying the new FCO - # will automatically delete the existing FCO. - if fco_diff.transition_type == TransitionType.DELETE: - if fco_diff.fco_type == FeastObjectType.ENTITY: - registry.delete_entity(fco_diff.current_fco.name, project, commit=False) - elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE: + for feast_object_diff in registry_diff.feast_object_diffs: + # There is no need to delete the object on an update, since applying the new object + # will automatically delete the existing object. + if feast_object_diff.transition_type == TransitionType.DELETE: + if feast_object_diff.feast_object_type == FeastObjectType.ENTITY: + registry.delete_entity( + feast_object_diff.current_feast_object.name, project, commit=False + ) + elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE: registry.delete_feature_service( - fco_diff.current_fco.name, project, commit=False + feast_object_diff.current_feast_object.name, project, commit=False ) - elif fco_diff.fco_type in [ + elif feast_object_diff.feast_object_type in [ FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, ]: registry.delete_feature_view( - fco_diff.current_fco.name, project, commit=False, + feast_object_diff.current_feast_object.name, project, commit=False, ) - if fco_diff.transition_type in [ + if feast_object_diff.transition_type in [ TransitionType.CREATE, TransitionType.UPDATE, ]: - if fco_diff.fco_type == FeastObjectType.ENTITY: - registry.apply_entity(fco_diff.new_fco, project, commit=False) - elif fco_diff.fco_type == FeastObjectType.FEATURE_SERVICE: - registry.apply_feature_service(fco_diff.new_fco, project, commit=False) - elif fco_diff.fco_type in [ + if feast_object_diff.feast_object_type == FeastObjectType.ENTITY: + registry.apply_entity( + feast_object_diff.new_feast_object, project, commit=False + ) + elif feast_object_diff.feast_object_type == FeastObjectType.FEATURE_SERVICE: + registry.apply_feature_service( + feast_object_diff.new_feast_object, project, commit=False + ) + elif feast_object_diff.feast_object_type in [ FeastObjectType.FEATURE_VIEW, FeastObjectType.ON_DEMAND_FEATURE_VIEW, FeastObjectType.REQUEST_FEATURE_VIEW, ]: - registry.apply_feature_view(fco_diff.new_fco, project, commit=False) + registry.apply_feature_view( + feast_object_diff.new_feast_object, project, commit=False + ) if commit: registry.commit() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 01b1dc0f0c..6b1dadde5c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -39,9 +39,8 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import RegistryDiff, apply_diff_to_registry, diff_between from feast.diff.infra_diff import InfraDiff, diff_infra_protos -from feast.diff.property_diff import TransitionType +from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -75,7 +74,7 @@ ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value -from feast.registry import FeastObjectType, Registry +from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView @@ -126,6 +125,7 @@ def __init__( registry_config = self.config.get_registry_config() self._registry = Registry(registry_config, repo_path=self.repo_path) + self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) @log_exceptions @@ -429,8 +429,10 @@ def _make_inferences( [view.batch_source for view in views_to_update], self.config ) + # New feature views may reference previously applied entities. + entities = self._list_entities() update_feature_views_with_inferred_features( - views_to_update, entities_to_update, self.config + views_to_update, entities + entities_to_update, self.config ) for odfv in odfvs_to_update: @@ -476,10 +478,26 @@ def _plan( ... ) >>> registry_diff, infra_diff, new_infra = fs._plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ + # Validate and run inference on all the objects to be registered. + self._validate_all_feature_views( + list(desired_repo_contents.feature_views), + list(desired_repo_contents.on_demand_feature_views), + list(desired_repo_contents.request_feature_views), + ) + self._make_inferences( + list(desired_repo_contents.entities), + list(desired_repo_contents.feature_views), + list(desired_repo_contents.on_demand_feature_views), + ) + + # Compute the desired difference between the current objects in the registry and + # the desired repo state. registry_diff = diff_between( self._registry, self.project, desired_repo_contents ) + # Compute the desired difference between the current infra, as stored in the registry, + # and the desired infra. self._registry.refresh() current_infra_proto = ( self._registry.cached_registry_proto.infra.__deepcopy__() @@ -504,43 +522,6 @@ def _apply_diffs( infra_diff: The diff between the current infra and the desired infra. new_infra: The desired infra. """ - entities_to_update = [ - fco_diff.new_fco - for fco_diff in registry_diff.fco_diffs - if fco_diff.fco_type == FeastObjectType.ENTITY - and fco_diff.transition_type - in [TransitionType.CREATE, TransitionType.UPDATE] - ] - views_to_update = [ - fco_diff.new_fco - for fco_diff in registry_diff.fco_diffs - if fco_diff.fco_type == FeastObjectType.FEATURE_VIEW - and fco_diff.transition_type - in [TransitionType.CREATE, TransitionType.UPDATE] - ] - odfvs_to_update = [ - fco_diff.new_fco - for fco_diff in registry_diff.fco_diffs - if fco_diff.fco_type == FeastObjectType.ON_DEMAND_FEATURE_VIEW - and fco_diff.transition_type - in [TransitionType.CREATE, TransitionType.UPDATE] - ] - request_views_to_update = [ - fco_diff.new_fco - for fco_diff in registry_diff.fco_diffs - if fco_diff.fco_type == FeastObjectType.REQUEST_FEATURE_VIEW - and fco_diff.transition_type - in [TransitionType.CREATE, TransitionType.UPDATE] - ] - - # TODO(felixwang9817): move validation logic into _plan. - # Validate all feature views and make inferences. - self._validate_all_feature_views( - views_to_update, odfvs_to_update, request_views_to_update - ) - self._make_inferences(entities_to_update, views_to_update, odfvs_to_update) - - # Apply infra and registry changes. infra_diff.update() apply_diff_to_registry( self._registry, registry_diff, self.project, commit=False diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 39a77264bc..642a3c6442 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -13,7 +13,12 @@ def update_entities_with_inferred_types_from_feature_views( entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig ) -> None: """ - Infer entity value type by examining schema of feature view batch sources + Infers the types of the entities by examining the schemas of feature view batch sources. + + Args: + entities: The entities to be updated. + feature_views: A list containing feature views associated with the entities. + config: The config for the current feature store. """ incomplete_entities = { entity.name: entity @@ -127,6 +132,11 @@ def update_feature_views_with_inferred_features( Infers the set of features associated to each FeatureView and updates the FeatureView with those features. Inference occurs through considering each column of the underlying data source as a feature except columns that are associated with the data source's timestamp columns and the FeatureView's entity columns. + + Args: + fvs: The feature views to be updated. + entities: A list containing entities associated with the feature views. + config: The config for the current feature store. """ entity_name_to_join_key_map = {entity.name: entity.join_key for entity in entities} diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index f34346871d..8a3a202c6d 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,7 +12,7 @@ import click from click.exceptions import BadParameter -from feast.diff.FcoDiff import extract_objects_for_keep_delete_update_add +from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore @@ -147,7 +147,6 @@ def _prepare_registry_and_repo(repo_config, repo_path): ) sys.exit(1) registry = store.registry - registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) return project, registry, repo, store diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index f55fbce55c..0bae973063 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -66,10 +66,16 @@ def test_usage_on(dummy_exporter, enabling_toggle): test_feature_store.apply([entity]) - assert len(dummy_exporter) == 1 + assert len(dummy_exporter) == 3 assert { - "entrypoint": "feast.feature_store.FeatureStore.apply" + "entrypoint": "feast.infra.local.LocalRegistryStore.get_registry_proto" }.items() <= dummy_exporter[0].items() + assert { + "entrypoint": "feast.infra.local.LocalRegistryStore.update_registry_proto" + }.items() <= dummy_exporter[1].items() + assert { + "entrypoint": "feast.feature_store.FeatureStore.apply" + }.items() <= dummy_exporter[2].items() @pytest.mark.integration diff --git a/sdk/python/tests/unit/diff/test_fco_diff.py b/sdk/python/tests/unit/diff/test_registry_diff.py similarity index 53% rename from sdk/python/tests/unit/diff/test_fco_diff.py rename to sdk/python/tests/unit/diff/test_registry_diff.py index fa3c84d035..0322ab47ab 100644 --- a/sdk/python/tests/unit/diff/test_fco_diff.py +++ b/sdk/python/tests/unit/diff/test_registry_diff.py @@ -1,54 +1,11 @@ -from feast.diff.FcoDiff import ( +from feast.diff.registry_diff import ( diff_registry_objects, tag_objects_for_keep_delete_update_add, - tag_proto_objects_for_keep_delete_add, ) from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source -def test_tag_proto_objects_for_keep_delete_add(simple_dataset_1): - with prep_file_source( - df=simple_dataset_1, event_timestamp_column="ts_1" - ) as file_source: - to_delete = FeatureView( - name="to_delete", entities=["id"], batch_source=file_source, ttl=None, - ).to_proto() - unchanged_fv = FeatureView( - name="fv1", entities=["id"], batch_source=file_source, ttl=None, - ).to_proto() - pre_changed = FeatureView( - name="fv2", - entities=["id"], - batch_source=file_source, - ttl=None, - tags={"when": "before"}, - ).to_proto() - post_changed = FeatureView( - name="fv2", - entities=["id"], - batch_source=file_source, - ttl=None, - tags={"when": "after"}, - ).to_proto() - to_add = FeatureView( - name="to_add", entities=["id"], batch_source=file_source, ttl=None, - ).to_proto() - - keep, delete, add = tag_proto_objects_for_keep_delete_add( - [unchanged_fv, pre_changed, to_delete], [unchanged_fv, post_changed, to_add] - ) - - assert len(list(keep)) == 2 - assert unchanged_fv in keep - assert post_changed in keep - assert pre_changed not in keep - assert len(list(delete)) == 1 - assert to_delete in delete - assert len(list(add)) == 1 - assert to_add in add - - def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): with prep_file_source( df=simple_dataset_1, event_timestamp_column="ts_1" @@ -114,12 +71,20 @@ def test_diff_registry_objects_feature_views(simple_dataset_1): tags={"when": "after"}, ) - fco_diffs = diff_registry_objects(pre_changed, pre_changed, "feature view") - assert len(fco_diffs.fco_property_diffs) == 0 + feast_object_diffs = diff_registry_objects( + pre_changed, pre_changed, "feature view" + ) + assert len(feast_object_diffs.feast_object_property_diffs) == 0 - fco_diffs = diff_registry_objects(pre_changed, post_changed, "feature view") - assert len(fco_diffs.fco_property_diffs) == 1 + feast_object_diffs = diff_registry_objects( + pre_changed, post_changed, "feature view" + ) + assert len(feast_object_diffs.feast_object_property_diffs) == 1 - assert fco_diffs.fco_property_diffs[0].property_name == "tags" - assert fco_diffs.fco_property_diffs[0].val_existing == {"when": "before"} - assert fco_diffs.fco_property_diffs[0].val_declared == {"when": "after"} + assert feast_object_diffs.feast_object_property_diffs[0].property_name == "tags" + assert feast_object_diffs.feast_object_property_diffs[0].val_existing == { + "when": "before" + } + assert feast_object_diffs.feast_object_property_diffs[0].val_declared == { + "when": "after" + }