diff --git a/docs/tutorials/structuring-feature-repos.md b/docs/tutorials/structuring-feature-repos.md new file mode 100644 index 0000000000..49ed3dee9a --- /dev/null +++ b/docs/tutorials/structuring-feature-repos.md @@ -0,0 +1,3 @@ +# How to Structure Feature Repositories + +## Overview diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c6a301e958..052bb9ed97 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -72,8 +72,18 @@ def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): default="info", help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", ) +@click.option( + "--feature-store-yaml", + "-f", + help="Override the directory where the CLI should look for the feature_store.yaml file.", +) @click.pass_context -def cli(ctx: click.Context, chdir: Optional[str], log_level: str): +def cli( + ctx: click.Context, + chdir: Optional[str], + log_level: str, + feature_store_yaml: Optional[str], +): """ Feast CLI @@ -83,6 +93,11 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str): """ ctx.ensure_object(dict) ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() + ctx.obj["FS_YAML_FILE"] = ( + Path(feature_store_yaml).absolute() + if feature_store_yaml + else ctx.obj["CHDIR"] / "feature_store.yaml" + ) try: level = getattr(logging, log_level.upper()) logging.basicConfig( @@ -143,7 +158,8 @@ def ui(ctx: click.Context, host: str, port: int, registry_ttl_sec: int): Shows the Feast UI over the current directory """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) # Pass in the registry_dump method to get around a circular dependency store.serve_ui( @@ -161,7 +177,8 @@ def endpoint(ctx: click.Context): Display feature server endpoints """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) endpoint = store.get_feature_server_endpoint() if endpoint is not None: @@ -188,7 +205,8 @@ def data_source_describe(ctx: click.Context, name: str): Describe a data source """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) try: @@ -216,7 +234,8 @@ def data_source_list(ctx: click.Context): List all data sources """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) table = [] for datasource in store.list_data_sources(): @@ -248,7 +267,8 @@ def entity_describe(ctx: click.Context, name: str): Describe an entity """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) try: @@ -271,7 +291,8 @@ def entity_list(ctx: click.Context): List all entities """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) table = [] for entity in store.list_entities(): @@ -298,7 +319,8 @@ def feature_service_describe(ctx: click.Context, name: str): Describe a feature service """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) try: @@ -323,7 +345,8 @@ def feature_service_list(ctx: click.Context): List all feature services """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) feature_services = [] for feature_service in store.list_feature_services(): @@ -355,7 +378,8 @@ def feature_view_describe(ctx: click.Context, name: str): Describe a feature view """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) try: @@ -378,7 +402,8 @@ def feature_view_list(ctx: click.Context): List all feature views """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) table = [] for feature_view in [ @@ -421,7 +446,8 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str): [Experimental] Describe an on demand feature view """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) try: @@ -446,7 +472,8 @@ def on_demand_feature_view_list(ctx: click.Context): [Experimental] List all on demand feature views """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) table = [] for on_demand_feature_view in store.list_on_demand_feature_views(): @@ -469,8 +496,9 @@ def plan_command(ctx: click.Context, skip_source_validation: bool): Create or update a feature store deployment """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(fs_yaml_file, repo) try: plan(repo_config, repo, skip_source_validation) except FeastProviderLoginError as e: @@ -489,8 +517,10 @@ def apply_total_command(ctx: click.Context, skip_source_validation: bool): Create or update a feature store deployment """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + + repo_config = load_repo_config(fs_yaml_file, repo) try: apply_total(repo_config, repo, skip_source_validation) except FeastProviderLoginError as e: @@ -504,8 +534,9 @@ def teardown_command(ctx: click.Context): Tear down deployed feature store infrastructure """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(fs_yaml_file, repo) teardown(repo_config, repo) @@ -517,8 +548,9 @@ def registry_dump_command(ctx: click.Context): Print contents of the metadata registry """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(fs_yaml_file, repo) click.echo(registry_dump(repo_config, repo_path=repo)) @@ -545,7 +577,8 @@ def materialize_command( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) store.materialize( feature_views=None if not views else views, @@ -573,7 +606,8 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) store.materialize_incremental( feature_views=None if not views else views, @@ -663,7 +697,8 @@ def serve_command( ): """Start a feature server locally on a given port.""" repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) if go: @@ -685,7 +720,8 @@ def serve_command( def serve_transformations_command(ctx: click.Context, port: int): """[Experimental] Start a feature consumption server locally on a given port.""" repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) store.serve_transformations(port) @@ -724,7 +760,8 @@ def validate( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) store = FeatureStore(repo_path=str(repo)) feature_service = store.get_feature_service(name=feature_service) @@ -766,7 +803,8 @@ def repo_upgrade(ctx: click.Context, write: bool): Upgrade a feature repo in place. """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) try: RepoUpgrader(repo, write).upgrade() except FeastProviderLoginError as e: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ac682fb6cd..592a8e8bc4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -114,13 +114,14 @@ class FeatureStore: repo_path: Path _registry: BaseRegistry _provider: Provider - _go_server: "EmbeddedOnlineFeatureServer" + _go_server: Optional["EmbeddedOnlineFeatureServer"] @log_exceptions def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + fs_yaml_file: Optional[Path] = None, ): """ Creates a FeatureStore object. @@ -128,16 +129,24 @@ def __init__( Raises: ValueError: If both or neither of repo_path and config are specified. """ - if repo_path is not None and config is not None: - raise ValueError("You cannot specify both repo_path and config.") - if config is not None: + if fs_yaml_file is not None and config is not None: + raise ValueError("You cannot specify both fs_yaml_dir and config.") + + if repo_path: + self.repo_path = Path(repo_path) + else: self.repo_path = Path(os.getcwd()) + + if config is not None: self.config = config - elif repo_path is not None: - self.repo_path = Path(repo_path) - self.config = load_repo_config(Path(repo_path)) + elif fs_yaml_file is not None: + self.config = load_repo_config(fs_yaml_file, self.repo_path) + elif repo_path: + self.config = load_repo_config( + Path(repo_path) / "feature_store.yaml", self.repo_path + ) else: - raise ValueError("Please specify one of repo_path or config.") + raise ValueError("Please specify one of fs_yaml_dir or config.") registry_config = self.config.get_registry_config() if registry_config.registry_type == "sql": @@ -146,7 +155,8 @@ def __init__( r = Registry(registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) self._registry = r - self._provider = get_provider(self.config, self.repo_path) + + self._provider = get_provider(self.config) self._go_server = None @log_exceptions @@ -1569,7 +1579,7 @@ def _get_online_features( } # If the embedded Go code is enabled, send request to it instead of going through regular Python logic. - if self.config.go_feature_retrieval: + if self.config.go_feature_retrieval and self._go_server: self._lazy_init_go_server() entity_native_values: Dict[str, List[Any]] @@ -2221,7 +2231,7 @@ def serve( ) -> None: """Start the feature consumption server locally on a given port.""" type_ = type_.lower() - if self.config.go_feature_serving: + if self.config.go_feature_serving and self._go_server: # Start go server instead of python if the flag is enabled self._lazy_init_go_server() enable_logging = ( diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index e99a09a9e2..c5f9380677 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -297,7 +297,7 @@ def get_feature_server_endpoint(self) -> Optional[str]: return None -def get_provider(config: RepoConfig, repo_path: Path) -> Provider: +def get_provider(config: RepoConfig) -> Provider: if "." not in config.provider: if config.provider not in PROVIDERS_CLASS_FOR_TYPE: raise errors.FeastProviderNotImplementedError(config.provider) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 34df1a215f..5c0d350309 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -483,8 +483,8 @@ def get_feature_server_config_from_type(feature_server_type: str): return import_class(module_name, config_class_name, config_class_name) -def load_repo_config(repo_path: Path) -> RepoConfig: - config_path = repo_path / "feature_store.yaml" +def load_repo_config(fs_yaml_file: Path, repo_path: Path) -> RepoConfig: + config_path = fs_yaml_file with open(config_path) as f: raw_config = yaml.safe_load(os.path.expandvars(f.read())) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 91cab2e992..916250542d 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -325,13 +325,12 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: return json.dumps(registry_dict, indent=2, sort_keys=True) -def cli_check_repo(repo_path: Path): +def cli_check_repo(repo_path: Path, fs_yaml_file: Path): sys.path.append(str(repo_path)) - config_path = repo_path / "feature_store.yaml" - if not config_path.exists(): + if not fs_yaml_file.exists(): print( - f"Can't find feature_store.yaml at {repo_path}. Make sure you're running feast from an initialized " - f"feast repository. " + f"Can't find feature repo configuration file at {fs_yaml_file}. " + "Make sure you're running feast from an initialized feast repository." ) sys.exit(1) diff --git a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py index 8cbe461b0f..6634612994 100644 --- a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py +++ b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py @@ -21,7 +21,7 @@ def _test_config(config_text, expect_error: Optional[str]): error = None rc = None try: - rc = load_repo_config(repo_path) + rc = load_repo_config(repo_config, repo_path) except FeastConfigError as e: error = e