diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a8a192dc83..04791ac4a2 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -29,12 +29,7 @@ ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.registry import Registry -from feast.repo_config import ( - LocalOnlineStoreConfig, - OnlineStoreConfig, - RepoConfig, - load_repo_config, -) +from feast.repo_config import RepoConfig, load_repo_config from feast.telemetry import Telemetry from feast.version import get_version @@ -51,6 +46,12 @@ class FeatureStore: def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): + """ Initializes a new FeatureStore object. Used to manage a feature store. + + Args: + repo_path: Path to a `feature_store.yaml` used to configure the feature store + config (RepoConfig): Configuration object used to configure the feature store + """ self.repo_path = repo_path if repo_path is not None and config is not None: raise ValueError("You cannot specify both repo_path and config") @@ -59,14 +60,7 @@ def __init__( elif repo_path is not None: self.config = load_repo_config(Path(repo_path)) else: - self.config = RepoConfig( - registry="./registry.db", - project="default", - provider="local", - online_store=OnlineStoreConfig( - local=LocalOnlineStoreConfig(path="online_store.db") - ), - ) + raise ValueError("Please specify one of repo_path or config") registry_config = self.config.get_registry_config() self._registry = Registry( diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 15a8c0fcc0..ca139e73e3 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -28,9 +28,10 @@ class GcpProvider(Provider): _gcp_project_id: Optional[str] - def __init__(self, config: Optional[DatastoreOnlineStoreConfig]): - if config: - self._gcp_project_id = config.project_id + def __init__(self, config: RepoConfig): + assert isinstance(config.online_store, DatastoreOnlineStoreConfig) + if config and config.online_store and config.online_store.project_id: + self._gcp_project_id = config.online_store.project_id else: self._gcp_project_id = None diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index d0489b3708..cb848f7b43 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,6 +1,7 @@ import os import sqlite3 from datetime import datetime +from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas as pd @@ -20,16 +21,22 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.registry import Registry -from feast.repo_config import LocalOnlineStoreConfig, RepoConfig +from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig class LocalProvider(Provider): _db_path: str - def __init__(self, config: LocalOnlineStoreConfig): - self._db_path = config.path + def __init__(self, config: RepoConfig): + + assert config is not None + assert config.online_store is not None + local_online_store_config = config.online_store + assert isinstance(local_online_store_config, SqliteOnlineStoreConfig) + self._db_path = local_online_store_config.path def _get_conn(self): + Path(self._db_path).parent.mkdir(exist_ok=True) return sqlite3.connect( self._db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index a43952c45e..2fb067a8b4 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -127,15 +127,11 @@ def get_provider(config: RepoConfig) -> Provider: if config.provider == "gcp": from feast.infra.gcp import GcpProvider - return GcpProvider( - config.online_store.datastore if config.online_store else None - ) + return GcpProvider(config) elif config.provider == "local": from feast.infra.local import LocalProvider - assert config.online_store is not None - assert config.online_store.local is not None - return LocalProvider(config.online_store.local) + return LocalProvider(config) else: raise ValueError(config) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 35dc03ce08..bedcf5c53a 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,8 +1,9 @@ from pathlib import Path -from typing import Optional, Union import yaml -from pydantic import BaseModel, StrictInt, StrictStr, ValidationError +from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator +from pydantic.error_wrappers import ErrorWrapper +from pydantic.typing import Dict, Literal, Optional, Union class FeastBaseModel(BaseModel): @@ -13,26 +14,27 @@ class Config: extra = "forbid" -class LocalOnlineStoreConfig(FeastBaseModel): - """ Online store config for local (SQLite-based) online store """ +class SqliteOnlineStoreConfig(FeastBaseModel): + """ Online store config for local (SQLite-based) store """ - path: StrictStr - """ str: Path to sqlite db """ + type: Literal["sqlite"] = "sqlite" + """ Online store type selector""" + + path: StrictStr = "data/online.db" + """ (optional) Path to sqlite db """ class DatastoreOnlineStoreConfig(FeastBaseModel): """ Online store config for GCP Datastore """ - project_id: StrictStr - """ str: GCP Project Id """ + type: Literal["datastore"] = "datastore" + """ Online store type selector""" + project_id: Optional[StrictStr] = None + """ (optional) GCP Project Id """ -class OnlineStoreConfig(FeastBaseModel): - datastore: Optional[DatastoreOnlineStoreConfig] = None - """ DatastoreOnlineStoreConfig: Optional Google Cloud Datastore config """ - local: Optional[LocalOnlineStoreConfig] = None - """ LocalOnlineStoreConfig: Optional local online store config """ +OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig] class RegistryConfig(FeastBaseModel): @@ -51,7 +53,7 @@ class RegistryConfig(FeastBaseModel): class RepoConfig(FeastBaseModel): """ Repo config. Typically loaded from `feature_store.yaml` """ - registry: Union[StrictStr, RegistryConfig] + registry: Union[StrictStr, RegistryConfig] = "data/registry.db" """ str: Path to metadata store. Can be a local path, or remote object storage path, e.g. gcs://foo/bar """ project: StrictStr @@ -63,7 +65,7 @@ class RepoConfig(FeastBaseModel): provider: StrictStr """ str: local or gcp """ - online_store: Optional[OnlineStoreConfig] = None + online_store: OnlineStoreConfig = SqliteOnlineStoreConfig() """ OnlineStoreConfig: Online store configuration (optional depending on provider) """ def get_registry_config(self): @@ -72,40 +74,54 @@ def get_registry_config(self): else: return self.registry - -# This is the JSON Schema for config validation. We use this to have nice detailed error messages -# for config validation, something that bindr unfortunately doesn't provide out of the box. -# -# The schema should match the namedtuple structure above. It could technically even be inferred from -# the types above automatically; but for now we choose a more tedious but less magic path of -# providing the schema manually. - -config_schema = { - "type": "object", - "properties": { - "project": {"type": "string"}, - "registry": {"type": "string"}, - "provider": {"type": "string"}, - "online_store": { - "type": "object", - "properties": { - "local": { - "type": "object", - "properties": {"path": {"type": "string"}}, - "additionalProperties": False, - }, - "datastore": { - "type": "object", - "properties": {"project_id": {"type": "string"}}, - "additionalProperties": False, - }, - }, - "additionalProperties": False, - }, - }, - "required": ["project"], - "additionalProperties": False, -} + @root_validator(pre=True) + def _validate_online_store_config(cls, values): + # This method will validate whether the online store configurations are set correctly. This explicit validation + # is necessary because Pydantic Unions throw very verbose and cryptic exceptions. We also use this method to + # impute the default online store type based on the selected provider. For the time being this method should be + # considered tech debt until we can implement https://github.com/samuelcolvin/pydantic/issues/619 or a more + # granular configuration system + + # Skip if online store isn't set explicitly + if "online_store" not in values: + values["online_store"] = dict() + + # Skip if we arent creating the configuration from a dict + if not isinstance(values["online_store"], Dict): + return values + + # Make sure that the provider configuration is set. We need it to set the defaults + assert "provider" in values + + if "online_store" in values: + # Set the default type + if "type" not in values["online_store"]: + if values["provider"] == "local": + values["online_store"]["type"] = "sqlite" + elif values["provider"] == "gcp": + values["online_store"]["type"] = "datastore" + + online_store_type = values["online_store"]["type"] + + # Make sure the user hasn't provided the wrong type + assert online_store_type in ["datastore", "sqlite"] + + # Validate the dict to ensure one of the union types match + try: + if online_store_type == "sqlite": + SqliteOnlineStoreConfig(**values["online_store"]) + elif values["online_store"]["type"] == "datastore": + DatastoreOnlineStoreConfig(**values["online_store"]) + else: + raise ValidationError( + f"Invalid online store type {online_store_type}" + ) + except ValidationError as e: + raise ValidationError( + [ErrorWrapper(e, loc="online_store")], + model=SqliteOnlineStoreConfig, + ) + return values class FeastConfigError(Exception): diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 837942c8e8..a0aa0b5d28 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -176,8 +176,7 @@ def init_repo(repo_path: Path, minimal: bool): registry: /path/to/registry.db provider: local online_store: - local: - path: /path/to/online_store.db + path: /path/to/online_store.db """ ) ) @@ -214,8 +213,7 @@ def init_repo(repo_path: Path, minimal: bool): registry: {"data/registry.db"} provider: local online_store: - local: - path: {"data/online_store.db"} + path: {"data/online_store.db"} """ ) ) diff --git a/sdk/python/telemetry_tests/test_telemetry.py b/sdk/python/telemetry_tests/test_telemetry.py index 918264b626..44c2c16e13 100644 --- a/sdk/python/telemetry_tests/test_telemetry.py +++ b/sdk/python/telemetry_tests/test_telemetry.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import tempfile import uuid from datetime import datetime @@ -21,7 +22,8 @@ from time import sleep from importlib import reload -from feast import Client, Entity, ValueType, FeatureStore +from feast import Client, Entity, ValueType, FeatureStore, RepoConfig +from feast.repo_config import SqliteOnlineStoreConfig TELEMETRY_BIGQUERY_TABLE = ( "kf-feast.feast_telemetry.cloudfunctions_googleapis_com_cloud_functions" @@ -91,19 +93,29 @@ def test_telemetry_on(): os.environ["FEAST_IS_TELEMETRY_TEST"] = "True" os.environ["FEAST_TELEMETRY"] = "True" - test_feature_store = FeatureStore() - entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - labels={"team": "matchmaking"}, - ) - - test_feature_store.apply([entity]) - - os.environ.clear() - os.environ.update(old_environ) - ensure_bigquery_telemetry_id_with_retry(test_telemetry_id) + with tempfile.TemporaryDirectory() as temp_dir: + test_feature_store = FeatureStore( + config=RepoConfig( + registry=os.path.join(temp_dir, "registry.db"), + project="fake_project", + provider="local", + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online.db") + ), + ) + ) + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + + test_feature_store.apply([entity]) + + os.environ.clear() + os.environ.update(old_environ) + ensure_bigquery_telemetry_id_with_retry(test_telemetry_id) def test_telemetry_off(): @@ -113,20 +125,30 @@ def test_telemetry_off(): os.environ["FEAST_TELEMETRY"] = "False" os.environ["FEAST_FORCE_TELEMETRY_UUID"] = test_telemetry_id - test_feature_store = FeatureStore() - entity = Entity( - name="driver_car_id", - description="Car driver id", - value_type=ValueType.STRING, - labels={"team": "matchmaking"}, - ) - test_feature_store.apply([entity]) - - os.environ.clear() - os.environ.update(old_environ) - sleep(30) - rows = read_bigquery_telemetry_id(test_telemetry_id) - assert rows.total_rows == 0 + with tempfile.TemporaryDirectory() as temp_dir: + test_feature_store = FeatureStore( + config=RepoConfig( + registry=os.path.join(temp_dir, "registry.db"), + project="fake_project", + provider="local", + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online.db") + ), + ) + ) + entity = Entity( + name="driver_car_id", + description="Car driver id", + value_type=ValueType.STRING, + labels={"team": "matchmaking"}, + ) + test_feature_store.apply([entity]) + + os.environ.clear() + os.environ.update(old_environ) + sleep(30) + rows = read_bigquery_telemetry_id(test_telemetry_id) + assert rows.total_rows == 0 @retry(wait=wait_exponential(multiplier=1, min=1, max=10), stop=stop_after_attempt(5)) diff --git a/sdk/python/tests/cli_utils.py b/sdk/python/tests/cli_utils.py index aa095be351..d01447ad6a 100644 --- a/sdk/python/tests/cli_utils.py +++ b/sdk/python/tests/cli_utils.py @@ -49,8 +49,7 @@ def local_repo(self, example_repo_py: str): registry: {data_path / "registry.db"} provider: local online_store: - local: - path: {data_path / "online_store.db"} + path: {data_path / "online_store.db"} """ ) ) diff --git a/sdk/python/tests/test_cli_local.py b/sdk/python/tests/test_cli_local.py index 0066531559..9c510713f9 100644 --- a/sdk/python/tests/test_cli_local.py +++ b/sdk/python/tests/test_cli_local.py @@ -27,8 +27,7 @@ def test_workflow() -> None: registry: {data_path / "registry.db"} provider: local online_store: - local: - path: {data_path / "online_store.db"} + path: {data_path / "online_store.db"} """ ) ) diff --git a/sdk/python/tests/test_feature_store.py b/sdk/python/tests/test_feature_store.py index 198c287a72..f0e25b6d62 100644 --- a/sdk/python/tests/test_feature_store.py +++ b/sdk/python/tests/test_feature_store.py @@ -25,7 +25,7 @@ from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.protos.feast.types import Value_pb2 as ValueProto -from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig +from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig from feast.value_type import ValueType @@ -38,9 +38,7 @@ def feature_store_with_local_registry(): registry=registry_path, project="default", provider="local", - online_store=OnlineStoreConfig( - local=LocalOnlineStoreConfig(path=online_store_path) - ), + online_store=SqliteOnlineStoreConfig(path=online_store_path), ) ) diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py index 6800b512c5..c67f59b0bc 100644 --- a/sdk/python/tests/test_historical_retrieval.py +++ b/sdk/python/tests/test_historical_retrieval.py @@ -18,7 +18,7 @@ from feast.feature_store import FeatureStore from feast.feature_view import FeatureView from feast.infra.provider import ENTITY_DF_EVENT_TIMESTAMP_COL -from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig +from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig from feast.value_type import ValueType np.random.seed(0) @@ -234,10 +234,8 @@ def test_historical_features_from_parquet_sources(): registry=os.path.join(temp_dir, "registry.db"), project="default", provider="local", - online_store=OnlineStoreConfig( - local=LocalOnlineStoreConfig( - path=os.path.join(temp_dir, "online_store.db") - ) + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online_store.db") ), ) ) @@ -340,10 +338,8 @@ def test_historical_features_from_bigquery_sources(provider_type): registry=os.path.join(temp_dir, "registry.db"), project="default", provider="local", - online_store=OnlineStoreConfig( - local=LocalOnlineStoreConfig( - path=os.path.join(temp_dir, "online_store.db"), - ) + online_store=SqliteOnlineStoreConfig( + path=os.path.join(temp_dir, "online_store.db"), ), ) ) diff --git a/sdk/python/tests/test_materialize.py b/sdk/python/tests/test_materialize.py index ea21842c4b..cc9eadbdb7 100644 --- a/sdk/python/tests/test_materialize.py +++ b/sdk/python/tests/test_materialize.py @@ -17,7 +17,7 @@ from feast.feature import Feature from feast.feature_store import FeatureStore from feast.feature_view import FeatureView -from feast.repo_config import LocalOnlineStoreConfig, OnlineStoreConfig, RepoConfig +from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig from feast.value_type import ValueType @@ -128,10 +128,8 @@ def prep_local_fs_and_fv() -> Iterator[Tuple[FeatureStore, FeatureView]]: registry=str(Path(repo_dir_name) / "registry.db"), project=f"test_bq_correctness_{str(uuid.uuid4()).replace('-', '')}", provider="local", - online_store=OnlineStoreConfig( - local=LocalOnlineStoreConfig( - path=str(Path(data_dir_name) / "online_store.db") - ) + online_store=SqliteOnlineStoreConfig( + path=str(Path(data_dir_name) / "online_store.db") ), ) fs = FeatureStore(config=config) diff --git a/sdk/python/tests/test_repo_config.py b/sdk/python/tests/test_repo_config.py index 3ba8f36e17..19c8ee4dcc 100644 --- a/sdk/python/tests/test_repo_config.py +++ b/sdk/python/tests/test_repo_config.py @@ -6,99 +6,114 @@ from feast.repo_config import FeastConfigError, load_repo_config -class TestRepoConfig: - def _test_config(self, config_text, expect_error: Optional[str]): - """ - Try loading a repo config and check raised error against a regex. +def _test_config(config_text, expect_error: Optional[str]): + """ + Try loading a repo config and check raised error against a regex. + """ + with tempfile.TemporaryDirectory() as repo_dir_name: + + repo_path = Path(repo_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text(config_text) + error = None + try: + load_repo_config(repo_path) + except FeastConfigError as e: + error = e + + if expect_error is not None: + assert expect_error in str(error) + else: + assert error is None + + +def test_local_config(): + _test_config( + dedent( + """ + project: foo + registry: "registry.db" + provider: local """ - with tempfile.TemporaryDirectory() as repo_dir_name: - - repo_path = Path(repo_dir_name) - - repo_config = repo_path / "feature_store.yaml" - - repo_config.write_text(config_text) - error = None - try: - load_repo_config(repo_path) - except FeastConfigError as e: - error = e - - if expect_error is not None: - assert expect_error in str(error) - else: - assert error is None - - def test_basic(self) -> None: - self._test_config( - dedent( - """ - project: foo - registry: "registry.db" - provider: local - online_store: - local: - path: "online_store.db" + ), + expect_error=None, + ) + + +def test_gcp_config(): + _test_config( + dedent( """ - ), - expect_error=None, - ) - - self._test_config( - dedent( - """ - project: foo - registry: "registry.db" - provider: gcp + project: foo + registry: gs://registry.db + provider: gcp + """ + ), + expect_error=None, + ) + + +def test_extra_field(): + _test_config( + dedent( """ - ), - expect_error=None, - ) - - def test_errors(self) -> None: - self._test_config( - dedent( - """ - project: foo - registry: "registry.db" - provider: local - online_store: - local: - that_field_should_not_be_here: yes - path: "online_store.db" + project: foo + registry: "registry.db" + provider: local + online_store: + type: sqlite + that_field_should_not_be_here: yes + path: "online_store.db" + """ + ), + expect_error="__root__ -> online_store -> that_field_should_not_be_here\n" + " extra fields not permitted (type=value_error.extra)", + ) + + +def test_no_online_store_type(): + _test_config( + dedent( """ - ), - expect_error="online_store -> local -> that_field_should_not_be_here\n" - " extra fields not permitted (type=value_error.extra)", - ) - - self._test_config( - dedent( - """ - project: foo - registry: "registry.db" - provider: local - online_store: - local: - path: 100500 + project: foo + registry: "registry.db" + provider: local + online_store: + path: "blah" + """ + ), + expect_error=None, + ) + + +def test_bad_type(): + _test_config( + dedent( """ - ), - expect_error="1 validation error for RepoConfig\n" - "online_store -> local -> path\n" - " str type expected (type=type_error.str)", - ) - - self._test_config( - dedent( - """ - registry: "registry.db" - provider: local - online_store: - local: - path: foo + project: foo + registry: "registry.db" + provider: local + online_store: + path: 100500 + """ + ), + expect_error="__root__ -> online_store -> path\n str type expected", + ) + + +def test_no_project(): + _test_config( + dedent( """ - ), - expect_error="1 validation error for RepoConfig\n" - "project\n" - " field required (type=value_error.missing)", - ) + registry: "registry.db" + provider: local + online_store: + path: foo + """ + ), + expect_error="1 validation error for RepoConfig\n" + "project\n" + " field required (type=value_error.missing)", + )