Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix SQL Registry cache miss #3482

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ def __init__(

registry_config = self.config.get_registry_config()
if registry_config.registry_type == "sql":
self._registry = SqlRegistry(registry_config, None)
self._registry = SqlRegistry(registry_config, self.config.project, None)
else:
r = Registry(registry_config, repo_path=self.repo_path)
r = Registry(self.config.project, registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
self._registry = r

Expand Down Expand Up @@ -210,7 +210,9 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features().
"""
registry_config = self.config.get_registry_config()
registry = Registry(registry_config, repo_path=self.repo_path)
registry = Registry(
self.config.project, registry_config, repo_path=self.repo_path
)
registry.refresh(self.config.project)

self._registry = registry
Expand Down
24 changes: 23 additions & 1 deletion sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import List
import uuid
from typing import List, Optional

from feast import usage
from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
Expand All @@ -15,12 +17,32 @@
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView


def init_project_metadata(cached_registry_proto: RegistryProto, project: str):
new_project_uuid = f"{uuid.uuid4()}"
usage.set_current_project_uuid(new_project_uuid)
cached_registry_proto.project_metadata.append(
ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto()
)


def get_project_metadata(
registry_proto: Optional[RegistryProto], project: str
) -> Optional[ProjectMetadataProto]:
if not registry_proto:
return None
for pm in registry_proto.project_metadata:
if pm.project == project:
return pm
return None


def get_feature_service(
registry_proto: RegistryProto, name: str, project: str
) -> FeatureService:
Expand Down
61 changes: 29 additions & 32 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -44,7 +43,6 @@
from feast.infra.registry.registry_store import NoopRegistryStore
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.repo_contents import RepoContents
Expand Down Expand Up @@ -143,25 +141,6 @@ def get_registry_store_class_from_scheme(registry_path: str):
return get_registry_store_class_from_type(registry_store_type)


def _get_project_metadata(
registry_proto: Optional[RegistryProto], project: str
) -> Optional[ProjectMetadataProto]:
if not registry_proto:
return None
for pm in registry_proto.project_metadata:
if pm.project == project:
return pm
return None


def _init_project_metadata(cached_registry_proto: RegistryProto, project: str):
new_project_uuid = f"{uuid.uuid4()}"
usage.set_current_project_uuid(new_project_uuid)
cached_registry_proto.project_metadata.append(
ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto()
)


class Registry(BaseRegistry):
def apply_user_metadata(
self,
Expand All @@ -184,19 +163,25 @@ def get_user_metadata(
cached_registry_proto_ttl: timedelta

def __new__(
cls, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
cls,
project: str,
registry_config: Optional[RegistryConfig],
repo_path: Optional[Path],
):
# We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers
# needing to make any changes.
if registry_config and registry_config.registry_type == "sql":
from feast.infra.registry.sql import SqlRegistry

return SqlRegistry(registry_config, repo_path)
return SqlRegistry(registry_config, project, repo_path)
else:
return super(Registry, cls).__new__(cls)

def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
self,
project: str,
registry_config: Optional[RegistryConfig],
repo_path: Optional[Path],
):
"""
Create the Registry object.
Expand Down Expand Up @@ -225,7 +210,7 @@ def __init__(
)

def clone(self) -> "Registry":
new_registry = Registry(None, None)
new_registry = Registry("project", None, None)
new_registry.cached_registry_proto_ttl = timedelta(seconds=0)
new_registry.cached_registry_proto = (
self.cached_registry_proto.__deepcopy__()
Expand All @@ -243,7 +228,7 @@ def _initialize_registry(self, project: str):
except FileNotFoundError:
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
_init_project_metadata(registry_proto, project)
proto_registry_utils.init_project_metadata(registry_proto, project)
self._registry_store.update_registry_proto(registry_proto)

def update_infra(self, infra: Infra, project: str, commit: bool = True):
Expand Down Expand Up @@ -791,7 +776,12 @@ def _prepare_registry_for_changes(self, project: str):
"""Prepares the Registry for changes by refreshing the cache if necessary."""
try:
self._get_registry_proto(project=project, allow_cache=True)
if _get_project_metadata(self.cached_registry_proto, project) is None:
if (
proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
is None
):
# Project metadata not initialized yet. Try pulling without cache
self._get_registry_proto(project=project, allow_cache=False)
except FileNotFoundError:
Expand All @@ -802,8 +792,15 @@ def _prepare_registry_for_changes(self, project: str):

# Initialize project metadata if needed
assert self.cached_registry_proto
if _get_project_metadata(self.cached_registry_proto, project) is None:
_init_project_metadata(self.cached_registry_proto, project)
if (
proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
is None
):
proto_registry_utils.init_project_metadata(
self.cached_registry_proto, project
)
self.commit()

return self.cached_registry_proto
Expand Down Expand Up @@ -836,7 +833,7 @@ def _get_registry_proto(
)

if project:
old_project_metadata = _get_project_metadata(
old_project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=self.cached_registry_proto, project=project
)

Expand All @@ -854,13 +851,13 @@ def _get_registry_proto(
if not project:
return registry_proto

project_metadata = _get_project_metadata(
project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=registry_proto, project=project
)
if project_metadata:
usage.set_current_project_uuid(project_metadata.project_uuid)
else:
_init_project_metadata(registry_proto, project)
proto_registry_utils.init_project_metadata(registry_proto, project)
self.commit()

return registry_proto
Expand Down
25 changes: 23 additions & 2 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,24 @@ class FeastMetadataKeys(Enum):

class SqlRegistry(BaseRegistry):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
self,
registry_config: Optional[RegistryConfig],
project: str,
repo_path: Optional[Path],
):
assert registry_config is not None, "SqlRegistry needs a valid registry_config"
self.engine: Engine = create_engine(registry_config.path, echo=False)
metadata.create_all(self.engine)
self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
self.cached_registry_proto_created = datetime.utcnow()
self._refresh_lock = Lock()
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)
self.project = project

def teardown(self):
for t in {
Expand All @@ -210,6 +215,16 @@ def teardown(self):
conn.execute(stmt)

def refresh(self, project: Optional[str] = None):
if project:
project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=self.cached_registry_proto, project=project
)
if project_metadata:
usage.set_current_project_uuid(project_metadata.project_uuid)
else:
proto_registry_utils.init_project_metadata(
self.cached_registry_proto, project
)
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = datetime.utcnow()

Expand Down Expand Up @@ -816,7 +831,13 @@ def proto(self) -> RegistryProto:
]:
objs: List[Any] = lister(project) # type: ignore
if objs:
registry_proto_field.extend([obj.to_proto() for obj in objs])
obj_protos = [obj.to_proto() for obj in objs]
for obj_proto in obj_protos:
if "spec" in obj_proto.DESCRIPTOR.fields_by_name:
obj_proto.spec.project = project
else:
obj_proto.project = project
registry_proto_field.extend(obj_protos)

# This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783,
# the registry proto only has a single infra field, which we're currently setting as the "last" project.
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str:
"""For debugging only: output contents of the metadata registry"""
registry_config = repo_config.get_registry_config()
project = repo_config.project
registry = Registry(registry_config=registry_config, repo_path=repo_path)
registry = Registry(project, registry_config=registry_config, repo_path=repo_path)
registry_dict = registry.to_dict(project=project)
return json.dumps(registry_dict, indent=2, sort_keys=True)

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def gcs_registry() -> Registry:
registry_config = RegistryConfig(
path=f"gs://{bucket_name}/registry.db", cache_ttl_seconds=600
)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.fixture
Expand All @@ -57,7 +57,7 @@ def s3_registry() -> Registry:
path=f"{aws_registry_path}/{int(time.time() * 1000)}/registry.db",
cache_ttl_seconds=600,
)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.mark.integration
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/tests/unit/infra/test_local_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
def local_registry() -> Registry:
fd, registry_path = mkstemp()
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -443,7 +443,7 @@ def test_apply_data_source(test_registry: Registry):
def test_commit():
fd, registry_path = mkstemp()
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
test_registry = Registry(registry_config, None)
test_registry = Registry("project", registry_config, None)

entity = Entity(
name="driver_car_id",
Expand Down Expand Up @@ -484,7 +484,7 @@ def test_commit():
validate_project_uuid(project_uuid, test_registry)

# Create new registry that points to the same store
registry_with_same_store = Registry(registry_config, None)
registry_with_same_store = Registry("project", registry_config, None)

# Retrieving the entity should fail since the store is empty
entities = registry_with_same_store.list_entities(project)
Expand All @@ -495,7 +495,7 @@ def test_commit():
test_registry.commit()

# Reconstruct the new registry in order to read the newly written store
registry_with_same_store = Registry(registry_config, None)
registry_with_same_store = Registry("project", registry_config, None)

# Retrieving the entity should now succeed
entities = registry_with_same_store.list_entities(project)
Expand Down
Loading