Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flatten configuration structure for online store #1459

Merged
merged 6 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.registry import Registry
from feast.repo_config import (
LocalOnlineStoreConfig,
OnlineStoreConfig,
RepoConfig,
load_repo_config,
)
from feast.repo_config import RepoConfig, load_repo_config
from feast.telemetry import Telemetry
from feast.version import get_version

Expand All @@ -51,6 +46,12 @@ class FeatureStore:
def __init__(
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
""" Initializes a new FeatureStore object. Used to manage a feature store.

Args:
repo_path: Path to a `feature_store.yaml` used to configure the feature store
config (RepoConfig): Configuration object used to configure the feature store
"""
self.repo_path = repo_path
if repo_path is not None and config is not None:
raise ValueError("You cannot specify both repo_path and config")
Expand All @@ -59,14 +60,7 @@ def __init__(
elif repo_path is not None:
self.config = load_repo_config(Path(repo_path))
else:
self.config = RepoConfig(
registry="./registry.db",
project="default",
provider="local",
online_store=OnlineStoreConfig(
local=LocalOnlineStoreConfig(path="online_store.db")
),
)
raise ValueError("Please specify one of repo_path or config")

registry_config = self.config.get_registry_config()
self._registry = Registry(
Expand Down
7 changes: 4 additions & 3 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
class GcpProvider(Provider):
_gcp_project_id: Optional[str]

def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
if config:
self._gcp_project_id = config.project_id
def __init__(self, config: RepoConfig):
assert isinstance(config.online_store, DatastoreOnlineStoreConfig)
if config and config.online_store and config.online_store.project_id:
self._gcp_project_id = config.online_store.project_id
else:
self._gcp_project_id = None

Expand Down
13 changes: 10 additions & 3 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas as pd
Expand All @@ -20,16 +21,22 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import LocalOnlineStoreConfig, RepoConfig
from feast.repo_config import RepoConfig, SqliteOnlineStoreConfig


class LocalProvider(Provider):
_db_path: str

def __init__(self, config: LocalOnlineStoreConfig):
self._db_path = config.path
def __init__(self, config: RepoConfig):

assert config is not None
assert config.online_store is not None
local_online_store_config = config.online_store
assert isinstance(local_online_store_config, SqliteOnlineStoreConfig)
self._db_path = local_online_store_config.path

def _get_conn(self):
    """Open a connection to the SQLite online store at ``self._db_path``.

    The directory that holds the database file is created first, since
    ``sqlite3.connect`` will create a missing database file but not a
    missing directory.

    Returns:
        sqlite3.Connection: connection opened with declared-type and
        column-name type detection enabled.
    """
    # parents=True so that a path nested several missing levels deep (e.g. a
    # fresh "a/b/online.db") works; without it mkdir raises FileNotFoundError.
    Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
    return sqlite3.connect(
        self._db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
    )
Expand Down
8 changes: 2 additions & 6 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,11 @@ def get_provider(config: RepoConfig) -> Provider:
if config.provider == "gcp":
from feast.infra.gcp import GcpProvider

return GcpProvider(
config.online_store.datastore if config.online_store else None
)
return GcpProvider(config)
elif config.provider == "local":
from feast.infra.local import LocalProvider

assert config.online_store is not None
assert config.online_store.local is not None
return LocalProvider(config.online_store.local)
return LocalProvider(config)
else:
raise ValueError(config)

Expand Down
114 changes: 65 additions & 49 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
from typing import Optional, Union

import yaml
from pydantic import BaseModel, StrictInt, StrictStr, ValidationError
from pydantic import BaseModel, StrictInt, StrictStr, ValidationError, root_validator
from pydantic.error_wrappers import ErrorWrapper
from pydantic.typing import Dict, Literal, Optional, Union


class FeastBaseModel(BaseModel):
Expand All @@ -13,26 +14,27 @@ class Config:
extra = "forbid"


class LocalOnlineStoreConfig(FeastBaseModel):
""" Online store config for local (SQLite-based) online store """
class SqliteOnlineStoreConfig(FeastBaseModel):
""" Online store config for local (SQLite-based) store """

path: StrictStr
""" str: Path to sqlite db """
type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """


class DatastoreOnlineStoreConfig(FeastBaseModel):
""" Online store config for GCP Datastore """

project_id: StrictStr
""" str: GCP Project Id """
type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

class OnlineStoreConfig(FeastBaseModel):
datastore: Optional[DatastoreOnlineStoreConfig] = None
""" DatastoreOnlineStoreConfig: Optional Google Cloud Datastore config """

local: Optional[LocalOnlineStoreConfig] = None
""" LocalOnlineStoreConfig: Optional local online store config """
OnlineStoreConfig = Union[DatastoreOnlineStoreConfig, SqliteOnlineStoreConfig]


class RegistryConfig(FeastBaseModel):
Expand All @@ -51,7 +53,7 @@ class RegistryConfig(FeastBaseModel):
class RepoConfig(FeastBaseModel):
""" Repo config. Typically loaded from `feature_store.yaml` """

registry: Union[StrictStr, RegistryConfig]
registry: Union[StrictStr, RegistryConfig] = "data/registry.db"
""" str: Path to metadata store. Can be a local path, or remote object storage path, e.g. gcs://foo/bar """

project: StrictStr
Expand All @@ -63,7 +65,7 @@ class RepoConfig(FeastBaseModel):
provider: StrictStr
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need defaults for provider and project in order for RepoConfig() with no arguments to work

""" str: local or gcp """

online_store: Optional[OnlineStoreConfig] = None
online_store: OnlineStoreConfig = SqliteOnlineStoreConfig()
""" OnlineStoreConfig: Online store configuration (optional depending on provider) """

def get_registry_config(self):
Expand All @@ -72,40 +74,54 @@ def get_registry_config(self):
else:
return self.registry


# This is the JSON Schema for config validation. We use this to have nice detailed error messages
# for config validation, something that bindr unfortunately doesn't provide out of the box.
#
# The schema should match the namedtuple structure above. It could technically even be inferred from
# the types above automatically; but for now we choose a more tedious but less magic path of
# providing the schema manually.

config_schema = {
"type": "object",
"properties": {
"project": {"type": "string"},
"registry": {"type": "string"},
"provider": {"type": "string"},
"online_store": {
"type": "object",
"properties": {
"local": {
"type": "object",
"properties": {"path": {"type": "string"}},
"additionalProperties": False,
},
"datastore": {
"type": "object",
"properties": {"project_id": {"type": "string"}},
"additionalProperties": False,
},
},
"additionalProperties": False,
},
},
"required": ["project"],
"additionalProperties": False,
}
@root_validator(pre=True)
def _validate_online_store_config(cls, values):
    """Impute and pre-validate the ``online_store`` section of the config.

    This explicit validation exists because Pydantic Unions throw very verbose
    and cryptic exceptions: the dict is checked against the single model picked
    by its ``type`` key instead of every member of the union. When ``type`` is
    missing it is imputed from the provider (``local`` -> ``sqlite``,
    ``gcp`` -> ``datastore``). For the time being this method should be
    considered tech debt until we can implement
    https://github.com/samuelcolvin/pydantic/issues/619 or a more granular
    configuration system.

    Args:
        values: Raw, not-yet-validated input values for the model.

    Returns:
        The input values. If ``online_store`` was omitted or given as a dict,
        it is replaced by a dict that carries an explicit ``type``.

    Raises:
        ValueError: If ``provider`` is missing, the store type cannot be
            inferred from the provider, or the store type is unknown.
        ValidationError: If the online store dict does not validate against
            the model selected by its type.
    """
    # Treat an omitted online store as an empty dict so a default type is imputed below
    if "online_store" not in values:
        values["online_store"] = {}

    # Skip if we aren't creating the configuration from a dict
    if not isinstance(values["online_store"], dict):
        return values

    # Make sure that the provider configuration is set. We need it to set the defaults.
    # Raised explicitly (not asserted) so the check survives `python -O`.
    if "provider" not in values:
        raise ValueError("provider must be set to validate the online store config")

    # Work on a copy so the dict passed in by the caller is not mutated
    online_store = dict(values["online_store"])

    # Set the default type
    if "type" not in online_store:
        provider = values["provider"]
        if provider == "local":
            online_store["type"] = "sqlite"
        elif provider == "gcp":
            online_store["type"] = "datastore"
        else:
            # Previously this case fell through to a bare KeyError on "type"
            raise ValueError(
                f"Cannot infer online store type for provider {provider!r}; "
                "set online_store.type explicitly"
            )

    online_store_type = online_store["type"]

    # Make sure the user hasn't provided the wrong type
    if online_store_type == "sqlite":
        config_class = SqliteOnlineStoreConfig
    elif online_store_type == "datastore":
        config_class = DatastoreOnlineStoreConfig
    else:
        raise ValueError(f"Invalid online store type {online_store_type}")

    # Validate the dict to ensure it matches the selected union member
    try:
        config_class(**online_store)
    except ValidationError as e:
        # Report the failure under the "online_store" key, against the model that was tried
        raise ValidationError(
            [ErrorWrapper(e, loc="online_store")], model=config_class,
        ) from e

    values["online_store"] = online_store
    return values


class FeastConfigError(Exception):
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ def init_repo(repo_path: Path, minimal: bool):
registry: /path/to/registry.db
provider: local
online_store:
local:
path: /path/to/online_store.db
path: /path/to/online_store.db
"""
)
)
Expand Down Expand Up @@ -214,8 +213,7 @@ def init_repo(repo_path: Path, minimal: bool):
registry: {"data/registry.db"}
provider: local
online_store:
local:
path: {"data/online_store.db"}
path: {"data/online_store.db"}
"""
)
)
Expand Down
78 changes: 50 additions & 28 deletions sdk/python/telemetry_tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
import uuid
from datetime import datetime

Expand All @@ -21,7 +22,8 @@
from time import sleep
from importlib import reload

from feast import Client, Entity, ValueType, FeatureStore
from feast import Client, Entity, ValueType, FeatureStore, RepoConfig
from feast.repo_config import SqliteOnlineStoreConfig

TELEMETRY_BIGQUERY_TABLE = (
"kf-feast.feast_telemetry.cloudfunctions_googleapis_com_cloud_functions"
Expand Down Expand Up @@ -91,19 +93,29 @@ def test_telemetry_on():
os.environ["FEAST_IS_TELEMETRY_TEST"] = "True"
os.environ["FEAST_TELEMETRY"] = "True"

test_feature_store = FeatureStore()
entity = Entity(
name="driver_car_id",
description="Car driver id",
value_type=ValueType.STRING,
labels={"team": "matchmaking"},
)

test_feature_store.apply([entity])

os.environ.clear()
os.environ.update(old_environ)
ensure_bigquery_telemetry_id_with_retry(test_telemetry_id)
with tempfile.TemporaryDirectory() as temp_dir:
test_feature_store = FeatureStore(
config=RepoConfig(
registry=os.path.join(temp_dir, "registry.db"),
project="fake_project",
provider="local",
online_store=SqliteOnlineStoreConfig(
path=os.path.join(temp_dir, "online.db")
),
)
)
entity = Entity(
name="driver_car_id",
description="Car driver id",
value_type=ValueType.STRING,
labels={"team": "matchmaking"},
)

test_feature_store.apply([entity])

os.environ.clear()
os.environ.update(old_environ)
ensure_bigquery_telemetry_id_with_retry(test_telemetry_id)


def test_telemetry_off():
Expand All @@ -113,20 +125,30 @@ def test_telemetry_off():
os.environ["FEAST_TELEMETRY"] = "False"
os.environ["FEAST_FORCE_TELEMETRY_UUID"] = test_telemetry_id

test_feature_store = FeatureStore()
entity = Entity(
name="driver_car_id",
description="Car driver id",
value_type=ValueType.STRING,
labels={"team": "matchmaking"},
)
test_feature_store.apply([entity])

os.environ.clear()
os.environ.update(old_environ)
sleep(30)
rows = read_bigquery_telemetry_id(test_telemetry_id)
assert rows.total_rows == 0
with tempfile.TemporaryDirectory() as temp_dir:
test_feature_store = FeatureStore(
config=RepoConfig(
registry=os.path.join(temp_dir, "registry.db"),
project="fake_project",
provider="local",
online_store=SqliteOnlineStoreConfig(
path=os.path.join(temp_dir, "online.db")
),
)
)
entity = Entity(
name="driver_car_id",
description="Car driver id",
value_type=ValueType.STRING,
labels={"team": "matchmaking"},
)
test_feature_store.apply([entity])

os.environ.clear()
os.environ.update(old_environ)
sleep(30)
rows = read_bigquery_telemetry_id(test_telemetry_id)
assert rows.total_rows == 0


@retry(wait=wait_exponential(multiplier=1, min=1, max=10), stop=stop_after_attempt(5))
Expand Down
Loading