diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 8b7b5c5f11..12aeada765 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -15,6 +15,7 @@ import json import logging import sys +from pathlib import Path from typing import Dict import click @@ -26,6 +27,8 @@ from feast.entity import Entity from feast.feature_table import FeatureTable from feast.loaders.yaml import yaml_loader +from feast.repo_config import load_repo_config +from feast.repo_operations import apply_total, registry_dump, teardown _logger = logging.getLogger(__name__) @@ -353,5 +356,38 @@ def project_list(): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.command("apply") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def apply_total_command(repo_path: str): + """ + Applies a feature repo + """ + repo_config = load_repo_config(Path(repo_path)) + + apply_total(repo_config, Path(repo_path).resolve()) + + +@cli.command("teardown") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def teardown_command(repo_path: str): + """ + Tear down infra for a feature repo + """ + repo_config = load_repo_config(Path(repo_path)) + + teardown(repo_config, Path(repo_path).resolve()) + + +@cli.command("registry-dump") +@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True)) +def registry_dump_command(repo_path: str): + """ + Prints contents of the metadata registry + """ + repo_config = load_repo_config(Path(repo_path)) + + registry_dump(repo_config) + + if __name__ == "__main__": cli() diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index aea899e1ab..0f76c6eeef 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -13,7 +13,7 @@ # limitations under the License. 
 from typing import Optional
 
-from feast.feature_store_config import Config
+from feast.repo_config import RepoConfig, load_repo_config
 
 
 class FeatureStore:
@@ -22,13 +22,13 @@ class FeatureStore:
     """
 
     def __init__(
-        self, config_path: Optional[str], config: Optional[Config],
+        self, config_path: Optional[str], config: Optional[RepoConfig],
     ):
-        if config_path is None or config is None:
+        if config_path is not None and config is not None:
             raise Exception("You cannot specify both config_path and config")
         if config is not None:
             self.config = config
         elif config_path is not None:
-            self.config = Config.from_path(config_path)
+            self.config = load_repo_config(config_path)
         else:
-            self.config = Config()
+            self.config = RepoConfig()
diff --git a/sdk/python/feast/feature_store_config.py b/sdk/python/feast/feature_store_config.py
deleted file mode 100644
index b26d1e0bd5..0000000000
--- a/sdk/python/feast/feature_store_config.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright 2019 The Feast Authors
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import Optional
-
-import yaml
-
-
-class Config:
-    """
-    Configuration object that contains all possible configuration options for a FeatureStore.
- """ - - def __init__( - self, - provider: Optional[str], - online_store: Optional[str], - metadata_store: Optional[str], - ): - self.provider = provider if (provider is not None) else "local" - self.online_store = online_store if (online_store is not None) else "local" - self.metadata_store = ( - metadata_store if (metadata_store is not None) else "./metadata_store" - ) - - @classmethod - def from_path(cls, config_path: str): - """ - Construct the configuration object from a filepath containing a yaml file. - - Example yaml file: - - provider: gcp - online_store: firestore - metadata_store: gs://my_bucket/metadata_store - """ - with open(config_path, "r") as f: - config_dict = yaml.safe_load(f) - return cls( - provider=config_dict.get("provider"), - online_store=config_dict.get("online_store"), - metadata_store=config_dict.get("metadata_store"), - ) diff --git a/sdk/python/feast/infra/__init__.py b/sdk/python/feast/infra/__init__.py new file mode 100644 index 0000000000..c49186db43 --- /dev/null +++ b/sdk/python/feast/infra/__init__.py @@ -0,0 +1 @@ +# from .provider import Provider diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py new file mode 100644 index 0000000000..8c3882bb1e --- /dev/null +++ b/sdk/python/feast/infra/gcp.py @@ -0,0 +1,76 @@ +from datetime import datetime +from typing import List, Optional + +from feast import FeatureTable +from feast.infra.provider import Provider +from feast.repo_config import DatastoreOnlineStoreConfig + + +def _delete_all_values(client, key) -> None: + """ + Delete all data under the key path in datastore. 
+ """ + while True: + query = client.query(kind="Value", ancestor=key) + entities = list(query.fetch(limit=1000)) + if not entities: + return + + for entity in entities: + print("Deleting: {}".format(entity)) + client.delete(entity.key) + + +class Gcp(Provider): + _project_id: Optional[str] + + def __init__(self, config: Optional[DatastoreOnlineStoreConfig]): + if config: + self._project_id = config.project_id + else: + self._project_id = None + + def _initialize_client(self): + from google.cloud import datastore + + if self._project_id is not None: + return datastore.Client(self.project_id) + else: + return datastore.Client() + + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + from google.cloud import datastore + + client = self._initialize_client() + + for table in tables_to_keep: + key = client.key("FeastProject", project, "FeatureTable", table.name) + entity = datastore.Entity(key=key) + entity.update({"created_at": datetime.utcnow()}) + client.put(entity) + + for table in tables_to_delete: + _delete_all_values( + client, client.key("FeastProject", project, "FeatureTable", table.name) + ) + + # Delete the table metadata datastore entity + key = client.key("FeastProject", project, "FeatureTable", table.name) + client.delete(key) + + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + client = self._initialize_client() + + for table in tables: + _delete_all_values( + client, client.key("FeastProject", project, "FeatureTable", table.name) + ) + + # Delete the table metadata datastore entity + key = client.key("FeastProject", project, "FeatureTable", table.name) + client.delete(key) diff --git a/sdk/python/feast/infra/local_sqlite.py b/sdk/python/feast/infra/local_sqlite.py new file mode 100644 index 0000000000..fbd43a6dfb --- /dev/null +++ b/sdk/python/feast/infra/local_sqlite.py @@ -0,0 +1,36 @@ +import os +import sqlite3 +from typing import List + +from feast 
import FeatureTable +from feast.infra.provider import Provider +from feast.repo_config import LocalOnlineStoreConfig + + +def _table_id(project: str, table: FeatureTable) -> str: + return f"{project}_{table.name}" + + +class LocalSqlite(Provider): + _db_path: str + + def __init__(self, config: LocalOnlineStoreConfig): + self._db_path = config.path + + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + conn = sqlite3.connect(self._db_path) + for table in tables_to_keep: + conn.execute( + f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (key BLOB, value BLOB)" + ) + + for table in tables_to_delete: + conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}") + + def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None: + os.unlink(self._db_path) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py new file mode 100644 index 0000000000..52689e9543 --- /dev/null +++ b/sdk/python/feast/infra/provider.py @@ -0,0 +1,49 @@ +import abc +from typing import List + +from feast import FeatureTable +from feast.repo_config import RepoConfig + + +class Provider(abc.ABC): + @abc.abstractmethod + def update_infra( + self, + project: str, + tables_to_delete: List[FeatureTable], + tables_to_keep: List[FeatureTable], + ): + """ + Reconcile cloud resources with the objects declared in the feature repo. + + Args: + tables_to_delete: Tables that were deleted from the feature repo, so provider needs to + clean up the corresponding cloud resources. + tables_to_keep: Tables that are still in the feature repo. Depending on implementation, + provider may or may not need to update the corresponding resources. + """ + ... + + @abc.abstractmethod + def teardown_infra(self, project: str, tables: List[FeatureTable]): + """ + Tear down all cloud resources for a repo. + + Args: + tables: Tables that are declared in the feature repo. + """ + ... 
+ + +def get_provider(config: RepoConfig) -> Provider: + if config.provider == "gcp": + from feast.infra.gcp import Gcp + + return Gcp(config.online_store.datastore) + elif config.provider == "local": + from feast.infra.local_sqlite import LocalSqlite + + assert config.online_store.local is not None + return LocalSqlite(config.online_store.local) + else: + raise ValueError(config) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py new file mode 100644 index 0000000000..aae9c217d1 --- /dev/null +++ b/sdk/python/feast/repo_config.py @@ -0,0 +1,31 @@ +from pathlib import Path +from typing import NamedTuple, Optional + +import yaml +from bindr import bind + + +class LocalOnlineStoreConfig(NamedTuple): + path: str + + +class DatastoreOnlineStoreConfig(NamedTuple): + project_id: str + + +class OnlineStoreConfig(NamedTuple): + datastore: Optional[DatastoreOnlineStoreConfig] = None + local: Optional[LocalOnlineStoreConfig] = None + + +class RepoConfig(NamedTuple): + metadata_store: str + project: str + provider: str + online_store: OnlineStoreConfig + + +def load_repo_config(repo_path: Path) -> RepoConfig: + with open(repo_path / "feature_store.yaml") as f: + raw_config = yaml.safe_load(f) + return bind(RepoConfig, raw_config) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py new file mode 100644 index 0000000000..706458e6c6 --- /dev/null +++ b/sdk/python/feast/repo_operations.py @@ -0,0 +1,98 @@ +import importlib +import os +import sys +from pathlib import Path +from typing import List, NamedTuple + +from feast import Entity, FeatureTable +from feast.infra.provider import get_provider +from feast.registry import Registry +from feast.repo_config import RepoConfig + + +def py_path_to_module(path: Path, repo_root: Path) -> str: + return ( + str(path.relative_to(repo_root))[: -len(".py")] + .replace("./", "") + .replace("/", ".") + ) + + +class ParsedRepo(NamedTuple): + feature_tables: List[FeatureTable] + 
entities: List[Entity] + + +def parse_repo(repo_root: Path) -> ParsedRepo: + """ Collect feature table definitions from feature repo """ + res = ParsedRepo(feature_tables=[], entities=[]) + + # FIXME: process subdirs but exclude hidden ones + repo_files = [p.resolve() for p in repo_root.glob("*.py")] + + for repo_file in repo_files: + + module_path = py_path_to_module(repo_file, repo_root) + + print(f"Processing {repo_file} as {module_path}") + module = importlib.import_module(module_path) + + for attr_name in dir(module): + obj = getattr(module, attr_name) + if isinstance(obj, FeatureTable): + res.feature_tables.append(obj) + elif isinstance(obj, Entity): + res.entities.append(obj) + return res + + +def apply_total(repo_config: RepoConfig, repo_path: Path): + os.chdir(repo_path) + sys.path.append("") + + project = repo_config.project + registry = Registry(repo_config.metadata_store) + repo = parse_repo(repo_path) + + for entity in repo.entities: + registry.apply_entity(entity, project=project) + + repo_table_names = set(t.name for t in repo.feature_tables) + tables_to_delete = [] + for registry_table in registry.list_feature_tables(project=project): + if registry_table.name not in repo_table_names: + tables_to_delete.append(registry_table) + + # Delete tables that should not exist + for registry_table in tables_to_delete: + registry.delete_feature_table(registry_table.name, project=project) + + for table in repo.feature_tables: + registry.apply_feature_table(table, project) + + infra_provider = get_provider(repo_config) + infra_provider.update_infra( + project, tables_to_delete=tables_to_delete, tables_to_keep=repo.feature_tables + ) + + print("Done!") + + +def teardown(repo_config: RepoConfig, repo_path: Path): + registry = Registry(repo_config.metadata_store) + project = repo_config.project + registry_tables = registry.list_feature_tables(project=project) + infra_provider = get_provider(repo_config) + infra_provider.teardown_infra(project, 
tables=registry_tables) + + +def registry_dump(repo_config: RepoConfig): + """ For debugging only: output contents of the metadata registry """ + + project = repo_config.project + registry = Registry(repo_config.metadata_store) + + for entity in registry.list_entities(project=project): + print(entity) + for table in registry.list_feature_tables(project=project): + print(table) diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt index 24f1d7dd70..5049778cb2 100644 --- a/sdk/python/requirements-ci.txt +++ b/sdk/python/requirements-ci.txt @@ -22,3 +22,5 @@ pytest-mock==1.10.4 PyYAML==5.3.1 great-expectations==0.13.2 adlfs==0.5.9 +firebase-admin==4.5.2 +google-cloud-datastore==2.1.0 diff --git a/sdk/python/setup.py b/sdk/python/setup.py index c8ceb5d479..7660a7bcdd 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -29,7 +29,7 @@ "google-api-core==1.22.4", "google-cloud-bigquery==1.18.*", "google-cloud-storage==1.20.*", - "google-cloud-core==1.0.*", + "google-cloud-core==1.4.*", "googleapis-common-protos==1.52.*", "google-cloud-bigquery-storage==0.7.*", "grpcio==1.31.0", @@ -45,6 +45,7 @@ "numpy<1.20.0", "google", "kubernetes==12.0.*", + "bindr", ] # README file from Feast repo root directory diff --git a/sdk/python/tests/cli/example_feature_repo_1.py b/sdk/python/tests/cli/example_feature_repo_1.py new file mode 100644 index 0000000000..4a32700f5e --- /dev/null +++ b/sdk/python/tests/cli/example_feature_repo_1.py @@ -0,0 +1,28 @@ +from google.protobuf.duration_pb2 import Duration + +from feast import BigQuerySource, Entity, Feature, FeatureTable, ValueType + +driver_locations_source = BigQuerySource( + table_ref="rh_prod.ride_hailing_co.drivers", + event_timestamp_column="event_timestamp", + created_timestamp_column="created_timestamp", +) + + +driver = Entity( + name="driver", # The name is derived from this argument, not object name. 
+ value_type=ValueType.INT64, + description="driver id", +) + + +driver_locations = FeatureTable( + name="driver_locations", + entities=["driver"], + max_age=Duration(seconds=86400 * 1), + features=[ + Feature(name="lat", dtype=ValueType.FLOAT), + Feature(name="lon", dtype=ValueType.STRING), + ], + batch_source=driver_locations_source, +) diff --git a/sdk/python/tests/cli/test_cli_local.py b/sdk/python/tests/cli/test_cli_local.py new file mode 100644 index 0000000000..cb3628b953 --- /dev/null +++ b/sdk/python/tests/cli/test_cli_local.py @@ -0,0 +1,54 @@ +import subprocess +import sys +import tempfile +from pathlib import Path +from textwrap import dedent +from typing import List + +from feast import cli + + +class CliRunner: + """ + NB. We can't use test runner helper from click here, since it doesn't start a new Python + interpreter. And we need a new interpreter for each test since we dynamically import + modules from the feature repo, and it is hard to clean up that state otherwise. + """ + + def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: + return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd) + + +class TestCliLocal: + def test_basic(self) -> None: + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: foo + metadata_store: {data_path / "metadata.db"} + provider: local + online_store: + local: + path: {data_path / "online_store.db"} + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 + + result = runner.run(["teardown", str(repo_path)], cwd=repo_path) + assert 
result.returncode == 0 diff --git a/sdk/python/tests/cli/test_datastore.py b/sdk/python/tests/cli/test_datastore.py new file mode 100644 index 0000000000..c2ef7d350e --- /dev/null +++ b/sdk/python/tests/cli/test_datastore.py @@ -0,0 +1,63 @@ +import random +import string +import subprocess +import sys +import tempfile +from pathlib import Path +from textwrap import dedent +from typing import List + +import pytest + +from feast import cli + + +class CliRunner: + """ + NB. We can't use test runner helper from click here, since it doesn't start a new Python + interpreter. And we need a new interpreter for each test since we dynamically import + modules from the feature repo, and it is hard to clean up that state otherwise. + """ + + def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess: + return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd) + + +@pytest.mark.integration +class TestCliGcp: + def setup_method(self): + self._project_id = "".join( + random.choice(string.ascii_lowercase + string.digits) for _ in range(10) + ) + + def test_basic(self) -> None: + runner = CliRunner() + with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name: + + repo_path = Path(repo_dir_name) + data_path = Path(data_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text( + dedent( + f""" + project: {self._project_id} + metadata_store: {data_path / "metadata.db"} + provider: gcp + online_store: + datastore: + """ + ) + ) + + repo_example = repo_path / "example.py" + repo_example.write_text( + (Path(__file__).parent / "example_feature_repo_1.py").read_text() + ) + + result = runner.run(["apply", str(repo_path)], cwd=repo_path) + assert result.returncode == 0 + + result = runner.run(["teardown", str(repo_path)], cwd=repo_path) + assert result.returncode == 0