Introduce an OnlineStore interface #1628

Merged: 13 commits (Jun 11, 2021)
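The new interface module itself is not part of this excerpt. As a reading aid, here is a minimal sketch of the `OnlineStore` surface implied by the call sites in the diff below (`online_write_batch` and `online_read`); the module path, base class, and docstrings are assumptions rather than the merged code:

```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from feast import FeatureTable
from feast.feature_view import FeatureView
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig


class OnlineStore(ABC):
    """Pluggable low-latency storage backend for serving feature values."""

    @abstractmethod
    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Write a batch of feature rows, reporting progress as rows land."""

    @abstractmethod
    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read the latest values per entity key; (None, None) when absent."""
```

With this in place, both providers in the diff construct their store via `get_online_store_from_config(config.online_store)` and forward reads and writes to it instead of talking to Datastore or SQLite directly.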
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
@@ -69,6 +69,13 @@ def __init__(self, offline_store_name: str, data_source_name: str):
         )
 
 
+class FeastOnlineStoreUnsupportedDataSource(Exception):
+    def __init__(self, online_store_name: str, data_source_name: str):
+        super().__init__(
+            f"Online Store '{online_store_name}' does not support data source '{data_source_name}'"
+        )
+
+
 class FeastEntityDFMissingColumnsError(Exception):
     def __init__(self, expected, missing):
         super().__init__(
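For reference, the message produced by the new error class (argument values here are illustrative):

```python
from feast.errors import FeastOnlineStoreUnsupportedDataSource

err = FeastOnlineStoreUnsupportedDataSource("sqlite", "driver_stats_source")
print(err)
# Online Store 'sqlite' does not support data source 'driver_stats_source'
```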
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
@@ -541,7 +541,7 @@ def get_online_features(
             table, union_of_entity_keys, entity_name_to_join_key_map
         )
         read_rows = provider.online_read(
-            project=self.project,
+            config=self.config,
             table=table,
             entity_keys=entity_keys,
             requested_features=requested_features,
105 changes: 10 additions & 95 deletions sdk/python/feast/infra/gcp.py
@@ -1,18 +1,15 @@
-import itertools
 from datetime import datetime
-from multiprocessing.pool import ThreadPool
-from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
 
-import mmh3
 import pandas
 from tqdm import tqdm
 
-from feast import FeatureTable, utils
+from feast import FeatureTable
 from feast.entity import Entity
 from feast.errors import FeastProviderLoginError
 from feast.feature_view import FeatureView
-from feast.infra.key_encoding_utils import serialize_entity_key
 from feast.infra.offline_stores.helpers import get_offline_store_from_config
+from feast.infra.online_stores.helpers import get_online_store_from_config
 from feast.infra.provider import (
     Provider,
     RetrievalJob,
@@ -46,7 +43,9 @@ def __init__(self, config: RepoConfig):
         self._write_batch_size = config.online_store.write_batch_size
 
         assert config.offline_store is not None
+        self.repo_config = config
         self.offline_store = get_offline_store_from_config(config.offline_store)
+        self.online_store = get_online_store_from_config(config.online_store)
 
     def _initialize_client(self):
         try:
@@ -108,46 +107,24 @@ def teardown_infra(
 
     def online_write_batch(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, FeatureView],
         data: List[
             Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
         ],
         progress: Optional[Callable[[int], Any]],
     ) -> None:
-        client = self._initialize_client()
-
-        pool = ThreadPool(processes=self._write_concurrency)
-        pool.map(
-            lambda b: _write_minibatch(client, project, table, b, progress),
-            _to_minibatches(data, batch_size=self._write_batch_size),
-        )
+        self.online_store.online_write_batch(config, table, data, progress)
 
     def online_read(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, FeatureView],
         entity_keys: List[EntityKeyProto],
         requested_features: List[str] = None,
     ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
-        client = self._initialize_client()
+        result = self.online_store.online_read(config, table, entity_keys)
 
-        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
-        for entity_key in entity_keys:
-            document_id = compute_datastore_entity_id(entity_key)
-            key = client.key(
-                "Project", project, "Table", table.name, "Row", document_id
-            )
-            value = client.get(key)
-            if value is not None:
-                res = {}
-                for feature_name, value_bin in value["values"].items():
-                    val = ValueProto()
-                    val.ParseFromString(value_bin)
-                    res[feature_name] = val
-                result.append((value["event_ts"], res))
-            else:
-                result.append((None, None))
         return result
 
     def materialize_single_feature_view(
def materialize_single_feature_view(
@@ -188,7 +165,7 @@ def materialize_single_feature_view(
 
         with tqdm_builder(len(rows_to_write)) as pbar:
             self.online_write_batch(
-                project, feature_view, rows_to_write, lambda x: pbar.update(x)
+                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
             )
 
     def get_historical_features(
@@ -216,58 +193,6 @@
         ]
 
 
-def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]:
-    """
-    Split data into minibatches, making sure we stay under GCP datastore transaction size
-    limits.
-    """
-    iterable = iter(data)
-
-    while True:
-        batch = list(itertools.islice(iterable, batch_size))
-        if len(batch) > 0:
-            yield batch
-        else:
-            break
-
-
-def _write_minibatch(
-    client,
-    project: str,
-    table: Union[FeatureTable, FeatureView],
-    data: Sequence[
-        Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
-    ],
-    progress: Optional[Callable[[int], Any]],
-):
-    entities = []
-    for entity_key, features, timestamp, created_ts in data:
-        document_id = compute_datastore_entity_id(entity_key)
-
-        key = client.key("Project", project, "Table", table.name, "Row", document_id,)
-
-        entity = datastore.Entity(
-            key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
-        )
-
-        entity.update(
-            dict(
-                key=entity_key.SerializeToString(),
-                values={k: v.SerializeToString() for k, v in features.items()},
-                event_ts=utils.make_tzaware(timestamp),
-                created_ts=(
-                    utils.make_tzaware(created_ts) if created_ts is not None else None
-                ),
-            )
-        )
-        entities.append(entity)
-    with client.transaction():
-        client.put_multi(entities)
-
-    if progress:
-        progress(len(entities))
-
-
 def _delete_all_values(client, key) -> None:
     """
     Delete all data under the key path in datastore.
@@ -280,13 +205,3 @@ def _delete_all_values(client, key) -> None:
 
     for entity in entities:
         client.delete(entity.key)
-
-
-def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
-    """
-    Compute Datastore Entity id given Feast Entity Key.
-
-    Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
-    do with the Entity concept we have in Feast.
-    """
-    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
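The Datastore logic deleted above is not gone; it moves behind the new interface. Below is a sketch of the relocated read path, assuming a module such as `sdk/python/feast/infra/online_stores/datastore.py`; the class name and client construction are guesses, while the loop body mirrors the code removed from `online_read` and `compute_datastore_entity_id`:

```python
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import mmh3
from google.cloud import datastore

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
    # Same hashing scheme as the helper removed from gcp.py above.
    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()


class DatastoreOnlineStore:
    def online_read(
        self, config: RepoConfig, table, entity_keys: List[EntityKeyProto]
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        client = datastore.Client()  # assumed; the provider previously owned this
        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for entity_key in entity_keys:
            document_id = compute_datastore_entity_id(entity_key)
            # The project name now comes from RepoConfig instead of an argument.
            key = client.key(
                "Project", config.project, "Table", table.name, "Row", document_id
            )
            value = client.get(key)
            if value is not None:
                res = {}
                for feature_name, value_bin in value["values"].items():
                    val = ValueProto()
                    val.ParseFromString(value_bin)
                    res[feature_name] = val
                result.append((value["event_ts"], res))
            else:
                result.append((None, None))
        return result
```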
78 changes: 8 additions & 70 deletions sdk/python/feast/infra/local.py
@@ -11,8 +11,8 @@
 from feast import FeatureTable
 from feast.entity import Entity
 from feast.feature_view import FeatureView
-from feast.infra.key_encoding_utils import serialize_entity_key
 from feast.infra.offline_stores.helpers import get_offline_store_from_config
+from feast.infra.online_stores.helpers import get_online_store_from_config
 from feast.infra.provider import (
     Provider,
     RetrievalJob,
@@ -31,6 +31,7 @@ class LocalProvider(Provider):
 
     def __init__(self, config: RepoConfig, repo_path: Path):
         assert config is not None
+        self.config = config
         assert isinstance(config.online_store, SqliteOnlineStoreConfig)
         assert config.offline_store is not None
         local_path = Path(config.online_store.path)
@@ -39,6 +40,7 @@ def __init__(self, config: RepoConfig, repo_path: Path):
         else:
             self._db_path = repo_path.joinpath(local_path)
         self.offline_store = get_offline_store_from_config(config.offline_store)
+        self.online_store = get_online_store_from_config(config.online_store)
 
     def _get_conn(self):
         Path(self._db_path).parent.mkdir(exist_ok=True)
@@ -77,88 +79,24 @@ def teardown_infra(
 
     def online_write_batch(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, FeatureView],
         data: List[
             Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
         ],
         progress: Optional[Callable[[int], Any]],
     ) -> None:
-        conn = self._get_conn()
-
-        with conn:
-            for entity_key, values, timestamp, created_ts in data:
-                entity_key_bin = serialize_entity_key(entity_key)
-                timestamp = _to_naive_utc(timestamp)
-                if created_ts is not None:
-                    created_ts = _to_naive_utc(created_ts)
-
-                for feature_name, val in values.items():
-                    conn.execute(
-                        f"""
-                            UPDATE {_table_id(project, table)}
-                            SET value = ?, event_ts = ?, created_ts = ?
-                            WHERE (entity_key = ? AND feature_name = ?)
-                        """,
-                        (
-                            # SET
-                            val.SerializeToString(),
-                            timestamp,
-                            created_ts,
-                            # WHERE
-                            entity_key_bin,
-                            feature_name,
-                        ),
-                    )
-
-                    conn.execute(
-                        f"""INSERT OR IGNORE INTO {_table_id(project, table)}
-                            (entity_key, feature_name, value, event_ts, created_ts)
-                            VALUES (?, ?, ?, ?, ?)""",
-                        (
-                            entity_key_bin,
-                            feature_name,
-                            val.SerializeToString(),
-                            timestamp,
-                            created_ts,
-                        ),
-                    )
-                if progress:
-                    progress(1)
+        self.online_store.online_write_batch(config, table, data, progress)
 
     def online_read(
         self,
-        project: str,
+        config: RepoConfig,
         table: Union[FeatureTable, FeatureView],
         entity_keys: List[EntityKeyProto],
         requested_features: List[str] = None,
     ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
+        result = self.online_store.online_read(config, table, entity_keys)
 
-        conn = self._get_conn()
-        cur = conn.cursor()
-
-        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
-
-        for entity_key in entity_keys:
-            entity_key_bin = serialize_entity_key(entity_key)
-
-            cur.execute(
-                f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?",
-                (entity_key_bin,),
-            )
-
-            res = {}
-            res_ts = None
-            for feature_name, val_bin, ts in cur.fetchall():
-                val = ValueProto()
-                val.ParseFromString(val_bin)
-                res[feature_name] = val
-                res_ts = ts
-
-            if not res:
-                result.append((None, None))
-            else:
-                result.append((res_ts, res))
         return result
 
     def materialize_single_feature_view(
@@ -199,7 +137,7 @@ def materialize_single_feature_view(
 
         with tqdm_builder(len(rows_to_write)) as pbar:
             self.online_write_batch(
-                project, feature_view, rows_to_write, lambda x: pbar.update(x)
+                self.config, feature_view, rows_to_write, lambda x: pbar.update(x)
            )
 
     def get_historical_features(
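Likewise, the SQLite upsert loop removed from `LocalProvider.online_write_batch` plausibly lands in a `SqliteOnlineStore` (module path and class name assumed). In this condensed sketch, the deleted UPDATE-then-INSERT OR IGNORE pair is collapsed into a single INSERT OR REPLACE, and the `_table_id(project, table)` helper is assumed to render as f"{project}_{table.name}":

```python
import sqlite3

from feast.infra.key_encoding_utils import serialize_entity_key
from feast.repo_config import RepoConfig


class SqliteOnlineStore:
    def online_write_batch(self, config: RepoConfig, table, data, progress) -> None:
        # Assumption: the db path and project now come from the RepoConfig that
        # providers pass through, rather than from provider constructor state.
        conn = sqlite3.connect(config.online_store.path)
        table_name = f"{config.project}_{table.name}"  # assumed _table_id format
        with conn:
            for entity_key, values, timestamp, created_ts in data:
                entity_key_bin = serialize_entity_key(entity_key)
                for feature_name, val in values.items():
                    # Collapsed upsert; the removed code used UPDATE followed by
                    # INSERT OR IGNORE to the same net effect.
                    conn.execute(
                        f"""INSERT OR REPLACE INTO {table_name}
                            (entity_key, feature_name, value, event_ts, created_ts)
                            VALUES (?, ?, ?, ?, ?)""",
                        (
                            entity_key_bin,
                            feature_name,
                            val.SerializeToString(),
                            timestamp,
                            created_ts,
                        ),
                    )
                if progress:
                    progress(1)
```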