From 5481caf37989c347bf4469f5f081f4f15f20fdb7 Mon Sep 17 00:00:00 2001 From: Vitaly Sergeyev Date: Fri, 4 Mar 2022 18:32:56 -0500 Subject: [PATCH] feat: Event timestamps response (#2355) * ability to get event timestamps from online response Signed-off-by: Vitaly Sergeyev * fix event timestamp bugs Signed-off-by: Vitaly Sergeyev * python formatting Signed-off-by: Vitaly Sergeyev * optional param to retrieve event_timestamp in online_reponse Signed-off-by: Vitaly Sergeyev * formatting Signed-off-by: Vitaly Sergeyev * renaming param Signed-off-by: Vitaly Sergeyev --- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/infra/online_stores/redis.py | 3 +- sdk/python/feast/online_response.py | 24 +++++-- .../online_store/test_universal_online.py | 65 +++++++++++++++++++ 4 files changed, 88 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index b47e6745c9..ed944804c5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1508,10 +1508,10 @@ def _read_from_online_store( # Each row is a set of features for a given entity key. We only need to convert # the data to Protobuf once. - row_ts_proto = Timestamp() null_value = Value() read_row_protos = [] for read_row in read_rows: + row_ts_proto = Timestamp() row_ts, feature_data = read_row if row_ts is not None: row_ts_proto.FromDatetime(row_ts) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 6225f2d1d1..a2e8e27d80 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -27,6 +27,7 @@ Union, ) +import pytz from google.protobuf.timestamp_pb2 import Timestamp from pydantic import StrictStr from pydantic.typing import Literal @@ -302,5 +303,5 @@ def _get_features_for_entity( if not res: return None, None else: - timestamp = datetime.fromtimestamp(res_ts.seconds) + timestamp = datetime.fromtimestamp(res_ts.seconds, tz=pytz.utc) return timestamp, res diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index bb69c6b9d9..f01bd510be 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -20,10 +20,12 @@ from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse from feast.type_map import feast_value_type_to_python_type +TIMESTAMP_POSTFIX: str = "__ts" + class OnlineResponse: """ - Defines a online response in feast. + Defines an online response in feast. """ def __init__(self, online_response_proto: GetOnlineFeaturesResponse): @@ -44,9 +46,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse): del result.event_timestamps[idx] break - def to_dict(self) -> Dict[str, Any]: + def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: """ Converts GetOnlineFeaturesResponse features into a dictionary form. + + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary """ response: Dict[str, List[Any]] = {} @@ -58,11 +63,22 @@ def to_dict(self) -> Dict[str, Any]: else: response[feature_ref].append(native_type_value) + if include_event_timestamps: + event_ts = result.event_timestamps[idx].seconds + timestamp_ref = feature_ref + TIMESTAMP_POSTFIX + if timestamp_ref not in response: + response[timestamp_ref] = [event_ts] + else: + response[timestamp_ref].append(event_ts) + return response - def to_df(self) -> pd.DataFrame: + def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: """ Converts GetOnlineFeaturesResponse features into Panda dataframe form. + + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe """ - return pd.DataFrame(self.to_dict()) + return pd.DataFrame(self.to_dict(include_event_timestamps)) diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 90a06bb347..bdbdfb3555 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -18,6 +18,7 @@ FeatureNameCollisionError, RequestDataNotFoundInEntityRowsException, ) +from feast.online_response import TIMESTAMP_POSTFIX from feast.wait import wait_retry_backoff from tests.integration.feature_repos.repo_configuration import ( Environment, @@ -322,6 +323,70 @@ def get_online_features_dict( return dict1 +@pytest.mark.integration +@pytest.mark.universal +@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) +def test_online_retrieval_with_event_timestamps( + environment, universal_data_sources, full_feature_names +): + fs = environment.feature_store + entities, datasets, data_sources = universal_data_sources + feature_views = construct_universal_feature_views(data_sources) + + fs.apply([driver(), feature_views.driver, feature_views.global_fv]) + + # fake data to ingest into Online Store + data = { + "driver_id": [1, 2], + "conv_rate": [0.5, 0.3], + "acc_rate": [0.6, 0.4], + "avg_daily_trips": [4, 5], + "event_timestamp": [ + pd.to_datetime(1646263500, utc=True, unit="s"), + pd.to_datetime(1646263600, utc=True, unit="s"), + ], + "created": [ + pd.to_datetime(1646263500, unit="s"), + pd.to_datetime(1646263600, unit="s"), + ], + } + df_ingest = pd.DataFrame(data) + + # directly ingest data into the Online Store + fs.write_to_online_store("driver_stats", df_ingest) + + response = fs.get_online_features( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", + ], + entity_rows=[{"driver": 1}, {"driver": 2}], + ) + df = response.to_df(True) + assertpy.assert_that(len(df)).is_equal_to(2) + assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) + assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to( + 1646263500 + ) + assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to( + 1646263600 + ) + + @pytest.mark.integration @pytest.mark.universal @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))