Skip to content

Commit

Permalink
fix: Integration tests for async sdk method (#4201)
Browse files Browse the repository at this point in the history
  • Loading branch information
breno-costa authored May 17, 2024
1 parent 20f5419 commit 08c44ae
Showing 1 changed file with 53 additions and 18 deletions.
71 changes: 53 additions & 18 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import datetime
import os
import time
Expand All @@ -12,6 +13,7 @@
import requests
from botocore.exceptions import BotoCoreError

from feast import FeatureStore
from feast.entity import Entity
from feast.errors import FeatureNameCollisionError
from feast.feature_service import FeatureService
Expand Down Expand Up @@ -400,19 +402,15 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s
)


@pytest.mark.integration
@pytest.mark.universal_online_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval_with_event_timestamps(
environment, universal_data_sources, full_feature_names
):
fs = environment.feature_store
def setup_feature_store_universal_feature_views(
environment, universal_data_sources
) -> FeatureStore:
fs: FeatureStore = environment.feature_store
entities, datasets, data_sources = universal_data_sources
feature_views = construct_universal_feature_views(data_sources)

fs.apply([driver(), feature_views.driver, feature_views.global_fv])

# fake data to ingest into Online Store
data = {
"driver_id": [1, 2],
"conv_rate": [0.5, 0.3],
Expand All @@ -429,18 +427,11 @@ def test_online_retrieval_with_event_timestamps(
}
df_ingest = pd.DataFrame(data)

# directly ingest data into the Online Store
fs.write_to_online_store("driver_stats", df_ingest)
return fs

response = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver_id": 1}, {"driver_id": 2}],
)
df = response.to_df(True)

def assert_feature_store_universal_feature_views_response(df: pd.DataFrame):
assertpy.assert_that(len(df)).is_equal_to(2)
assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1)
assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
Expand All @@ -464,6 +455,50 @@ def test_online_retrieval_with_event_timestamps(
)


@pytest.mark.integration
@pytest.mark.universal_online_stores
def test_online_retrieval_with_event_timestamps(environment, universal_data_sources):
fs = setup_feature_store_universal_feature_views(
environment, universal_data_sources
)

response = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver_id": 1}, {"driver_id": 2}],
)
df = response.to_df(True)

assert_feature_store_universal_feature_views_response(df)


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
def test_async_online_retrieval_with_event_timestamps(
environment, universal_data_sources
):
fs = setup_feature_store_universal_feature_views(
environment, universal_data_sources
)

response = asyncio.run(
fs.get_online_features_async(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver_id": 1}, {"driver_id": 2}],
)
)
df = response.to_df(True)

assert_feature_store_universal_feature_views_response(df)


@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
def test_online_store_cleanup(environment, universal_data_sources):
Expand Down

0 comments on commit 08c44ae

Please sign in to comment.