From 325ad250a461dc8a41b29f702a0f49d6f96594c7 Mon Sep 17 00:00:00 2001
From: Matt Delacour
Date: Thu, 5 Aug 2021 10:57:49 +0200
Subject: [PATCH 1/2] Fix how latest feature is calculated using Window
 function

Signed-off-by: Matt Delacour
---
 .../feast/infra/offline_stores/bigquery.py | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 5fa0114133..3463e96898 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -430,20 +430,21 @@ def _get_bigquery_client(project: Optional[str] = None):
     Thus we only need to compute the latest timestamp of each feature.
     */
     {{ featureview.name }}__latest AS (
-        SELECT
-            {{featureview.name}}__entity_row_unique_id,
-            MAX(event_timestamp) AS event_timestamp
+        SELECT * EXCEPT(row_number)
+        FROM
+        (
+            SELECT *,
+                ROW_NUMBER() OVER(
+                    PARTITION BY {{featureview.name}}__entity_row_unique_id
+                    ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %}, created_timestamp DESC{% endif %}
+                ) AS row_number
+            FROM {{ featureview.name }}__base
             {% if featureview.created_timestamp_column %}
-            ,ANY_VALUE(created_timestamp) AS created_timestamp
+            INNER JOIN {{ featureview.name }}__dedup
+            USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
             {% endif %}
-
-        FROM {{ featureview.name }}__base
-        {% if featureview.created_timestamp_column %}
-        INNER JOIN {{ featureview.name }}__dedup
-        USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
-        {% endif %}
-
-        GROUP BY {{featureview.name}}__entity_row_unique_id
+        )
+        WHERE row_number = 1
     ),
 
     /*
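
Why the window function: the old __latest query reduced each entity row to
MAX(event_timestamp) plus ANY_VALUE(created_timestamp). After a backfill,
the newest event_timestamp and the newest created_timestamp can sit on
different rows, so the aggregated pair may identify a stale row, or no row
at all, when joined back on both columns. Ordering a ROW_NUMBER() window by
event_timestamp DESC and then created_timestamp DESC always keeps the row
of the latest event, using created_timestamp only as a tie-breaker. A
minimal sketch of the pattern, runnable in BigQuery as-is; the table name,
column names, and values are illustrative, not the generated Feast SQL:

    WITH base AS (
      -- Backfill shape: the later event carries the *older* created timestamp.
      SELECT 1002 AS entity_id, 30 AS value,
             TIMESTAMP '2021-08-05 00:00:00' AS event_timestamp,
             TIMESTAMP '2021-08-06 00:00:00' AS created_timestamp
      UNION ALL
      SELECT 1002, 40,
             TIMESTAMP '2021-08-06 00:00:00',
             TIMESTAMP '2021-08-05 00:00:00'
    )
    SELECT * EXCEPT(row_number)
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (
               PARTITION BY entity_id
               ORDER BY event_timestamp DESC, created_timestamp DESC
             ) AS row_number
      FROM base
    )
    WHERE row_number = 1
    -- Keeps the value = 40 row. MAX(event_timestamp) with
    -- ANY_VALUE(created_timestamp) could instead produce the pair
    -- ('2021-08-06', '2021-08-06'), which matches neither source row.
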
From 0fb046e2b07a5a8e1876d20c0ba1840857833b90 Mon Sep 17 00:00:00 2001
From: Matt Delacour
Date: Fri, 6 Aug 2021 07:16:39 +0200
Subject: [PATCH 2/2] Add simple test to replicate the backfill issue

Signed-off-by: Matt Delacour
---
 .../test_historical_retrieval.py | 134 ++++++++++++++++++
 1 file changed, 134 insertions(+)

diff --git a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
index ddaaee9282..b50cea2392 100644
--- a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
+++ b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
@@ -992,3 +992,137 @@ def test_feature_name_collision_on_historical_retrieval():
         "have different names."
     )
     assert str(error.value) == expected_error_message
+
+
+@pytest.mark.integration
+def test_historical_features_from_bigquery_sources_containing_backfills(capsys):
+    now = datetime.now().replace(microsecond=0, second=0, minute=0)
+    tomorrow = now + timedelta(days=1)
+
+    entity_dataframe = pd.DataFrame(
+        data=[
+            {"driver_id": 1001, "event_timestamp": now + timedelta(days=2)},
+            {"driver_id": 1002, "event_timestamp": now + timedelta(days=2)},
+        ]
+    )
+
+    driver_stats_df = pd.DataFrame(
+        data=[
+            # Duplicated rows simple case
+            {
+                "driver_id": 1001,
+                "avg_daily_trips": 10,
+                "event_timestamp": now,
+                "created": tomorrow,
+            },
+            {
+                "driver_id": 1001,
+                "avg_daily_trips": 20,
+                "event_timestamp": tomorrow,
+                "created": tomorrow,
+            },
+            # Duplicated rows after a backfill
+            {
+                "driver_id": 1002,
+                "avg_daily_trips": 30,
+                "event_timestamp": now,
+                "created": tomorrow,
+            },
+            {
+                "driver_id": 1002,
+                "avg_daily_trips": 40,
+                "event_timestamp": tomorrow,
+                "created": now,
+            },
+        ]
+    )
+
+    expected_df = pd.DataFrame(
+        data=[
+            {
+                "driver_id": 1001,
+                "event_timestamp": now + timedelta(days=2),
+                "avg_daily_trips": 20,
+            },
+            {
+                "driver_id": 1002,
+                "event_timestamp": now + timedelta(days=2),
+                "avg_daily_trips": 40,
+            },
+        ]
+    )
+
+    bigquery_dataset = (
+        f"test_hist_retrieval_{int(time.time_ns())}_{random.randint(1000, 9999)}"
+    )
+
+    with BigQueryDataSet(bigquery_dataset), TemporaryDirectory() as temp_dir:
+        gcp_project = bigquery.Client().project
+
+        # Entity Dataframe SQL query
+        table_id = f"{bigquery_dataset}.orders"
+        stage_orders_bigquery(entity_dataframe, table_id)
+        entity_df_query = f"SELECT * FROM {gcp_project}.{table_id}"
+
+        # Driver Feature View
+        driver_table_id = f"{gcp_project}.{bigquery_dataset}.driver_hourly"
+        stage_driver_hourly_stats_bigquery_source(driver_stats_df, driver_table_id)
+
+        store = FeatureStore(
+            config=RepoConfig(
+                registry=os.path.join(temp_dir, "registry.db"),
+                project="".join(
+                    random.choices(string.ascii_uppercase + string.digits, k=10)
+                ),
+                provider="gcp",
+                offline_store=BigQueryOfflineStoreConfig(
+                    type="bigquery", dataset=bigquery_dataset
+                ),
+            )
+        )
+
+        driver = Entity(name="driver", join_key="driver_id", value_type=ValueType.INT64)
+        driver_fv = FeatureView(
+            name="driver_stats",
+            entities=["driver"],
+            features=[Feature(name="avg_daily_trips", dtype=ValueType.INT32)],
+            batch_source=BigQuerySource(
+                table_ref=driver_table_id,
+                event_timestamp_column="event_timestamp",
+                created_timestamp_column="created",
+            ),
+            ttl=None,
+        )
+
+        store.apply([driver, driver_fv])
+
+        try:
+            job_from_sql = store.get_historical_features(
+                entity_df=entity_df_query,
+                features=["driver_stats:avg_daily_trips"],
+                full_feature_names=False,
+            )
+
+            start_time = datetime.utcnow()
+            actual_df_from_sql_entities = job_from_sql.to_df()
+            end_time = datetime.utcnow()
+            with capsys.disabled():
+                print(
+                    str(
+                        f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
+                    )
+                )
+
+            assert sorted(expected_df.columns) == sorted(
+                actual_df_from_sql_entities.columns
+            )
+            assert_frame_equal(
+                expected_df.sort_values(by=["driver_id"]).reset_index(drop=True),
+                actual_df_from_sql_entities[expected_df.columns]
+                .sort_values(by=["driver_id"])
+                .reset_index(drop=True),
+                check_dtype=False,
+            )
+
+        finally:
+            store.teardown()
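
The staged driver_stats_df encodes the same shape: driver 1002's latest
event (tomorrow) carries the older created timestamp (now), which is
exactly the case ANY_VALUE(created_timestamp) mishandled, while driver
1001 covers the plain latest-event case. As a sanity check outside the
test harness, the four staged rows can be inlined and deduplicated the
same way; a sketch with fixed timestamps standing in for now/tomorrow:

    WITH driver_stats AS (
      SELECT 1001 AS driver_id, 10 AS avg_daily_trips,
             TIMESTAMP '2021-08-05' AS event_timestamp,
             TIMESTAMP '2021-08-06' AS created
      UNION ALL SELECT 1001, 20, TIMESTAMP '2021-08-06', TIMESTAMP '2021-08-06'
      UNION ALL SELECT 1002, 30, TIMESTAMP '2021-08-05', TIMESTAMP '2021-08-06'
      UNION ALL SELECT 1002, 40, TIMESTAMP '2021-08-06', TIMESTAMP '2021-08-05'
    )
    SELECT driver_id, avg_daily_trips
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (
               PARTITION BY driver_id
               ORDER BY event_timestamp DESC, created DESC
             ) AS row_number
      FROM driver_stats
    )
    WHERE row_number = 1
    -- Expected output: (1001, 20) and (1002, 40), matching expected_df.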