diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 15de4a5d29..53bae5913a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -141,8 +141,10 @@ def refresh_registry(self): downloaded synchronously, which may increase latencies if the triggering method is get_online_features() """ registry_config = self.config.get_registry_config() - self._registry = Registry(registry_config, repo_path=self.repo_path) - self._registry.refresh() + registry = Registry(registry_config, repo_path=self.repo_path) + registry.refresh() + + self._registry = registry @log_exceptions_and_usage def list_entities(self, allow_cache: bool = False) -> List[Entity]: diff --git a/sdk/python/feast/infra/transformation_servers/app.py b/sdk/python/feast/infra/transformation_servers/app.py index 19728429da..acfb0959ba 100644 --- a/sdk/python/feast/infra/transformation_servers/app.py +++ b/sdk/python/feast/infra/transformation_servers/app.py @@ -1,6 +1,7 @@ import base64 import os import tempfile +import threading from pathlib import Path import yaml @@ -28,9 +29,10 @@ # Write registry contents for local registries config_string = config_bytes.decode("utf-8") raw_config = yaml.safe_load(config_string) -registry_path = raw_config["registry"] +registry = raw_config["registry"] +registry_path = registry["path"] if isinstance(registry, dict) else registry registry_store_class = get_registry_store_class_from_scheme(registry_path) -if registry_store_class == LocalRegistryStore: +if registry_store_class == LocalRegistryStore and not os.path.exists(registry_path): registry_base64 = os.environ[REGISTRY_ENV_NAME] registry_bytes = base64.b64decode(registry_base64) registry_dir = os.path.dirname(registry_path) @@ -42,6 +44,17 @@ # Initialize the feature store store = FeatureStore(repo_path=str(repo_path.resolve())) +if isinstance(registry, dict) and registry.get("cache_ttl_seconds", 0) > 0: + # disable synchronous refresh + store.config.registry.cache_ttl_seconds = 0 + + # enable asynchronous refresh + def async_refresh(): + store.refresh_registry() + threading.Timer(registry["cache_ttl_seconds"], async_refresh).start() + + async_refresh() + # Start the feature transformation server port = ( os.environ.get(FEATURE_TRANSFORMATION_SERVER_PORT_ENV_NAME)