Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronously refresh registry in transformation service #2060

Merged
merged 1 commit into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
registry_config = self.config.get_registry_config()
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry.refresh()
registry = Registry(registry_config, repo_path=self.repo_path)
registry.refresh()

self._registry = registry
Comment on lines +144 to +147
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning for this refactoring?

Copy link
Collaborator Author

@pyalex pyalex Nov 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there're multiple threads accessing self._registry it makes sense to fully construct replacement (actual proto is being loaded in registry.refresh, so before that new Registry is still half baked) before making it available for everybody else


@log_exceptions_and_usage
def list_entities(self, allow_cache: bool = False) -> List[Entity]:
Expand Down
17 changes: 15 additions & 2 deletions sdk/python/feast/infra/transformation_servers/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import os
import tempfile
import threading
from pathlib import Path

import yaml
Expand Down Expand Up @@ -28,9 +29,10 @@
# Write registry contents for local registries
config_string = config_bytes.decode("utf-8")
raw_config = yaml.safe_load(config_string)
registry_path = raw_config["registry"]
registry = raw_config["registry"]
registry_path = registry["path"] if isinstance(registry, dict) else registry
registry_store_class = get_registry_store_class_from_scheme(registry_path)
if registry_store_class == LocalRegistryStore:
if registry_store_class == LocalRegistryStore and not os.path.exists(registry_path):
registry_base64 = os.environ[REGISTRY_ENV_NAME]
registry_bytes = base64.b64decode(registry_base64)
registry_dir = os.path.dirname(registry_path)
Expand All @@ -42,6 +44,17 @@
# Initialize the feature store
store = FeatureStore(repo_path=str(repo_path.resolve()))

if isinstance(registry, dict) and registry.get("cache_ttl_seconds", 0) > 0:
# disable synchronous refresh
store.config.registry.cache_ttl_seconds = 0

# enable asynchronous refresh
def async_refresh():
store.refresh_registry()
threading.Timer(registry["cache_ttl_seconds"], async_refresh).start()

async_refresh()

# Start the feature transformation server
port = (
os.environ.get(FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME)
Expand Down