From c26d973e79598017719fec71d3571062230bc3c5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 3 Jun 2021 13:32:10 -0700 Subject: [PATCH 1/5] Add a specific error for missing columns during materialization Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 12 +++++++ sdk/python/feast/infra/offline_stores/file.py | 6 ++-- ...ample_feature_repo_with_entity_join_key.py | 32 +++++++++++++++++++ sdk/python/tests/test_e2e_local.py | 23 +++++++++++++ ..._materialize_from_bigquery_to_datastore.py | 0 5 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 sdk/python/tests/example_feature_repo_with_entity_join_key.py delete mode 100644 sdk/python/tests/test_materialize_from_bigquery_to_datastore.py diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index e57b084b81..12fd6a30db 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -1,3 +1,5 @@ +from typing import Set + from colorama import Fore, Style @@ -75,3 +77,13 @@ def __init__(self, expected, missing): f"The entity dataframe you have provided must contain columns {expected}, " f"but {missing} were missing." ) + + +class FeastJoinKeysDuringMaterialization(Exception): + def __init__(self, + join_key_columns: Set[str], + source_columns: Set[str]): + super().__init__( + f"The DataFrame must have at least {join_key_columns} columns present, " + f"but these were missing: {join_key_columns - source_columns} " + ) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 341e271468..f5d6ef00e3 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -6,6 +6,7 @@ import pytz from feast.data_source import DataSource, FileSource +from feast.errors import FeastJoinKeysDuringMaterialization from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob from feast.infra.provider import ( @@ -218,10 +219,7 @@ def pull_latest_from_table_or_query( source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): - raise ValueError( - f"The DataFrame must have at least {set(join_key_columns)} columns present, " - f"but these were missing: {set(join_key_columns)- source_columns} " - ) + raise FeastJoinKeysDuringMaterialization(set(join_key_columns), source_columns) ts_columns = ( [event_timestamp_column, created_timestamp_column] diff --git a/sdk/python/tests/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_feature_repo_with_entity_join_key.py new file mode 100644 index 0000000000..f2506f68f1 --- /dev/null +++ b/sdk/python/tests/example_feature_repo_with_entity_join_key.py @@ -0,0 +1,32 @@ +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, FeatureView, ValueType +from feast.data_source import FileSource + +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", # placeholder to be replaced by the test + event_timestamp_column="datetime", + created_timestamp_column="created", +) + + +# The join key here is deliberately different from the parquet file to test the failure path. +driver = Entity(name="driver_id", + value_type=ValueType.INT64, + description="driver id", + join_key="driver") + + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=Duration(seconds=86400 * 1), + features=[ + Feature(name="conv_rate", dtype=ValueType.FLOAT), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + online=True, + input=driver_hourly_stats, + tags={}, +) diff --git a/sdk/python/tests/test_e2e_local.py b/sdk/python/tests/test_e2e_local.py index 0714f5e91b..decc3ac5fb 100644 --- a/sdk/python/tests/test_e2e_local.py +++ b/sdk/python/tests/test_e2e_local.py @@ -105,3 +105,26 @@ def test_e2e_local() -> None: assert r.returncode == 0 _assert_online_features(store, driver_df, end_date) + + # Test a failure case when the parquet file doesn't include a join key + with runner.local_repo( + get_example_repo("example_feature_repo_with_entity_join_key.py").replace( + "%PARQUET_PATH%", driver_stats_path + ), + "file", + ) as store: + + assert store.repo_path is not None + + # feast materialize + returncode, output = runner.run_with_output( + [ + "materialize", + start_date.isoformat(), + (end_date - timedelta(days=7)).isoformat(), + ], + cwd=Path(store.repo_path), + ) + + assert returncode != 0 + assert "feast.errors.FeastJoinKeysDuringMaterialization" in str(output) diff --git a/sdk/python/tests/test_materialize_from_bigquery_to_datastore.py b/sdk/python/tests/test_materialize_from_bigquery_to_datastore.py deleted file mode 100644 index e69de29bb2..0000000000 From 65d65e35bb2b0a8dc09cb4397fe9a8a4e8779b81 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 3 Jun 2021 13:44:35 -0700 Subject: [PATCH 2/5] make format Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 4 +--- sdk/python/feast/infra/offline_stores/file.py | 4 +++- .../tests/example_feature_repo_with_entity_join_key.py | 10 ++++++---- sdk/python/tests/test_e2e_local.py | 8 ++++---- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 12fd6a30db..573403b037 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -80,9 +80,7 @@ def __init__(self, expected, missing): class FeastJoinKeysDuringMaterialization(Exception): - def __init__(self, - join_key_columns: Set[str], - source_columns: Set[str]): + def __init__(self, join_key_columns: Set[str], source_columns: Set[str]): super().__init__( f"The DataFrame must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index f5d6ef00e3..15e6560fdb 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -219,7 +219,9 @@ def pull_latest_from_table_or_query( source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): - raise FeastJoinKeysDuringMaterialization(set(join_key_columns), source_columns) + raise FeastJoinKeysDuringMaterialization( + set(join_key_columns), source_columns + ) ts_columns = ( [event_timestamp_column, created_timestamp_column] diff --git a/sdk/python/tests/example_feature_repo_with_entity_join_key.py b/sdk/python/tests/example_feature_repo_with_entity_join_key.py index f2506f68f1..572811dc9b 100644 --- a/sdk/python/tests/example_feature_repo_with_entity_join_key.py +++ b/sdk/python/tests/example_feature_repo_with_entity_join_key.py @@ -11,10 +11,12 @@ # The join key here is deliberately different from the parquet file to test the failure path. -driver = Entity(name="driver_id", - value_type=ValueType.INT64, - description="driver id", - join_key="driver") +driver = Entity( + name="driver_id", + value_type=ValueType.INT64, + description="driver id", + join_key="driver", +) driver_hourly_stats_view = FeatureView( diff --git a/sdk/python/tests/test_e2e_local.py b/sdk/python/tests/test_e2e_local.py index decc3ac5fb..d61d8caa7b 100644 --- a/sdk/python/tests/test_e2e_local.py +++ b/sdk/python/tests/test_e2e_local.py @@ -108,10 +108,10 @@ def test_e2e_local() -> None: # Test a failure case when the parquet file doesn't include a join key with runner.local_repo( - get_example_repo("example_feature_repo_with_entity_join_key.py").replace( - "%PARQUET_PATH%", driver_stats_path - ), - "file", + get_example_repo("example_feature_repo_with_entity_join_key.py").replace( + "%PARQUET_PATH%", driver_stats_path + ), + "file", ) as store: assert store.repo_path is not None From 0ee858ddbb378ed3b90ce9b5218c88c29a36d2cb Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 7 Jun 2021 11:52:10 -0700 Subject: [PATCH 3/5] Be more specific about dataframe Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 573403b037..b3eb7479cc 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -82,6 +82,6 @@ def __init__(self, expected, missing): class FeastJoinKeysDuringMaterialization(Exception): def __init__(self, join_key_columns: Set[str], source_columns: Set[str]): super().__init__( - f"The DataFrame must have at least {join_key_columns} columns present, " + f"The DataFrame being materialized must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " ) From 41136c898fdfa0a54aa037875ba74a63008c0667 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 9 Jun 2021 11:51:07 -0700 Subject: [PATCH 4/5] Update error message Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 4 ++-- sdk/python/feast/infra/offline_stores/file.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index b3eb7479cc..5eeec6a07f 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -80,8 +80,8 @@ def __init__(self, expected, missing): class FeastJoinKeysDuringMaterialization(Exception): - def __init__(self, join_key_columns: Set[str], source_columns: Set[str]): + def __init__(self, source: str, join_key_columns: Set[str], source_columns: Set[str]): super().__init__( - f"The DataFrame being materialized must have at least {join_key_columns} columns present, " + f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " ) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 15e6560fdb..171512c851 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -220,6 +220,7 @@ def pull_latest_from_table_or_query( source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): raise FeastJoinKeysDuringMaterialization( + data_source.path, set(join_key_columns), source_columns ) From 44b07eb7cca781629c7e79d68292c783a8c58276 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 9 Jun 2021 11:54:10 -0700 Subject: [PATCH 5/5] format Signed-off-by: Achal Shah --- sdk/python/feast/errors.py | 4 +++- sdk/python/feast/infra/offline_stores/file.py | 3 +-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 5eeec6a07f..b186d9ab33 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -80,7 +80,9 @@ def __init__(self, expected, missing): class FeastJoinKeysDuringMaterialization(Exception): - def __init__(self, source: str, join_key_columns: Set[str], source_columns: Set[str]): + def __init__( + self, source: str, join_key_columns: Set[str], source_columns: Set[str] + ): super().__init__( f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, " f"but these were missing: {join_key_columns - source_columns} " diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 171512c851..acd12ff900 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -220,8 +220,7 @@ def pull_latest_from_table_or_query( source_columns = set(source_df.columns) if not set(join_key_columns).issubset(source_columns): raise FeastJoinKeysDuringMaterialization( - data_source.path, - set(join_key_columns), source_columns + data_source.path, set(join_key_columns), source_columns ) ts_columns = (