Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Patch FAISS online return signature #4671

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from feast import Entity, FeatureView, RepoConfig
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey
from feast.protos.feast.types.Value_pb2 import Value
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel


Expand Down Expand Up @@ -94,9 +94,9 @@ def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKey],
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]:
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
if self._index is None:
return [(None, None)] * len(entity_keys)

Expand All @@ -111,7 +111,7 @@ def online_read(
else:
feature_vector = self._index.reconstruct(int(idx))
feature_dict = {
name: Value(double_val=value)
name: ValueProto(double_val=value)
for name, value in zip(
self._in_memory_store.feature_names, feature_vector
)
Expand All @@ -123,7 +123,9 @@ def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[Tuple[EntityKey, Dict[str, Value], datetime, Optional[datetime]]],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
if self._index is None:
Expand Down Expand Up @@ -181,9 +183,10 @@ def retrieve_online_documents(
) -> List[
Tuple[
Optional[datetime],
Optional[Value],
Optional[Value],
Optional[Value],
Optional[EntityKeyProto],
Optional[ValueProto],
Optional[ValueProto],
Optional[ValueProto],
]
]:
if self._index is None:
Expand All @@ -196,9 +199,10 @@ def retrieve_online_documents(
results: List[
Tuple[
Optional[datetime],
Optional[Value],
Optional[Value],
Optional[Value],
Optional[EntityKeyProto],
Optional[ValueProto],
Optional[ValueProto],
Optional[ValueProto],
]
] = []
for i, idx in enumerate(indices[0]):
Expand All @@ -209,14 +213,15 @@ def retrieve_online_documents(

timestamp = Timestamp()
timestamp.GetCurrentTime()

feature_value = Value(string_val=",".join(map(str, feature_vector)))
vector_value = Value(string_val=",".join(map(str, feature_vector)))
distance_value = Value(float_val=distances[0][i])
entity_value = EntityKeyProto()
feature_value = ValueProto(string_val=",".join(map(str, feature_vector)))
vector_value = ValueProto(string_val=",".join(map(str, feature_vector)))
distance_value = ValueProto(float_val=distances[0][i])

results.append(
(
timestamp.ToDatetime(),
entity_value,
feature_value,
vector_value,
distance_value,
Expand All @@ -229,8 +234,8 @@ async def online_read_async(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKey],
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]]:
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
# Implement async read if needed
raise NotImplementedError("Async read is not implemented for FaissOnlineStore")
Loading