From 136c1dc8653715edd43efbe88e293689cb77534b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 8 Jun 2021 11:35:49 -0700 Subject: [PATCH] format and lint Signed-off-by: Achal Shah --- sdk/python/feast/infra/gcp.py | 11 ++++----- .../feast/infra/online_stores/datastore.py | 24 ++++++++++++------- .../feast/infra/online_stores/helpers.py | 20 +++++++--------- .../feast/infra/online_stores/sqlite.py | 1 - sdk/python/tests/foo_provider.py | 4 ++-- 5 files changed, 29 insertions(+), 31 deletions(-) diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 15c3a4d458..30516292d3 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -1,17 +1,13 @@ -import itertools from datetime import datetime -from multiprocessing.pool import ThreadPool -from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import mmh3 import pandas from tqdm import tqdm -from feast import FeatureTable, utils +from feast import FeatureTable from feast.entity import Entity from feast.errors import FeastProviderLoginError from feast.feature_view import FeatureView -from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.offline_stores.helpers import get_offline_store_from_config from feast.infra.online_stores.helpers import get_online_store_from_config from feast.infra.provider import ( @@ -47,6 +43,7 @@ def __init__(self, config: RepoConfig): self._write_batch_size = config.online_store.write_batch_size assert config.offline_store is not None + self.repo_config = config self.offline_store = get_offline_store_from_config(config.offline_store) self.online_store = get_online_store_from_config(config.online_store) @@ -168,7 +165,7 @@ def materialize_single_feature_view( with tqdm_builder(len(rows_to_write)) as pbar: self.online_write_batch( - project, feature_view, rows_to_write, lambda x: pbar.update(x) + self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x) ) def get_historical_features( diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index 4d4ff57809..4a6c9c77b8 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -18,13 +18,13 @@ import mmh3 -from feast import FeatureTable +from feast import FeatureTable, utils from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto -from feast.repo_config import RepoConfig +from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig try: from google.auth.exceptions import DefaultCredentialsError @@ -47,11 +47,11 @@ class DatastoreOnlineStore(OnlineStore): """ @classmethod - def _initialize_client(cls, config: RepoConfig): + def _initialize_client(cls, online_config: DatastoreOnlineStoreConfig): + try: return datastore.Client( - project=config.online_store.project_id, - namespace=config.online_store.namespace, + project=online_config.project_id, namespace=online_config.namespace, ) except DefaultCredentialsError as e: raise FeastProviderLoginError( @@ -70,10 +70,13 @@ def online_write_batch( ], progress: Optional[Callable[[int], Any]], ) -> None: - client = cls._initialize_client(config) - write_concurrency = config.online_store.write_concurrency - write_batch_size = config.online_store.write_batch_size + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) + + write_concurrency = online_config.write_concurrency + write_batch_size = online_config.write_batch_size feast_project = config.project pool = ThreadPool(processes=write_concurrency) @@ -145,7 +148,10 @@ def online_read( table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: - client = cls._initialize_client(config) + + online_config = config.online_store + assert isinstance(online_config, DatastoreOnlineStoreConfig) + client = cls._initialize_client(online_config) feast_project = config.project diff --git a/sdk/python/feast/infra/online_stores/helpers.py b/sdk/python/feast/infra/online_stores/helpers.py index 4c37622ed1..cf486d760f 100644 --- a/sdk/python/feast/infra/online_stores/helpers.py +++ b/sdk/python/feast/infra/online_stores/helpers.py @@ -1,8 +1,6 @@ from feast.data_source import BigQuerySource, DataSource, FileSource from feast.errors import FeastOnlineStoreUnsupportedDataSource -from feast.infra.online_stores.datastore import DatastoreOnlineStore from feast.infra.online_stores.online_store import OnlineStore -from feast.infra.online_stores.sqlite import SqliteOnlineStore from feast.repo_config import ( DatastoreOnlineStoreConfig, OnlineStoreConfig, @@ -16,28 +14,26 @@ def get_online_store_from_config( """Get the offline store from offline store config""" if isinstance(online_store_config, SqliteOnlineStoreConfig): - from feast.infra.offline_stores.file import FileOfflineStore + from feast.infra.online_stores.sqlite import SqliteOnlineStore return SqliteOnlineStore() elif isinstance(online_store_config, DatastoreOnlineStoreConfig): - from feast.infra.offline_stores.bigquery import BigQueryOfflineStore + from feast.infra.online_stores.datastore import DatastoreOnlineStore return DatastoreOnlineStore() raise ValueError(f"Unsupported offline store config '{online_store_config}'") -SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE = { - SqliteOnlineStoreConfig: {FileSource}, - DatastoreOnlineStoreConfig: {BigQuerySource}, -} - - def assert_online_store_supports_data_source( online_store_config: OnlineStoreConfig, data_source: DataSource ): - if type(data_source) in SUPPORTED_DATA_SOURCES_FOR_ONLINE_STORE.get( - type(online_store_config), set() + if ( + isinstance(online_store_config, SqliteOnlineStoreConfig) + and isinstance(data_source, FileSource) + ) or ( + isinstance(online_store_config, DatastoreOnlineStoreConfig) + and isinstance(data_source, BigQuerySource) ): return diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index d2150d09a7..2ef5a4ed66 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os import sqlite3 from datetime import datetime from pathlib import Path diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index b313ad9cc7..8b7e5f4d36 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -34,7 +34,7 @@ def teardown_infra( def online_write_batch( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] @@ -67,7 +67,7 @@ def get_historical_features( def online_read( self, - project: str, + config: RepoConfig, table: Union[FeatureTable, FeatureView], entity_keys: List[EntityKeyProto], requested_features: List[str] = None,