Skip to content

Commit

Permalink
WIP ci: Docs on structuring feature repos
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Aug 12, 2022
1 parent 0ed1a63 commit 2112bd9
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 47 deletions.
3 changes: 3 additions & 0 deletions docs/tutorials/structuring-feature-repos.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# How to Structure Feature Repositories

## Overview
92 changes: 65 additions & 27 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,18 @@ def format_options(self, ctx: click.Context, formatter: click.HelpFormatter):
default="info",
help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).",
)
@click.option(
"--feature-store-yaml",
"-f",
help="Override the directory where the CLI should look for the feature_store.yaml file.",
)
@click.pass_context
def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
def cli(
ctx: click.Context,
chdir: Optional[str],
log_level: str,
feature_store_yaml: Optional[str],
):
"""
Feast CLI
Expand All @@ -83,6 +93,11 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str):
"""
ctx.ensure_object(dict)
ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute()
ctx.obj["FS_YAML_FILE"] = (
Path(feature_store_yaml).absolute()
if feature_store_yaml
else ctx.obj["CHDIR"] / "feature_store.yaml"
)
try:
level = getattr(logging, log_level.upper())
logging.basicConfig(
Expand Down Expand Up @@ -143,7 +158,8 @@ def ui(ctx: click.Context, host: str, port: int, registry_ttl_sec: int):
Shows the Feast UI over the current directory
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
# Pass in the registry_dump method to get around a circular dependency
store.serve_ui(
Expand All @@ -161,7 +177,8 @@ def endpoint(ctx: click.Context):
Display feature server endpoints
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
endpoint = store.get_feature_server_endpoint()
if endpoint is not None:
Expand All @@ -188,7 +205,8 @@ def data_source_describe(ctx: click.Context, name: str):
Describe a data source
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

try:
Expand Down Expand Up @@ -216,7 +234,8 @@ def data_source_list(ctx: click.Context):
List all data sources
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
table = []
for datasource in store.list_data_sources():
Expand Down Expand Up @@ -248,7 +267,8 @@ def entity_describe(ctx: click.Context, name: str):
Describe an entity
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

try:
Expand All @@ -271,7 +291,8 @@ def entity_list(ctx: click.Context):
List all entities
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
table = []
for entity in store.list_entities():
Expand All @@ -298,7 +319,8 @@ def feature_service_describe(ctx: click.Context, name: str):
Describe a feature service
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

try:
Expand All @@ -323,7 +345,8 @@ def feature_service_list(ctx: click.Context):
List all feature services
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
feature_services = []
for feature_service in store.list_feature_services():
Expand Down Expand Up @@ -355,7 +378,8 @@ def feature_view_describe(ctx: click.Context, name: str):
Describe a feature view
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

try:
Expand All @@ -378,7 +402,8 @@ def feature_view_list(ctx: click.Context):
List all feature views
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
table = []
for feature_view in [
Expand Down Expand Up @@ -421,7 +446,8 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str):
[Experimental] Describe an on demand feature view
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

try:
Expand All @@ -446,7 +472,8 @@ def on_demand_feature_view_list(ctx: click.Context):
[Experimental] List all on demand feature views
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
table = []
for on_demand_feature_view in store.list_on_demand_feature_views():
Expand All @@ -469,8 +496,9 @@ def plan_command(ctx: click.Context, skip_source_validation: bool):
Create or update a feature store deployment
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(fs_yaml_file, repo)
try:
plan(repo_config, repo, skip_source_validation)
except FeastProviderLoginError as e:
Expand All @@ -489,8 +517,10 @@ def apply_total_command(ctx: click.Context, skip_source_validation: bool):
Create or update a feature store deployment
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)

repo_config = load_repo_config(fs_yaml_file, repo)
try:
apply_total(repo_config, repo, skip_source_validation)
except FeastProviderLoginError as e:
Expand All @@ -504,8 +534,9 @@ def teardown_command(ctx: click.Context):
Tear down deployed feature store infrastructure
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(fs_yaml_file, repo)

teardown(repo_config, repo)

Expand All @@ -517,8 +548,9 @@ def registry_dump_command(ctx: click.Context):
Print contents of the metadata registry
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
repo_config = load_repo_config(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
repo_config = load_repo_config(fs_yaml_file, repo)

click.echo(registry_dump(repo_config, repo_path=repo))

Expand All @@ -545,7 +577,8 @@ def materialize_command(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
store.materialize(
feature_views=None if not views else views,
Expand Down Expand Up @@ -573,7 +606,8 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))
store.materialize_incremental(
feature_views=None if not views else views,
Expand Down Expand Up @@ -663,7 +697,8 @@ def serve_command(
):
"""Start a feature server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

if go:
Expand All @@ -685,7 +720,8 @@ def serve_command(
def serve_transformations_command(ctx: click.Context, port: int):
"""[Experimental] Start a feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

store.serve_transformations(port)
Expand Down Expand Up @@ -724,7 +760,8 @@ def validate(
START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01'
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
store = FeatureStore(repo_path=str(repo))

feature_service = store.get_feature_service(name=feature_service)
Expand Down Expand Up @@ -766,7 +803,8 @@ def repo_upgrade(ctx: click.Context, write: bool):
Upgrade a feature repo in place.
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
fs_yaml_file = ctx.obj["FS_YAML_FILE"]
cli_check_repo(repo, fs_yaml_file)
try:
RepoUpgrader(repo, write).upgrade()
except FeastProviderLoginError as e:
Expand Down
32 changes: 21 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,30 +114,39 @@ class FeatureStore:
repo_path: Path
_registry: BaseRegistry
_provider: Provider
_go_server: "EmbeddedOnlineFeatureServer"
_go_server: Optional["EmbeddedOnlineFeatureServer"]

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
fs_yaml_file: Optional[Path] = None,
):
"""
Creates a FeatureStore object.
Raises:
ValueError: If both or neither of repo_path and config are specified.
"""
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config.")
if config is not None:
if fs_yaml_file is not None and config is not None:
raise ValueError("You cannot specify both fs_yaml_dir and config.")

if repo_path:
self.repo_path = Path(repo_path)
else:
self.repo_path = Path(os.getcwd())

if config is not None:
self.config = config
elif repo_path is not None:
self.repo_path = Path(repo_path)
self.config = load_repo_config(Path(repo_path))
elif fs_yaml_file is not None:
self.config = load_repo_config(fs_yaml_file, self.repo_path)
elif repo_path:
self.config = load_repo_config(
Path(repo_path) / "feature_store.yaml", self.repo_path
)
else:
raise ValueError("Please specify one of repo_path or config.")
raise ValueError("Please specify one of fs_yaml_dir or config.")

registry_config = self.config.get_registry_config()
if registry_config.registry_type == "sql":
Expand All @@ -146,7 +155,8 @@ def __init__(
r = Registry(registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
self._registry = r
self._provider = get_provider(self.config, self.repo_path)

self._provider = get_provider(self.config)
self._go_server = None

@log_exceptions
Expand Down Expand Up @@ -1569,7 +1579,7 @@ def _get_online_features(
}

# If the embedded Go code is enabled, send request to it instead of going through regular Python logic.
if self.config.go_feature_retrieval:
if self.config.go_feature_retrieval and self._go_server:
self._lazy_init_go_server()

entity_native_values: Dict[str, List[Any]]
Expand Down Expand Up @@ -2221,7 +2231,7 @@ def serve(
) -> None:
"""Start the feature consumption server locally on a given port."""
type_ = type_.lower()
if self.config.go_feature_serving:
if self.config.go_feature_serving and self._go_server:
# Start go server instead of python if the flag is enabled
self._lazy_init_go_server()
enable_logging = (
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def get_feature_server_endpoint(self) -> Optional[str]:
return None


def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
def get_provider(config: RepoConfig) -> Provider:
if "." not in config.provider:
if config.provider not in PROVIDERS_CLASS_FOR_TYPE:
raise errors.FeastProviderNotImplementedError(config.provider)
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,8 @@ def get_feature_server_config_from_type(feature_server_type: str):
return import_class(module_name, config_class_name, config_class_name)


def load_repo_config(repo_path: Path) -> RepoConfig:
config_path = repo_path / "feature_store.yaml"
def load_repo_config(fs_yaml_file: Path, repo_path: Path) -> RepoConfig:
config_path = fs_yaml_file

with open(config_path) as f:
raw_config = yaml.safe_load(os.path.expandvars(f.read()))
Expand Down
9 changes: 4 additions & 5 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,12 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str:
return json.dumps(registry_dict, indent=2, sort_keys=True)


def cli_check_repo(repo_path: Path):
def cli_check_repo(repo_path: Path, fs_yaml_file: Path):
sys.path.append(str(repo_path))
config_path = repo_path / "feature_store.yaml"
if not config_path.exists():
if not fs_yaml_file.exists():
print(
f"Can't find feature_store.yaml at {repo_path}. Make sure you're running feast from an initialized "
f"feast repository. "
f"Can't find feature repo configuration file at {fs_yaml_file}. "
"Make sure you're running feast from an initialized feast repository."
)
sys.exit(1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def _test_config(config_text, expect_error: Optional[str]):
error = None
rc = None
try:
rc = load_repo_config(repo_path)
rc = load_repo_config(repo_config, repo_path)
except FeastConfigError as e:
error = e

Expand Down

0 comments on commit 2112bd9

Please sign in to comment.