Skip to content

Commit

Permalink
format and lint
Browse files Browse the repository at this point in the history
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Jun 9, 2021
1 parent ee463e3 commit 136c1dc
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 31 deletions.
11 changes: 4 additions & 7 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import itertools
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
import pandas
from tqdm import tqdm

from feast import FeatureTable, utils
from feast import FeatureTable
from feast.entity import Entity
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
Expand Down Expand Up @@ -47,6 +43,7 @@ def __init__(self, config: RepoConfig):
self._write_batch_size = config.online_store.write_batch_size

assert config.offline_store is not None
self.repo_config = config
self.offline_store = get_offline_store_from_config(config.offline_store)
self.online_store = get_online_store_from_config(config.online_store)

Expand Down Expand Up @@ -168,7 +165,7 @@ def materialize_single_feature_view(

with tqdm_builder(len(rows_to_write)) as pbar:
self.online_write_batch(
project, feature_view, rows_to_write, lambda x: pbar.update(x)
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
)

def get_historical_features(
Expand Down
24 changes: 15 additions & 9 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import mmh3

from feast import FeatureTable
from feast import FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig

try:
from google.auth.exceptions import DefaultCredentialsError
Expand All @@ -47,11 +47,11 @@ class DatastoreOnlineStore(OnlineStore):
"""

@classmethod
def _initialize_client(cls, config: RepoConfig):
def _initialize_client(cls, online_config: DatastoreOnlineStoreConfig):

try:
return datastore.Client(
project=config.online_store.project_id,
namespace=config.online_store.namespace,
project=online_config.project_id, namespace=online_config.namespace,
)
except DefaultCredentialsError as e:
raise FeastProviderLoginError(
Expand All @@ -70,10 +70,13 @@ def online_write_batch(
],
progress: Optional[Callable[[int], Any]],
) -> None:
client = cls._initialize_client(config)

write_concurrency = config.online_store.write_concurrency
write_batch_size = config.online_store.write_batch_size
online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = cls._initialize_client(online_config)

write_concurrency = online_config.write_concurrency
write_batch_size = online_config.write_batch_size
feast_project = config.project

pool = ThreadPool(processes=write_concurrency)
Expand Down Expand Up @@ -145,7 +148,10 @@ def online_read(
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
client = cls._initialize_client(config)

online_config = config.online_store
assert isinstance(online_config, DatastoreOnlineStoreConfig)
client = cls._initialize_client(online_config)

feast_project = config.project

Expand Down
20 changes: 8 additions & 12 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import FeastOnlineStoreUnsupportedDataSource
from feast.infra.online_stores.datastore import DatastoreOnlineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.online_stores.sqlite import SqliteOnlineStore
from feast.repo_config import (
DatastoreOnlineStoreConfig,
OnlineStoreConfig,
Expand All @@ -16,28 +14,26 @@ def get_online_store_from_config(
"""Get the offline store from offline store config"""

if isinstance(online_store_config, SqliteOnlineStoreConfig):
from feast.infra.offline_stores.file import FileOfflineStore
from feast.infra.online_stores.sqlite import SqliteOnlineStore

return SqliteOnlineStore()
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
from feast.infra.offline_stores.bigquery import BigQueryOfflineStore
from feast.infra.online_stores.datastore import DatastoreOnlineStore

return DatastoreOnlineStore()

raise ValueError(f"Unsupported offline store config '{online_store_config}'")


SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE = {
SqliteOnlineStoreConfig: {FileSource},
DatastoreOnlineStoreConfig: {BigQuerySource},
}


def assert_online_store_supports_data_source(
online_store_config: OnlineStoreConfig, data_source: DataSource
):
if type(data_source) in SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE.get(
type(online_store_config), set()
if (
isinstance(online_store_config, SqliteOnlineStoreConfig)
and isinstance(data_source, FileSource)
) or (
isinstance(online_store_config, DatastoreOnlineStoreConfig)
and isinstance(data_source, BigQuerySource)
):
return

Expand Down
1 change: 0 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sqlite3
from datetime import datetime
from pathlib import Path
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/foo_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def teardown_infra(

def online_write_batch(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
Expand Down Expand Up @@ -67,7 +67,7 @@ def get_historical_features(

def online_read(
self,
project: str,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: List[str] = None,
Expand Down

0 comments on commit 136c1dc

Please sign in to comment.