Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor OnlineStoreConfig classes into owning modules #1649

Merged
merged 11 commits into from
Jun 21, 2021
26 changes: 20 additions & 6 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ def __init__(self, provider_name):
super().__init__(f"Provider '{provider_name}' is not implemented")


class FeastProviderModuleImportError(Exception):
def __init__(self, module_name):
super().__init__(f"Could not import provider module '{module_name}'")
class FeastModuleImportError(Exception):
def __init__(self, module_name, module_type="provider"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove the default module_type here, instead explicitly pass the module.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #1657

super().__init__(f"Could not import {module_type} module '{module_name}'")


class FeastProviderClassImportError(Exception):
def __init__(self, module_name, class_name):
class FeastClassImportError(Exception):
def __init__(self, module_name, class_name, class_type="provider"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in #1657

super().__init__(
f"Could not import provider '{class_name}' from module '{module_name}'"
f"Could not import {class_type} '{class_name}' from module '{module_name}'"
)


Expand All @@ -71,6 +71,20 @@ def __init__(self, offline_store_name: str, data_source_name: str):
)


class FeastOnlineStoreInvalidName(Exception):
def __init__(self, online_store_class_name: str):
super().__init__(
f"Online Store Class '{online_store_class_name}' should end with the string `OnlineStore`.'"
)


class FeastOnlineStoreConfigInvalidName(Exception):
def __init__(self, online_store_config_class_name: str):
super().__init__(
f"Online Store Config Class '{online_store_config_class_name}' should end with the string `OnlineStoreConfig`.'"
)


class FeastOnlineStoreUnsupportedDataSource(Exception):
def __init__(self, online_store_name: str, data_source_name: str):
super().__init__(
Expand Down
23 changes: 22 additions & 1 deletion sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig

try:
from google.auth.exceptions import DefaultCredentialsError
Expand All @@ -40,6 +42,25 @@
]


class DatastoreOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for GCP Datastore """

type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

namespace: Optional[StrictStr] = None
""" (optional) Datastore namespace """

write_concurrency: Optional[PositiveInt] = 40
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""

write_batch_size: Optional[PositiveInt] = 50
""" (optional) Amount of feature rows per batch being written into Datastore"""


class DatastoreOnlineStore(OnlineStore):
"""
OnlineStore is an object used for all interaction between Feast and the service used for offline storage of
Expand Down
78 changes: 27 additions & 51 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,41 @@
import importlib
import struct
from typing import Any, Dict, Set
from typing import Any

import mmh3

from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import FeastOnlineStoreUnsupportedDataSource
from feast import errors
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.repo_config import (
DatastoreOnlineStoreConfig,
OnlineStoreConfig,
RedisOnlineStoreConfig,
SqliteOnlineStoreConfig,
)


def get_online_store_from_config(
online_store_config: OnlineStoreConfig,
) -> OnlineStore:
def get_online_store_from_config(online_store_config: Any,) -> OnlineStore:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: trailing comma

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the type of online_store_config be Any? Or FeastConfigBaseModel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has to be Any unfortunately - can't be FeastConfigBaseModel since pydantic doesn't understand or handle "subclass of type" as a type annotation.

"""Get the offline store from offline store config"""

if isinstance(online_store_config, SqliteOnlineStoreConfig):
from feast.infra.online_stores.sqlite import SqliteOnlineStore

return SqliteOnlineStore()
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
from feast.infra.online_stores.datastore import DatastoreOnlineStore

return DatastoreOnlineStore()
elif isinstance(online_store_config, RedisOnlineStoreConfig):
from feast.infra.online_stores.redis import RedisOnlineStore

return RedisOnlineStore()
raise ValueError(f"Unsupported offline store config '{online_store_config}'")


SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
SqliteOnlineStoreConfig: {FileSource},
DatastoreOnlineStoreConfig: {BigQuerySource},
RedisOnlineStoreConfig: {FileSource, BigQuerySource},
}


def assert_online_store_supports_data_source(
online_store_config: OnlineStoreConfig, data_source: DataSource
):
supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
online_store_config.__class__, set()
)
# This is needed because checking for `in` with Union types breaks mypy.
# https://github.com/python/mypy/issues/4954
# We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
# Once ^ is resolved.
if supported_sources:
for source in supported_sources:
if source == data_source.__class__:
return
raise FeastOnlineStoreUnsupportedDataSource(
online_store_config.type, data_source.__class__.__name__
)
module_name = online_store_config.__module__
qualified_name = type(online_store_config).__name__
store_class_name = qualified_name.replace("Config", "")
try:
module = importlib.import_module(module_name)
except Exception as e:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastModuleImportError(
module_name, module_type="OnlineStore"
) from e

# Try getting the provider class definition
try:
online_store_class = getattr(module, store_class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastClassImportError(
module_name, store_class_name, class_type="OnlineStore"
) from None
return online_store_class()


def _redis_key(project: str, entity_key: EntityKeyProto):
Expand Down
25 changes: 23 additions & 2 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
# limitations under the License.
import json
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

from google.protobuf.timestamp_pb2 import Timestamp
from pydantic import StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
from feast.infra.online_stores.helpers import _mmh3, _redis_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RedisOnlineStoreConfig, RedisType
from feast.repo_config import FeastConfigBaseModel

try:
from redis import Redis
Expand All @@ -35,6 +38,25 @@
EX_SECONDS = 253402300799


class RedisType(str, Enum):
redis = "redis"
redis_cluster = "redis_cluster"


class RedisOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for Redis store"""

type: Literal["redis"] = "redis"
"""Online store type selector"""

redis_type: RedisType = RedisType.redis
"""Redis type: redis or redis_cluster"""

connection_string: StrictStr = "localhost:6379"
"""Connection string containing the host, port, and configuration parameters for Redis
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """


class RedisOnlineStore(OnlineStore):
_client: Optional[Union[Redis, RedisCluster]] = None

Expand Down Expand Up @@ -99,7 +121,6 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
startup_nodes, kwargs = self._parse_connection_string(
online_store_config.connection_string
)
print(f"Startup nodes: {startup_nodes}, {kwargs}")
if online_store_config.type == RedisType.redis_cluster:
kwargs["startup_nodes"] = startup_nodes
self._client = RedisCluster(**kwargs)
Expand Down
15 changes: 14 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pytz
from pydantic import StrictStr
from pydantic.schema import Literal

from feast import Entity, FeatureTable
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig


class SqliteOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for local (SQLite-based) store """

type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """


class SqliteOnlineStore(OnlineStore):
Expand Down Expand Up @@ -65,6 +77,7 @@ def online_write_batch(
],
progress: Optional[Callable[[int], Any]],
) -> None:

conn = self._get_conn(config)

project = config.project
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,15 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastProviderModuleImportError(module_name) from e
raise errors.FeastModuleImportError(module_name) from e

# Try getting the provider class definition
try:
ProviderCls = getattr(module, class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastProviderClassImportError(
module_name, class_name
) from None
raise errors.FeastClassImportError(module_name, class_name) from None

return ProviderCls(config, repo_path)

Expand Down
Loading