Skip to content

Commit

Permalink
Use CONCAT() instead of ROW_NUMBER() (#1601)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Delacour <matt.delacour@shopify.com>
  • Loading branch information
MattDelac authored and woop committed Jun 7, 2021
1 parent d25ac21 commit cc0499c
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ def build_point_in_time_query(
"max_timestamp": max_timestamp,
"left_table_query_string": left_table_query_string,
"entity_df_event_timestamp_col": entity_df_event_timestamp_col,
"unique_entity_keys": set(
[entity for fv in feature_view_query_contexts for entity in fv.entities]
),
"featureviews": [asdict(context) for context in feature_view_query_contexts],
}

Expand Down Expand Up @@ -341,7 +344,15 @@ def _get_bigquery_client():

SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
WITH entity_dataframe AS (
SELECT ROW_NUMBER() OVER() AS row_number, edf.* FROM {{ left_table_query_string }} as edf
SELECT
*,
CONCAT(
{% for entity_key in unique_entity_keys %}
CAST({{entity_key}} AS STRING),
{% endfor %}
CAST({{entity_df_event_timestamp_col}} AS STRING)
) AS entity_row_unique_id
FROM {{ left_table_query_string }}
),
{% for featureview in featureviews %}
/*
Expand All @@ -354,7 +365,7 @@ def _get_bigquery_client():
{{ featureview.name }}__union_features AS (
SELECT
-- unique identifier for each row in the entity dataset.
row_number,
entity_row_unique_id,
-- event_timestamp contains the timestamps to join onto
{{entity_df_event_timestamp_col}} AS event_timestamp,
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
Expand All @@ -368,7 +379,7 @@ def _get_bigquery_client():
FROM entity_dataframe
UNION ALL
SELECT
NULL as row_number,
NULL as entity_row_unique_id,
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
Expand All @@ -388,15 +399,15 @@ def _get_bigquery_client():
*/
{{ featureview.name }}__joined AS (
SELECT
row_number,
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{% for feature in featureview.features %}
IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM (
SELECT
row_number,
entity_row_unique_id,
event_timestamp,
{{ featureview.entities | join(', ')}},
{{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }}
Expand All @@ -423,30 +434,30 @@ def _get_bigquery_client():
WHERE is_entity_table
),
/*
4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row_number.
4. Finally, deduplicate the rows by selecting the first occurrence of each entity table entity_row_unique_id.
*/
{{ featureview.name }}__deduped AS (SELECT
k.*
FROM (
SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k
FROM {{ featureview.name }}__joined row
GROUP BY row_number
GROUP BY entity_row_unique_id
)){% if loop.last %}{% else %}, {% endif %}
{% endfor %}
/*
Joins the outputs of multiple time travel joins to a single table.
*/
SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (row_number, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (entity_row_unique_id, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf
{% for featureview in featureviews %}
LEFT JOIN (
SELECT
row_number,
entity_row_unique_id,
{% for feature in featureview.features %}
{{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.name }}__deduped
) USING (row_number)
) USING (entity_row_unique_id)
{% endfor %}
ORDER BY {{entity_df_event_timestamp_col}}
"""

0 comments on commit cc0499c

Please sign in to comment.