From f7ee85c16b9032d9599bf555b8e3f445198a2a2e Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sat, 4 Feb 2023 19:59:07 -0800 Subject: [PATCH] fix: Fix SQL Registry cache miss Signed-off-by: Danny Chiao --- sdk/python/feast/feature_store.py | 8 +- .../infra/registry/proto_registry_utils.py | 24 +++++- sdk/python/feast/infra/registry/registry.py | 61 +++++++-------- sdk/python/feast/infra/registry/sql.py | 25 +++++- sdk/python/feast/repo_operations.py | 2 +- .../integration/registration/test_registry.py | 4 +- .../tests/unit/infra/test_local_registry.py | 8 +- sdk/python/tests/unit/test_sql_registry.py | 76 ++++++++++++++++++- 8 files changed, 160 insertions(+), 48 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 43787701bf..55d66e185c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -166,9 +166,9 @@ def __init__( registry_config = self.config.get_registry_config() if registry_config.registry_type == "sql": - self._registry = SqlRegistry(registry_config, None) + self._registry = SqlRegistry(registry_config, self.config.project, None) else: - r = Registry(registry_config, repo_path=self.repo_path) + r = Registry(self.config.project, registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) self._registry = r @@ -210,7 +210,9 @@ def refresh_registry(self): downloaded synchronously, which may increase latencies if the triggering method is get_online_features(). """ registry_config = self.config.get_registry_config() - registry = Registry(registry_config, repo_path=self.repo_path) + registry = Registry( + self.config.project, registry_config, repo_path=self.repo_path + ) registry.refresh(self.config.project) self._registry = registry diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index 4dbc95d2a5..f43805cd9b 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -1,5 +1,7 @@ -from typing import List +import uuid +from typing import List, Optional +from feast import usage from feast.data_source import DataSource from feast.entity import Entity from feast.errors import ( @@ -15,12 +17,32 @@ from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.request_feature_view import RequestFeatureView from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +def init_project_metadata(cached_registry_proto: RegistryProto, project: str): + new_project_uuid = f"{uuid.uuid4()}" + usage.set_current_project_uuid(new_project_uuid) + cached_registry_proto.project_metadata.append( + ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto() + ) + + +def get_project_metadata( + registry_proto: Optional[RegistryProto], project: str +) -> Optional[ProjectMetadataProto]: + if not registry_proto: + return None + for pm in registry_proto.project_metadata: + if pm.project == project: + return pm + return None + + def get_feature_service( registry_proto: RegistryProto, name: str, project: str ) -> FeatureService: diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 3aee7e12f6..c6552da0c8 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import uuid from datetime import datetime, timedelta from enum import Enum from pathlib import Path @@ -44,7 +43,6 @@ from feast.infra.registry.registry_store import NoopRegistryStore from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata -from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig from feast.repo_contents import RepoContents @@ -143,25 +141,6 @@ def get_registry_store_class_from_scheme(registry_path: str): return get_registry_store_class_from_type(registry_store_type) -def _get_project_metadata( - registry_proto: Optional[RegistryProto], project: str -) -> Optional[ProjectMetadataProto]: - if not registry_proto: - return None - for pm in registry_proto.project_metadata: - if pm.project == project: - return pm - return None - - -def _init_project_metadata(cached_registry_proto: RegistryProto, project: str): - new_project_uuid = f"{uuid.uuid4()}" - usage.set_current_project_uuid(new_project_uuid) - cached_registry_proto.project_metadata.append( - ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto() - ) - - class Registry(BaseRegistry): def apply_user_metadata( self, @@ -184,19 +163,25 @@ def get_user_metadata( cached_registry_proto_ttl: timedelta def __new__( - cls, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + cls, + project: str, + registry_config: Optional[RegistryConfig], + repo_path: Optional[Path], ): # We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers # needing to make any changes. if registry_config and registry_config.registry_type == "sql": from feast.infra.registry.sql import SqlRegistry - return SqlRegistry(registry_config, repo_path) + return SqlRegistry(registry_config, project, repo_path) else: return super(Registry, cls).__new__(cls) def __init__( - self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + self, + project: str, + registry_config: Optional[RegistryConfig], + repo_path: Optional[Path], ): """ Create the Registry object. @@ -225,7 +210,7 @@ def __init__( ) def clone(self) -> "Registry": - new_registry = Registry(None, None) + new_registry = Registry("project", None, None) new_registry.cached_registry_proto_ttl = timedelta(seconds=0) new_registry.cached_registry_proto = ( self.cached_registry_proto.__deepcopy__() @@ -243,7 +228,7 @@ def _initialize_registry(self, project: str): except FileNotFoundError: registry_proto = RegistryProto() registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION - _init_project_metadata(registry_proto, project) + proto_registry_utils.init_project_metadata(registry_proto, project) self._registry_store.update_registry_proto(registry_proto) def update_infra(self, infra: Infra, project: str, commit: bool = True): @@ -791,7 +776,12 @@ def _prepare_registry_for_changes(self, project: str): """Prepares the Registry for changes by refreshing the cache if necessary.""" try: self._get_registry_proto(project=project, allow_cache=True) - if _get_project_metadata(self.cached_registry_proto, project) is None: + if ( + proto_registry_utils.get_project_metadata( + self.cached_registry_proto, project + ) + is None + ): # Project metadata not initialized yet. Try pulling without cache self._get_registry_proto(project=project, allow_cache=False) except FileNotFoundError: @@ -802,8 +792,15 @@ def _prepare_registry_for_changes(self, project: str): # Initialize project metadata if needed assert self.cached_registry_proto - if _get_project_metadata(self.cached_registry_proto, project) is None: - _init_project_metadata(self.cached_registry_proto, project) + if ( + proto_registry_utils.get_project_metadata( + self.cached_registry_proto, project + ) + is None + ): + proto_registry_utils.init_project_metadata( + self.cached_registry_proto, project + ) self.commit() return self.cached_registry_proto @@ -836,7 +833,7 @@ def _get_registry_proto( ) if project: - old_project_metadata = _get_project_metadata( + old_project_metadata = proto_registry_utils.get_project_metadata( registry_proto=self.cached_registry_proto, project=project ) @@ -854,13 +851,13 @@ def _get_registry_proto( if not project: return registry_proto - project_metadata = _get_project_metadata( + project_metadata = proto_registry_utils.get_project_metadata( registry_proto=registry_proto, project=project ) if project_metadata: usage.set_current_project_uuid(project_metadata.project_uuid) else: - _init_project_metadata(registry_proto, project) + proto_registry_utils.init_project_metadata(registry_proto, project) self.commit() return registry_proto diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 2326651b1c..de21e3c056 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -180,12 +180,16 @@ class FeastMetadataKeys(Enum): class SqlRegistry(BaseRegistry): def __init__( - self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + self, + registry_config: Optional[RegistryConfig], + project: str, + repo_path: Optional[Path], ): assert registry_config is not None, "SqlRegistry needs a valid registry_config" self.engine: Engine = create_engine(registry_config.path, echo=False) metadata.create_all(self.engine) self.cached_registry_proto = self.proto() + proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) self.cached_registry_proto_created = datetime.utcnow() self._refresh_lock = Lock() self.cached_registry_proto_ttl = timedelta( @@ -193,6 +197,7 @@ def __init__( if registry_config.cache_ttl_seconds is not None else 0 ) + self.project = project def teardown(self): for t in { @@ -210,6 +215,16 @@ def teardown(self): conn.execute(stmt) def refresh(self, project: Optional[str] = None): + if project: + project_metadata = proto_registry_utils.get_project_metadata( + registry_proto=self.cached_registry_proto, project=project + ) + if project_metadata: + usage.set_current_project_uuid(project_metadata.project_uuid) + else: + proto_registry_utils.init_project_metadata( + self.cached_registry_proto, project + ) self.cached_registry_proto = self.proto() self.cached_registry_proto_created = datetime.utcnow() @@ -816,7 +831,13 @@ def proto(self) -> RegistryProto: ]: objs: List[Any] = lister(project) # type: ignore if objs: - registry_proto_field.extend([obj.to_proto() for obj in objs]) + obj_protos = [obj.to_proto() for obj in objs] + for obj_proto in obj_protos: + if "spec" in obj_proto.DESCRIPTOR.fields_by_name: + obj_proto.spec.project = project + else: + obj_proto.project = project + registry_proto_field.extend(obj_protos) # This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783, # the registry proto only has a single infra field, which we're currently setting as the "last" project. diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index e019ac7178..03162e7507 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -349,7 +349,7 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: """For debugging only: output contents of the metadata registry""" registry_config = repo_config.get_registry_config() project = repo_config.project - registry = Registry(registry_config=registry_config, repo_path=repo_path) + registry = Registry(project, registry_config=registry_config, repo_path=repo_path) registry_dict = registry.to_dict(project=project) return json.dumps(registry_dict, indent=2, sort_keys=True) diff --git a/sdk/python/tests/integration/registration/test_registry.py b/sdk/python/tests/integration/registration/test_registry.py index 739fb9ec5c..57e625e66b 100644 --- a/sdk/python/tests/integration/registration/test_registry.py +++ b/sdk/python/tests/integration/registration/test_registry.py @@ -45,7 +45,7 @@ def gcs_registry() -> Registry: registry_config = RegistryConfig( path=f"gs://{bucket_name}/registry.db", cache_ttl_seconds=600 ) - return Registry(registry_config, None) + return Registry("project", registry_config, None) @pytest.fixture @@ -57,7 +57,7 @@ def s3_registry() -> Registry: path=f"{aws_registry_path}/{int(time.time() * 1000)}/registry.db", cache_ttl_seconds=600, ) - return Registry(registry_config, None) + return Registry("project", registry_config, None) @pytest.mark.integration diff --git a/sdk/python/tests/unit/infra/test_local_registry.py b/sdk/python/tests/unit/infra/test_local_registry.py index 1e3b2aec88..b5e7d23a97 100644 --- a/sdk/python/tests/unit/infra/test_local_registry.py +++ b/sdk/python/tests/unit/infra/test_local_registry.py @@ -39,7 +39,7 @@ def local_registry() -> Registry: fd, registry_path = mkstemp() registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600) - return Registry(registry_config, None) + return Registry("project", registry_config, None) @pytest.mark.parametrize( @@ -443,7 +443,7 @@ def test_apply_data_source(test_registry: Registry): def test_commit(): fd, registry_path = mkstemp() registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600) - test_registry = Registry(registry_config, None) + test_registry = Registry("project", registry_config, None) entity = Entity( name="driver_car_id", @@ -484,7 +484,7 @@ def test_commit(): validate_project_uuid(project_uuid, test_registry) # Create new registry that points to the same store - registry_with_same_store = Registry(registry_config, None) + registry_with_same_store = Registry("project", registry_config, None) # Retrieving the entity should fail since the store is empty entities = registry_with_same_store.list_entities(project) @@ -495,7 +495,7 @@ def test_commit(): test_registry.commit() # Reconstruct the new registry in order to read the newly written store - registry_with_same_store = Registry(registry_config, None) + registry_with_same_store = Registry("project", registry_config, None) # Retrieving the entity should now succeed entities = registry_with_same_store.list_entities(project) diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 51cb430c9e..1c2b5a36dd 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -72,7 +72,7 @@ def pg_registry(): path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", ) - yield SqlRegistry(registry_config, None) + yield SqlRegistry(registry_config, "project", None) container.stop() @@ -106,7 +106,7 @@ def mysql_registry(): path=f"mysql+mysqldb://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", ) - yield SqlRegistry(registry_config, None) + yield SqlRegistry(registry_config, "project", None) container.stop() @@ -118,7 +118,7 @@ def sqlite_registry(): path="sqlite://", ) - yield SqlRegistry(registry_config, None) + yield SqlRegistry(registry_config, "project", None) @pytest.mark.skipif( @@ -565,6 +565,76 @@ def test_apply_data_source(sql_registry): sql_registry.teardown() +@pytest.mark.skipif( + sys.platform == "darwin" and "GITHUB_REF" in os.environ, + reason="does not run on mac github actions", +) +@pytest.mark.parametrize( + "sql_registry", + [ + lazy_fixture("mysql_registry"), + lazy_fixture("pg_registry"), + lazy_fixture("sqlite_registry"), + ], +) +def test_registry_cache(sql_registry): + # Create Feature Views + batch_source = FileSource( + name="test_source", + file_format=ParquetFormat(), + path="file://feast/*", + timestamp_field="ts_col", + created_timestamp_column="timestamp", + ) + + entity = Entity(name="fs1_my_entity_1", join_keys=["test"]) + + fv1 = FeatureView( + name="my_feature_view_1", + schema=[ + Field(name="fs1_my_feature_1", dtype=Int64), + Field(name="fs1_my_feature_2", dtype=String), + Field(name="fs1_my_feature_3", dtype=Array(String)), + Field(name="fs1_my_feature_4", dtype=Array(Bytes)), + ], + entities=[entity], + tags={"team": "matchmaking"}, + source=batch_source, + ttl=timedelta(minutes=5), + ) + + project = "project" + + # Register data source and feature view + sql_registry.apply_data_source(batch_source, project) + sql_registry.apply_feature_view(fv1, project) + registry_feature_views_cached = sql_registry.list_feature_views( + project, allow_cache=True + ) + registry_data_sources_cached = sql_registry.list_data_sources( + project, allow_cache=True + ) + # Not refreshed cache, so cache miss + assert len(registry_feature_views_cached) == 0 + assert len(registry_data_sources_cached) == 0 + sql_registry.refresh(project) + # Now objects exist + registry_feature_views_cached = sql_registry.list_feature_views( + project, allow_cache=True + ) + registry_data_sources_cached = sql_registry.list_data_sources( + project, allow_cache=True + ) + assert len(registry_feature_views_cached) == 1 + assert len(registry_data_sources_cached) == 1 + registry_feature_view = registry_feature_views_cached[0] + assert registry_feature_view.batch_source == batch_source + registry_data_source = registry_data_sources_cached[0] + assert registry_data_source == batch_source + + sql_registry.teardown() + + @pytest.mark.skipif( sys.platform == "darwin" and "GITHUB_REF" in os.environ, reason="does not run on mac github actions",