Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python-centric feast deploy CLI #1362

Merged
merged 14 commits into from
Mar 10, 2021
36 changes: 36 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import logging
import sys
from pathlib import Path
from typing import Dict

import click
Expand All @@ -26,6 +27,8 @@
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
woop marked this conversation as resolved.
Show resolved Hide resolved
from feast.repo_operations import apply_total, registry_dump, teardown

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,5 +356,38 @@ def project_list():
print(tabulate(table, headers=["NAME"], tablefmt="plain"))


@cli.command("apply")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def apply_total_command(repo_path: str):
"""
Applies a feature repo
"""
repo_config = load_repo_config(Path(repo_path))
woop marked this conversation as resolved.
Show resolved Hide resolved

apply_total(repo_config, Path(repo_path).resolve())


@cli.command("teardown")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def teardown_command(repo_path: str):
"""
Tear down infra for a a feature repo
woop marked this conversation as resolved.
Show resolved Hide resolved
"""
repo_config = load_repo_config(Path(repo_path))

teardown(repo_config, Path(repo_path).resolve())


@cli.command("registry-dump")
woop marked this conversation as resolved.
Show resolved Hide resolved
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
def registry_dump_command(repo_path: str):
"""
Prints contents of the metadata registry
"""
repo_config = load_repo_config(Path(repo_path))

registry_dump(repo_config)


if __name__ == "__main__":
cli()
8 changes: 4 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from typing import Optional

from feast.feature_store_config import Config
from feast.repo_config import RepoConfig, load_repo_config


class FeatureStore:
Expand All @@ -22,13 +22,13 @@ class FeatureStore:
"""

def __init__(
self, config_path: Optional[str], config: Optional[Config],
self, config_path: Optional[str], config: Optional[RepoConfig],
):
if config_path is None or config is None:
woop marked this conversation as resolved.
Show resolved Hide resolved
raise Exception("You cannot specify both config_path and config")
if config is not None:
self.config = config
elif config_path is not None:
self.config = Config.from_path(config_path)
self.config = load_repo_config(config_path)
else:
self.config = Config()
self.config = RepoConfig()
53 changes: 0 additions & 53 deletions sdk/python/feast/feature_store_config.py

This file was deleted.

1 change: 1 addition & 0 deletions sdk/python/feast/infra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# from .provider import Provider
74 changes: 74 additions & 0 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from datetime import datetime
from typing import List, Optional

from feast import FeatureTable
from feast.infra.provider import Provider
woop marked this conversation as resolved.
Show resolved Hide resolved
from feast.repo_config import DatastoreOnlineStoreConfig


def _delete_all_values(client, key) -> None:
"""
Delete all keys under the key path in firestore.
woop marked this conversation as resolved.
Show resolved Hide resolved
"""
while True:
query = client.query(kind="Value", ancestor=key)
entities = list(query.fetch(limit=1000))
if not entities:
return

for entity in entities:
print("Deleting: {}".format(entity))
client.delete(entity.key)


class Gcp(Provider):
woop marked this conversation as resolved.
Show resolved Hide resolved
_project_id: Optional[str]

def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
if config:
self._project_id = config.project_id
else:
self._project_id = None

def _initialize_client(self):
from google.cloud import datastore

if self._project_id is not None:
return datastore.Client(self.project_id)
else:
return datastore.Client()

def update_infra(
woop marked this conversation as resolved.
Show resolved Hide resolved
self,
project: str,
tables_to_delete: List[FeatureTable],
tables_to_keep: List[FeatureTable],
):
from google.cloud import datastore

client = self._initialize_client()

for table in tables_to_keep:
key = client.key("FeastProject", project, "FeatureTable", table.name)
entity = datastore.Entity(key=key)
entity.update({"created_at": datetime.utcnow()})
client.put(entity)

for table in tables_to_delete:
_delete_all_values(
client, client.key("FeastProject", project, "FeatureTable", table.name)
)

key = client.key("FeastProject", project, "FeatureTable", table.name)
client.delete(key)

def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
woop marked this conversation as resolved.
Show resolved Hide resolved
client = self._initialize_client()

for table in tables:
_delete_all_values(
client, client.key("FeastProject", project, "FeatureTable", table.name)
)

key = client.key("FeastProject", project, "FeatureTable", table.name)
client.delete(key)
36 changes: 36 additions & 0 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
import sqlite3
from typing import List

from feast import FeatureTable
from feast.infra.provider import Provider
from feast.repo_config import LocalOnlineStoreConfig


def _table_id(project: str, table: FeatureTable) -> str:
return f"{project}_{table.name}"


class LocalSqlite(Provider):
_db_path: str

def __init__(self, config: LocalOnlineStoreConfig):
self._db_path = config.path

def update_infra(
self,
project: str,
tables_to_delete: List[FeatureTable],
tables_to_keep: List[FeatureTable],
):
conn = sqlite3.connect(self._db_path)
for table in tables_to_keep:
conn.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (key BLOB, value BLOB)"
)

for table in tables_to_delete:
conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
os.unlink(self._db_path)
49 changes: 49 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import abc
from typing import List

from feast import FeatureTable
from feast.repo_config import RepoConfig


class Provider(abc.ABC):
woop marked this conversation as resolved.
Show resolved Hide resolved
@abc.abstractmethod
def update_infra(
self,
project: str,
tables_to_delete: List[FeatureTable],
tables_to_keep: List[FeatureTable],
):
"""
Reconcile cloud resources with the objects declared in the feature repo.

Args:
tables_to_delete: Tables that were deleted from the feature repo, so provider needs to
clean up the corresponding cloud resources.
tables_to_keep: Tables that are still in the feature repo. Depending on implementation,
provider may or may not need to update the corresponding resources.
"""
...

@abc.abstractmethod
def teardown_infra(self, project: str, tables: List[FeatureTable]):
"""
Tear down all cloud resources for a repo.

Args:
tables: Tables that are declared in the feature repo.
"""
...


def get_provider(config: RepoConfig) -> Provider:
if config.provider == "gcp":
from feast.infra.gcp import Gcp

return Gcp(config.online_store.datastore)
elif config.provider == "local":
from feast.infra.local_sqlite import LocalSqlite

assert config.online_store.local is not None
woop marked this conversation as resolved.
Show resolved Hide resolved
return LocalSqlite(config.online_store.local)
else:
raise ValueError(config)
31 changes: 31 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from pathlib import Path
from typing import NamedTuple, Optional

import yaml
from bindr import bind


class LocalOnlineStoreConfig(NamedTuple):
path: str


class DatastoreOnlineStoreConfig(NamedTuple):
project_id: str


class OnlineStoreConfig(NamedTuple):
datastore: Optional[DatastoreOnlineStoreConfig] = None
local: Optional[LocalOnlineStoreConfig] = None


class RepoConfig(NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we just called this Config?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already have a Config or two so I wanted to be a bit more specific here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reconcile https://github.com/feast-dev/feast/blob/master/sdk/python/feast/feature_store_config.py and this since I think they're the same thing

metadata_store: str
project: str
provider: str
online_store: OnlineStoreConfig


def load_repo_config(repo_path: Path) -> RepoConfig:
with open(repo_path / "feature_store.yaml") as f:
raw_config = yaml.safe_load(f)
return bind(RepoConfig, raw_config)
Loading