diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 1d931b285f..9c61215156 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -307,6 +307,9 @@ def build_point_in_time_query( "max_timestamp": max_timestamp, "left_table_query_string": left_table_query_string, "entity_df_event_timestamp_col": entity_df_event_timestamp_col, + "unique_entity_keys": set( + [entity for fv in feature_view_query_contexts for entity in fv.entities] + ), "featureviews": [asdict(context) for context in feature_view_query_contexts], } @@ -341,7 +344,15 @@ def _get_bigquery_client(): SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ WITH entity_dataframe AS ( - SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf + SELECT + *, + CONCAT( + {% for entity_key in unique_entity_keys %} + CAST({{entity_key}} AS STRING), + {% endfor %} + CAST({{entity_df_event_timestamp_col}} AS STRING) + ) AS entity_row_unique_id + FROM {{ left_table_query_string }} ), {% for featureview in featureviews %} /* @@ -354,7 +365,7 @@ def _get_bigquery_client(): {{ featureview.name }}__union_features AS ( SELECT -- unique identifier for each row in the entity dataset. - row_number, + entity_row_unique_id, -- event_timestamp contains the timestamps to join onto {{entity_df_event_timestamp_col}} AS event_timestamp, -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp @@ -368,7 +379,7 @@ def _get_bigquery_client(): FROM entity_dataframe UNION ALL SELECT - NULL as row_number, + NULL as entity_row_unique_id, {{ featureview.event_timestamp_column }} as event_timestamp, {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp, {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} @@ -388,7 +399,7 @@ def _get_bigquery_client(): */ {{ featureview.name }}__joined AS ( SELECT - row_number, + entity_row_unique_id, event_timestamp, {{ featureview.entities | join(', ')}}, {% for feature in featureview.features %} @@ -396,7 +407,7 @@ def _get_bigquery_client(): {% endfor %} FROM ( SELECT - row_number, + entity_row_unique_id, event_timestamp, {{ featureview.entities | join(', ')}}, {{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }} @@ -423,30 +434,30 @@ def _get_bigquery_client(): WHERE is_entity_table ), /* - 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number. + 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table entity_row_unique_id. */ {{ featureview.name }}__deduped AS (SELECT k.* FROM ( SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k FROM {{ featureview.name }}__joined row - GROUP BY row_number + GROUP BY entity_row_unique_id )){% if loop.last %}{% else %}, {% endif %} {% endfor %} /* Joins the outputs of multiple time travel joins to a single table. */ -SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (row_number, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf +SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (entity_row_unique_id, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf {% for featureview in featureviews %} LEFT JOIN ( SELECT - row_number, + entity_row_unique_id, {% for feature in featureview.features %} {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.name }}__deduped -) USING (row_number) +) USING (entity_row_unique_id) {% endfor %} ORDER BY {{entity_df_event_timestamp_col}} """