diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 3153f02e51..9519f181d7 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -663,6 +663,14 @@ def init_command(project_directory, minimal: bool, template: str): show_default=True, help="Timeout for keep alive", ) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) @click.pass_context def serve_command( ctx: click.Context, @@ -673,6 +681,7 @@ def serve_command( no_feature_log: bool, workers: int, keep_alive_timeout: int, + registry_ttl_sec: int = 5, ): """Start a feature server locally on a given port.""" store = create_feature_store(ctx) @@ -685,6 +694,7 @@ def serve_command( no_feature_log=no_feature_log, workers=workers, keep_alive_timeout=keep_alive_timeout, + registry_ttl_sec=registry_ttl_sec, ) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 7c638dd248..618aefb2f2 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,4 +1,5 @@ import json +import threading import traceback import warnings from typing import List, Optional @@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel): feature_views: Optional[List[str]] = None -def get_app(store: "feast.FeatureStore"): +def get_app(store: "feast.FeatureStore", registry_ttl_sec: int = 5): proto_json.patch() app = FastAPI() + # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down + registry_proto = None + shutting_down = False + active_timer: Optional[threading.Timer] = None async def get_body(request: Request): return await request.body() + def async_refresh(): + store.refresh_registry() + nonlocal registry_proto + registry_proto = store.registry.proto() + if shutting_down: + return + nonlocal active_timer + active_timer = threading.Timer(registry_ttl_sec, async_refresh) + active_timer.start() + + @app.on_event("shutdown") + def shutdown_event(): + nonlocal shutting_down + shutting_down = True + if active_timer: + active_timer.cancel() + + async_refresh() + @app.post("/get-online-features") def get_online_features(body=Depends(get_body)): try: @@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)): class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): - self._app = get_app(store=store) + self._app = get_app( + store=store, + registry_ttl_sec=options.get("registry_ttl_sec", 5), + ) self._options = options super().__init__() @@ -202,6 +229,7 @@ def start_server( no_access_log: bool, workers: int, keep_alive_timeout: int, + registry_ttl_sec: int = 5, ): FeastServeApplication( store=store, @@ -209,4 +237,5 @@ def start_server( accesslog=None if no_access_log else "-", workers=workers, keepalive=keep_alive_timeout, + registry_ttl_sec=registry_ttl_sec, ).run() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4b8200d96f..d3f98f8032 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2228,6 +2228,7 @@ def serve( no_feature_log: bool, workers: int, keep_alive_timeout: int, + registry_ttl_sec: int, ) -> None: """Start the feature consumption server locally on a given port.""" type_ = type_.lower() @@ -2243,6 +2244,7 @@ def serve( no_access_log=no_access_log, workers=workers, keep_alive_timeout=keep_alive_timeout, + registry_ttl_sec=registry_ttl_sec, ) @log_exceptions_and_usage