diff --git a/sdk/python/tests/integration/e2e/test_go_feature_server.py b/sdk/python/tests/integration/e2e/test_go_feature_server.py index e469c90c11..4e4cfc1fb8 100644 --- a/sdk/python/tests/integration/e2e/test_go_feature_server.py +++ b/sdk/python/tests/integration/e2e/test_go_feature_server.py @@ -10,7 +10,7 @@ import pytest import pytz -from feast import FeatureService, ValueType +from feast import FeatureService, FeatureView, ValueType from feast.embedded_go.lib.embedded import LoggingOptions from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer from feast.feast_object import FeastObject @@ -162,13 +162,14 @@ def test_feature_logging( _, datasets, _ = universal_data_sources latest_rows = get_latest_rows(datasets.driver_df, "driver_id", driver_ids) + feature_view = fs.get_feature_view("driver_stats") features = [ feature.name for proj in feature_service.feature_view_projections for feature in proj.features ] expected_logs = generate_expected_logs( - latest_rows, "driver_stats", features, ["driver_id"], "event_timestamp" + latest_rows, feature_view, features, ["driver_id"], "event_timestamp" ) def retrieve(): @@ -213,15 +214,26 @@ def get_latest_rows(df, join_key, entity_values): def generate_expected_logs( - df, feature_view_name, features, join_keys, timestamp_column + df: pd.DataFrame, + feature_view: FeatureView, + features: List[str], + join_keys: List[str], + timestamp_column: str, ): logs = pd.DataFrame() for join_key in join_keys: logs[join_key] = df[join_key] for feature in features: - logs[f"{feature_view_name}__{feature}"] = df[feature] - logs[f"{feature_view_name}__{feature}__timestamp"] = df[timestamp_column] - logs[f"{feature_view_name}__{feature}__status"] = FieldStatus.PRESENT + col = f"{feature_view.name}__{feature}" + logs[col] = df[feature] + logs[f"{col}__timestamp"] = df[timestamp_column] + logs[f"{col}__status"] = FieldStatus.PRESENT + if feature_view.ttl: + logs[f"{col}__status"] = logs[f"{col}__status"].mask( + df[timestamp_column] + < datetime.utcnow().replace(tzinfo=pytz.UTC) - feature_view.ttl, + FieldStatus.OUTSIDE_MAX_AGE, + ) return logs.sort_values(by=join_keys).reset_index(drop=True)