Skip to content

Commit

Permalink
Asynchronously refresh registry in transformation service
Browse files Browse the repository at this point in the history
Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Nov 18, 2021
1 parent 7032559 commit e524fa0
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
6 changes: 4 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
registry_config = self.config.get_registry_config()
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry.refresh()
registry = Registry(registry_config, repo_path=self.repo_path)
registry.refresh()

self._registry = registry

@log_exceptions_and_usage
def list_entities(self, allow_cache: bool = False) -> List[Entity]:
Expand Down
17 changes: 15 additions & 2 deletions sdk/python/feast/infra/transformation_servers/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import os
import tempfile
import threading
from pathlib import Path

import yaml
Expand Down Expand Up @@ -28,9 +29,10 @@
# Write registry contents for local registries
config_string = config_bytes.decode("utf-8")
raw_config = yaml.safe_load(config_string)
registry_path = raw_config["registry"]
registry = raw_config["registry"]
registry_path = registry["path"] if isinstance(registry, dict) else registry
registry_store_class = get_registry_store_class_from_scheme(registry_path)
if registry_store_class == LocalRegistryStore:
if registry_store_class == LocalRegistryStore and not os.path.exists(registry_path):
registry_base64 = os.environ[REGISTRY_ENV_NAME]
registry_bytes = base64.b64decode(registry_base64)
registry_dir = os.path.dirname(registry_path)
Expand All @@ -42,6 +44,17 @@
# Initialize the feature store
store = FeatureStore(repo_path=str(repo_path.resolve()))

if isinstance(registry, dict) and registry.get("cache_ttl_seconds", 0) > 0:
# disable synchronous refresh
store.config.registry.cache_ttl_seconds = 0

# enable asynchronous refresh
def async_refresh():
store.refresh_registry()
threading.Timer(registry["cache_ttl_seconds"], async_refresh).start()

async_refresh()

# Start the feature transformation server
port = (
os.environ.get(FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME)
Expand Down

0 comments on commit e524fa0

Please sign in to comment.