From 5430c46ab1046e2b381c5609d1fcb6f8fc595b23 Mon Sep 17 00:00:00 2001 From: Matt Delacour Date: Tue, 8 Jun 2021 18:20:37 -0400 Subject: [PATCH] New SQL template for BQ historical retrieval Signed-off-by: Matt Delacour --- .../feast/infra/offline_stores/bigquery.py | 196 ++++++++++-------- sdk/python/tests/test_historical_retrieval.py | 17 +- 2 files changed, 119 insertions(+), 94 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index fcd13cd7f8..f8228e756e 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -394,6 +394,10 @@ def _get_bigquery_client(): # * Create temporary tables instead of keeping all tables in memory SINGLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """ +/* + Compute a deterministic hash for the `left_table_query_string` that will be used throughout + all the logic as the field to GROUP BY the data +*/ WITH entity_dataframe AS ( SELECT *, @@ -405,110 +409,128 @@ def _get_bigquery_client(): ) AS entity_row_unique_id FROM {{ left_table_query_string }} ), + {% for featureview in featureviews %} + /* This query template performs the point-in-time correctness join for a single feature set table to the provided entity table. - 1. Concatenate the timestamp and entities from the feature set table with the entity dataset. - Feature values are joined to this table later for improved efficiency. - featureview_timestamp is equal to null in rows from the entity dataset. - */ -{{ featureview.name }}__union_features AS ( -SELECT - -- unique identifier for each row in the entity dataset. - entity_row_unique_id, - -- event_timestamp contains the timestamps to join onto - {{entity_df_event_timestamp_col}} AS event_timestamp, - -- the feature_timestamp, i.e. 
the latest occurrence of the requested feature relative to the entity_dataset timestamp - NULL as {{ featureview.name }}_feature_timestamp, - -- created timestamp of the feature at the corresponding feature_timestamp - {{ 'NULL as created_timestamp,' if featureview.created_timestamp_column else '' }} - -- select only entities belonging to this feature set - {{ featureview.entities | join(', ')}}, - -- boolean for filtering the dataset later - true AS is_entity_table -FROM entity_dataframe -UNION ALL -SELECT - NULL as entity_row_unique_id, - {{ featureview.event_timestamp_column }} as event_timestamp, - {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp, - {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} - {{ featureview.entity_selections | join(', ')}}, - false AS is_entity_table -FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}' -{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %} + + 1. We first join the current feature_view to the entity dataframe that has been passed. + This JOIN has the following logic: + - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column` + is less than the one provided in the entity dataframe + - If there is a TTL for the current feature_view, also keep the rows where the `event_timestamp_column` + is higher than the one provided minus the TTL + - For each row, join on the entity key and retrieve the `entity_row_unique_id` that has been + computed previously + + The output of this CTE will contain all the necessary information and already filtered out most + of the data that is not relevant. 
+*/ + +{{ featureview.name }}__subquery AS ( + SELECT + {{ featureview.event_timestamp_column }} as event_timestamp, + {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} + {{ featureview.entity_selections | join(', ')}}, + {% for feature in featureview.features %} + {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM {{ featureview.table_subquery }} ), + +{{ featureview.name }}__base AS ( + SELECT + subquery.*, + entity_dataframe.{{entity_df_event_timestamp_col}} AS entity_timestamp, + entity_dataframe.entity_row_unique_id + FROM {{ featureview.name }}__subquery AS subquery + INNER JOIN entity_dataframe + ON TRUE + AND subquery.event_timestamp <= entity_dataframe.{{entity_df_event_timestamp_col}} + + {% if featureview.ttl == 0 %}{% else %} + AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.{{entity_df_event_timestamp_col}}, interval {{ featureview.ttl }} second) + {% endif %} + + {% for entity in featureview.entities %} + AND subquery.{{ entity }} = entity_dataframe.{{ entity }} + {% endfor %} +), + /* - 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as - well as is_entity_table. - Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps - in the rows from the entity table should now contain the latest timestamps relative to the row's - event_timestamp. - For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the - feature_timestamp to null. 
- */ -{{ featureview.name }}__joined AS ( -SELECT - entity_row_unique_id, - event_timestamp, - {{ featureview.entities | join(', ')}}, - {% for feature in featureview.features %} - IF(event_timestamp >= {{ featureview.name }}_feature_timestamp {% if featureview.ttl == 0 %}{% else %}AND Timestamp_sub(event_timestamp, interval {{ featureview.ttl }} second) < {{ featureview.name }}_feature_timestamp{% endif %}, {{ featureview.name }}__{{ feature }}, NULL) as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} - {% endfor %} -FROM ( -SELECT - entity_row_unique_id, - event_timestamp, - {{ featureview.entities | join(', ')}}, - {{ 'FIRST_VALUE(created_timestamp IGNORE NULLS) over w AS created_timestamp,' if featureview.created_timestamp_column else '' }} - FIRST_VALUE({{ featureview.name }}_feature_timestamp IGNORE NULLS) over w AS {{ featureview.name }}_feature_timestamp, - is_entity_table -FROM {{ featureview.name }}__union_features -WINDOW w AS (PARTITION BY {{ featureview.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC{{', created_timestamp DESC' if featureview.created_timestamp_column else ''}} ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) -) + /* - 2. If the `created_timestamp_column` has been set, we need to + deduplicate the data first. This is done by calculating the + `MAX(created_timestamp)` for each event_timestamp. + We then join the data in the next CTE +*/ +{% if featureview.created_timestamp_column %} +{{ featureview.name }}__dedup AS ( + SELECT + entity_row_unique_id, + event_timestamp, + MAX(created_timestamp) as created_timestamp, + FROM {{ featureview.name }}__base + GROUP BY entity_row_unique_id, event_timestamp +), +{% endif %} + /* - 3. Select only the rows from the entity table, and join the features from the original feature set table - to the dataset using the entity values, feature_timestamp, and created_timestamps. 
- */ -LEFT JOIN ( -SELECT - {{ featureview.event_timestamp_column }} as {{ featureview.name }}_feature_timestamp, - {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} - {{ featureview.entity_selections | join(', ')}}, - {% for feature in featureview.features %} - {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} - {% endfor %} -FROM {{ featureview.table_subquery }} WHERE {{ featureview.event_timestamp_column }} <= '{{ max_timestamp }}' -{% if featureview.ttl == 0 %}{% else %}AND {{ featureview.event_timestamp_column }} >= Timestamp_sub(TIMESTAMP '{{ min_timestamp }}', interval {{ featureview.ttl }} second){% endif %} -) USING ({{ featureview.name }}_feature_timestamp,{{ ' created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entities | join(', ')}}) -WHERE is_entity_table + 3. The data has been filtered during the first CTE "*__base" + Thus we only need to compute the latest timestamp of each feature. +*/ +{{ featureview.name }}__latest AS ( + SELECT + entity_row_unique_id, + MAX(event_timestamp) AS event_timestamp + {% if featureview.created_timestamp_column %} + ,ANY_VALUE(created_timestamp) AS created_timestamp + {% endif %} + + FROM {{ featureview.name }}__base + {% if featureview.created_timestamp_column %} + INNER JOIN {{ featureview.name }}__dedup + USING (entity_row_unique_id, event_timestamp, created_timestamp) + {% endif %} + + GROUP BY entity_row_unique_id ), + /* - 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table entity_row_unique_id. - */ -{{ featureview.name }}__deduped AS (SELECT - k.* -FROM ( - SELECT ARRAY_AGG(row LIMIT 1)[OFFSET(0)] k - FROM {{ featureview.name }}__joined row - GROUP BY entity_row_unique_id -)){% if loop.last %}{% else %}, {% endif %} + 4. 
Once we know the latest value of each feature for a given timestamp, + we can join again the data back to the original "base" dataset +*/ +{{ featureview.name }}__cleaned AS ( + SELECT base.* + FROM {{ featureview.name }}__base as base + INNER JOIN {{ featureview.name }}__latest + USING( + entity_row_unique_id, + event_timestamp + {% if featureview.created_timestamp_column %} + ,created_timestamp + {% endif %} + ) +){% if loop.last %}{% else %}, {% endif %} + {% endfor %} /* Joins the outputs of multiple time travel joins to a single table. + The entity_dataframe dataset being our source of truth here. */ -SELECT edf.{{entity_df_event_timestamp_col}} as {{entity_df_event_timestamp_col}}, * EXCEPT (entity_row_unique_id, {{entity_df_event_timestamp_col}}) FROM entity_dataframe edf + +SELECT * EXCEPT (entity_row_unique_id) +FROM entity_dataframe {% for featureview in featureviews %} LEFT JOIN ( SELECT - entity_row_unique_id, - {% for feature in featureview.features %} - {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} - {% endfor %} - FROM {{ featureview.name }}__deduped + entity_row_unique_id, + {% for feature in featureview.features %} + {{ featureview.name }}__{{ feature }}, + {% endfor %} + FROM {{ featureview.name }}__cleaned ) USING (entity_row_unique_id) {% endfor %} -ORDER BY {{entity_df_event_timestamp_col}} """ diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py index 80431a1ef3..83f48ccd96 100644 --- a/sdk/python/tests/test_historical_retrieval.py +++ b/sdk/python/tests/test_historical_retrieval.py @@ -334,7 +334,6 @@ def test_historical_features_from_bigquery_sources( start_date, ) = generate_entities(start_date, infer_event_timestamp_col) - # bigquery_dataset = "test_hist_retrieval_static" bigquery_dataset = ( f"test_hist_retrieval_{int(time.time_ns())}_{random.randint(1000, 9999)}" ) @@ -452,13 +451,16 @@ def test_historical_features_from_bigquery_sources( ) ) + assert 
sorted(expected_df.columns) == sorted( + actual_df_from_sql_entities.columns + ) assert_frame_equal( expected_df.sort_values( by=[event_timestamp, "order_id", "driver_id", "customer_id"] ).reset_index(drop=True), - actual_df_from_sql_entities.sort_values( - by=[event_timestamp, "order_id", "driver_id", "customer_id"] - ).reset_index(drop=True), + actual_df_from_sql_entities[expected_df.columns] + .sort_values(by=[event_timestamp, "order_id", "driver_id", "customer_id"]) + .reset_index(drop=True), check_dtype=False, ) @@ -532,12 +534,13 @@ def test_historical_features_from_bigquery_sources( ) ) + assert sorted(expected_df.columns) == sorted(actual_df_from_df_entities.columns) assert_frame_equal( expected_df.sort_values( by=[event_timestamp, "order_id", "driver_id", "customer_id"] ).reset_index(drop=True), - actual_df_from_df_entities.sort_values( - by=[event_timestamp, "order_id", "driver_id", "customer_id"] - ).reset_index(drop=True), + actual_df_from_df_entities[expected_df.columns] + .sort_values(by=[event_timestamp, "order_id", "driver_id", "customer_id"]) + .reset_index(drop=True), check_dtype=False, )