diff --git a/README.md b/README.md index f64f91c27d..4fe0c11083 100644 --- a/README.md +++ b/README.md @@ -75,11 +75,11 @@ print(training_df.head()) # model = ml.fit(training_df) ``` ```commandline - event_timestamp driver_id driver_hourly_stats__conv_rate driver_hourly_stats__acc_rate - 2021-04-12 08:12:10 1002 0.497279 0.357702 - 2021-04-12 10:59:42 1001 0.979747 0.008166 - 2021-04-12 15:01:12 1004 0.151432 0.551748 - 2021-04-12 16:40:26 1003 0.951506 0.753572 + event_timestamp driver_id conv_rate acc_rate avg_daily_trips +0 2021-04-12 08:12:10+00:00 1002 0.713465 0.597095 531 +1 2021-04-12 10:59:42+00:00 1001 0.072752 0.044344 11 +2 2021-04-12 15:01:12+00:00 1004 0.658182 0.079150 220 +3 2021-04-12 16:40:26+00:00 1003 0.162092 0.309035 959 ``` diff --git a/docs/quickstart.md b/docs/quickstart.md index 1dfcfcc66d..1fdb6f42d1 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -119,9 +119,9 @@ pprint(feature_vector) ```python { 'driver_id': [1001], - 'driver_hourly_stats__conv_rate': [0.49274], - 'driver_hourly_stats__acc_rate': [0.92743], - 'driver_hourly_stats__avg_daily_trips': [72], + 'conv_rate': [0.49274], + 'acc_rate': [0.92743], + 'avg_daily_trips': [72], } ``` diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 1c7e93f9ee..b855dd57ed 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -1,4 +1,4 @@ -from typing import Set +from typing import List, Set from colorama import Fore, Style @@ -88,6 +88,27 @@ def __init__(self, offline_store_name: str, data_source_name: str): ) +class FeatureNameCollisionError(Exception): + def __init__(self, feature_refs_collisions: List[str], full_feature_names: bool): + if full_feature_names: + collisions = [ref.replace(":", "__") for ref in feature_refs_collisions] + error_message = ( + "To resolve this collision, please ensure that the features in question " + "have different names." + ) + else: + collisions = [ref.split(":")[1] for ref in feature_refs_collisions] + error_message = ( + "To resolve this collision, either use the full feature name by setting " + "'full_feature_names=True', or ensure that the features in question have different names." + ) + + feature_names = ", ".join(set(collisions)) + super().__init__( + f"Duplicate features named {feature_names} found.\n{error_message}" + ) + + class FeastOnlineStoreInvalidName(Exception): def __init__(self, online_store_class_name: str): super().__init__( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 3d5dc9fcc7..5d8e4604dc 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import os -import sys -from collections import OrderedDict, defaultdict +from collections import Counter, OrderedDict, defaultdict from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union @@ -24,7 +23,7 @@ from feast import utils from feast.entity import Entity -from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException +from feast.errors import FeatureNameCollisionError, FeatureViewNotFoundException from feast.feature_view import FeatureView from feast.inference import ( update_data_sources_with_inferred_event_timestamp_col, @@ -230,9 +229,11 @@ def apply( update_entities_with_inferred_types_from_feature_views( entities_to_update, views_to_update, self.config ) + update_data_sources_with_inferred_event_timestamp_col( [view.input for view in views_to_update], self.config ) + for view in views_to_update: view.infer_features_from_input_source(self.config) @@ -255,7 +256,10 @@ def apply( @log_exceptions_and_usage def get_historical_features( - self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], + self, + entity_df: Union[pd.DataFrame, str], + feature_refs: List[str], + full_feature_names: bool = False, ) -> RetrievalJob: """Enrich an entity dataframe with historical feature values for either training or batch scoring. @@ -277,6 +281,9 @@ def get_historical_features( SQL query. The query must be of a format supported by the configured offline store (e.g., BigQuery) feature_refs: A list of features that should be retrieved from the offline store. Feature references are of the format "feature_view:feature", e.g., "customer_fv:daily_transactions". + full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, + changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to + "customer_fv__daily_transactions"). By default, this value is set to False. Returns: RetrievalJob which can be used to materialize the results. @@ -289,32 +296,29 @@ def get_historical_features( >>> fs = FeatureStore(config=RepoConfig(provider="gcp")) >>> retrieval_job = fs.get_historical_features( >>> entity_df="SELECT event_timestamp, order_id, customer_id from gcp_project.my_ds.customer_orders", - >>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"] - >>> ) + >>> feature_refs=["customer:age", "customer:avg_orders_1d", "customer:avg_orders_7d"], + >>> ) >>> feature_data = retrieval_job.to_df() >>> model.fit(feature_data) # insert your modeling framework here. 
""" - all_feature_views = self._registry.list_feature_views(project=self.project) - try: - feature_views = _get_requested_feature_views( - feature_refs, all_feature_views - ) - except FeatureViewNotFoundException as e: - sys.exit(e) + + _validate_feature_refs(feature_refs, full_feature_names) + feature_views = list( + view for view, _ in _group_feature_refs(feature_refs, all_feature_views) + ) provider = self._get_provider() - try: - job = provider.get_historical_features( - self.config, - feature_views, - feature_refs, - entity_df, - self._registry, - self.project, - ) - except FeastProviderLoginError as e: - sys.exit(e) + + job = provider.get_historical_features( + self.config, + feature_views, + feature_refs, + entity_df, + self._registry, + self.project, + full_feature_names, + ) return job @@ -480,7 +484,10 @@ def tqdm_builder(length): @log_exceptions_and_usage def get_online_features( - self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], + self, + feature_refs: List[str], + entity_rows: List[Dict[str, Any]], + full_feature_names: bool = False, ) -> OnlineResponse: """ Retrieves the latest online feature data. @@ -548,7 +555,8 @@ def get_online_features( project=self.project, allow_cache=True ) - grouped_refs = _group_refs(feature_refs, all_feature_views) + _validate_feature_refs(feature_refs, full_feature_names) + grouped_refs = _group_feature_refs(feature_refs, all_feature_views) for table, requested_features in grouped_refs: entity_keys = _get_table_entity_keys( table, union_of_entity_keys, entity_name_to_join_key_map @@ -565,13 +573,21 @@ def get_online_features( if feature_data is None: for feature_name in requested_features: - feature_ref = f"{table.name}__{feature_name}" + feature_ref = ( + f"{table.name}__{feature_name}" + if full_feature_names + else feature_name + ) result_row.statuses[ feature_ref ] = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND else: for feature_name in feature_data: - feature_ref = f"{table.name}__{feature_name}" + feature_ref = ( + f"{table.name}__{feature_name}" + if full_feature_names + else feature_name + ) if feature_name in requested_features: result_row.fields[feature_ref].CopyFrom( feature_data[feature_name] @@ -599,7 +615,31 @@ def _entity_row_to_field_values( return result -def _group_refs( +def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = False): + collided_feature_refs = [] + + if full_feature_names: + collided_feature_refs = [ + ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1 + ] + else: + feature_names = [ref.split(":")[1] for ref in feature_refs] + collided_feature_names = [ + ref + for ref, occurrences in Counter(feature_names).items() + if occurrences > 1 + ] + + for feature_name in collided_feature_names: + collided_feature_refs.extend( + [ref for ref in feature_refs if ref.endswith(":" + feature_name)] + ) + + if len(collided_feature_refs) > 0: + raise FeatureNameCollisionError(collided_feature_refs, full_feature_names) + + +def _group_feature_refs( feature_refs: List[str], all_feature_views: List[FeatureView] ) -> List[Tuple[FeatureView, List[str]]]: """ Get list of feature views and corresponding feature names based on feature references""" @@ -612,6 +652,7 @@ def _group_refs( for ref in feature_refs: view_name, feat_name = ref.split(":") + if view_name not in view_index: raise FeatureViewNotFoundException(view_name) views_features[view_name].append(feat_name) @@ -622,14 +663,6 @@ def _group_refs( return result -def _get_requested_feature_views( - 
feature_refs: List[str], all_feature_views: List[FeatureView] -) -> List[FeatureView]: - """Get list of feature views based on feature references""" - # TODO: Get rid of this function. We only need _group_refs - return list(view for view, _ in _group_refs(feature_refs, all_feature_views)) - - def _get_table_entity_keys( table: FeatureView, entity_keys: List[EntityKeyProto], join_key_map: Dict[str, str], ) -> List[EntityKeyProto]: diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 272f39840e..f182bbbcee 100644 --- a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -129,6 +129,7 @@ def get_historical_features( entity_df: Union[pandas.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool, ) -> RetrievalJob: job = self.offline_store.get_historical_features( config=config, @@ -137,5 +138,6 @@ def get_historical_features( entity_df=entity_df, registry=registry, project=project, + full_feature_names=full_feature_names, ) return job diff --git a/sdk/python/feast/infra/gcp.py b/sdk/python/feast/infra/gcp.py index 4d7711e5b9..2662a6e54f 100644 --- a/sdk/python/feast/infra/gcp.py +++ b/sdk/python/feast/infra/gcp.py @@ -131,6 +131,7 @@ def get_historical_features( entity_df: Union[pandas.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool, ) -> RetrievalJob: job = self.offline_store.get_historical_features( config=config, @@ -139,5 +140,6 @@ def get_historical_features( entity_df=entity_df, registry=registry, project=project, + full_feature_names=full_feature_names, ) return job diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index 0033b28a9d..f677c84672 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -130,6 +130,7 @@ def get_historical_features( entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool, ) -> RetrievalJob: return self.offline_store.get_historical_features( config=config, @@ -138,6 +139,7 @@ def get_historical_features( entity_df=entity_df, registry=registry, project=project, + full_feature_names=full_feature_names, ) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 681c7e9478..00b4c06a24 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -95,6 +95,7 @@ def get_historical_features( entity_df: Union[pandas.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool = False, ) -> RetrievalJob: # TODO: Add entity_df validation in order to fail before interacting with BigQuery assert isinstance(config.offline_store, BigQueryOfflineStoreConfig) @@ -121,7 +122,11 @@ def get_historical_features( # Build a query context containing all information required to template the BigQuery SQL query query_context = get_feature_view_query_context( - feature_refs, feature_views, registry, project + feature_refs, + feature_views, + registry, + project, + full_feature_names=full_feature_names, ) # TODO: Infer min_timestamp and max_timestamp from entity_df @@ -132,6 +137,7 @@ def get_historical_features( max_timestamp=datetime.now() + timedelta(days=1), left_table_query_string=str(table.reference), entity_df_event_timestamp_col=entity_df_event_timestamp_col, + full_feature_names=full_feature_names, ) job = BigQueryRetrievalJob(query=query, client=client, config=config) @@ -373,6 +379,7 @@ def get_feature_view_query_context( feature_views: 
List[FeatureView], registry: Registry, project: str, + full_feature_names: bool = False, ) -> List[FeatureViewQueryContext]: """Build a query context containing all information required to template a BigQuery point-in-time SQL query""" @@ -432,6 +439,7 @@ def build_point_in_time_query( max_timestamp: datetime, left_table_query_string: str, entity_df_event_timestamp_col: str, + full_feature_names: bool = False, ): """Build point-in-time query between each feature view table and the entity dataframe""" template = Environment(loader=BaseLoader()).from_string( @@ -448,6 +456,7 @@ def build_point_in_time_query( [entity for fv in feature_view_query_contexts for entity in fv.entities] ), "featureviews": [asdict(context) for context in feature_view_query_contexts], + "full_feature_names": full_feature_names, } query = template.render(template_context) @@ -521,7 +530,7 @@ def _get_bigquery_client(project: Optional[str] = None): {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} {{ featureview.entity_selections | join(', ')}}, {% for feature in featureview.features %} - {{ feature }} as {{ featureview.name }}__{{ feature }}{% if loop.last %}{% else %}, {% endif %} + {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} ), @@ -614,7 +623,7 @@ def _get_bigquery_client(project: Optional[str] = None): SELECT entity_row_unique_id, {% for feature in featureview.features %} - {{ featureview.name }}__{{ feature }}, + {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}, {% endfor %} FROM {{ featureview.name }}__cleaned ) USING (entity_row_unique_id) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 9ab71ec2d8..8ff896ba61 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -53,6 +53,7 @@ def get_historical_features( entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool = False, ) -> RetrievalJob: if not isinstance(entity_df, pd.DataFrame): raise ValueError( @@ -72,7 +73,6 @@ def get_historical_features( raise ValueError( f"Please provide an entity_df with a column named {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL} representing the time of events." ) - feature_views_to_features = _get_requested_feature_views_to_features_dict( feature_refs, feature_views ) @@ -138,14 +138,16 @@ def evaluate_historical_retrieval(): # Modify the separator for feature refs in column names to double underscore. 
We are using # double underscore as separator for consistency with other databases like BigQuery, # where there are very few characters available for use as separators - prefixed_feature_name = f"{feature_view.name}__{feature}" - + if full_feature_names: + formatted_feature_name = f"{feature_view.name}__{feature}" + else: + formatted_feature_name = feature # Add the feature name to the list of columns - feature_names.append(prefixed_feature_name) + feature_names.append(formatted_feature_name) # Ensure that the source dataframe feature column includes the feature view name as a prefix df_to_join.rename( - columns={feature: prefixed_feature_name}, inplace=True, + columns={feature: formatted_feature_name}, inplace=True, ) # Build a list of entity columns to join on (from the right table) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index f2ee65d80d..e8d32cd384 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -72,5 +72,6 @@ def get_historical_features( entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool = False, ) -> RetrievalJob: pass diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index f15b9af451..1850958111 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -56,5 +56,6 @@ def get_historical_features( entity_df: Union[pd.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool = False, ) -> RetrievalJob: pass diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 29766c9d9a..2775c48173 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -116,6 +116,7 @@ def get_historical_features( entity_df: Union[pandas.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool, ) -> RetrievalJob: pass @@ -168,13 +169,16 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider: def _get_requested_feature_views_to_features_dict( feature_refs: List[str], feature_views: List[FeatureView] ) -> Dict[FeatureView, List[str]]: - """Create a dict of FeatureView -> List[Feature] for all requested features""" + """Create a dict of FeatureView -> List[Feature] for all requested features. + Feature references are expected in the format "feature_view:feature".""" feature_views_to_feature_map = {} # type: Dict[FeatureView, List[str]] + for ref in feature_refs: ref_parts = ref.split(":") feature_view_from_ref = ref_parts[0] feature_from_ref = ref_parts[1] + found = False for feature_view_from_registry in feature_views: if feature_view_from_registry.name == feature_view_from_ref: @@ -190,6 +194,7 @@ def _get_requested_feature_views_to_features_dict( if not found: raise ValueError(f"Could not find feature view from reference {ref}") + return feature_views_to_feature_map diff --git a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py index 4b6dec828c..d732119ead 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/path_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: tensorflow_metadata/proto/v0/path.proto -"""Generated protocol buffer code.""" + from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection diff --git a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py index d3bfc50616..78fda8003d 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/schema.proto -"""Generated protocol buffer code.""" + from google.protobuf.internal import enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message diff --git a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py index 21473adc75..d8e12bd120 100644 --- a/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py +++ b/sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: tensorflow_metadata/proto/v0/statistics.proto -"""Generated protocol buffer code.""" + from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from google.protobuf import reflection as _reflection diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 22ae294603..a5d396a458 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -63,6 +63,7 @@ def get_historical_features( entity_df: Union[pandas.DataFrame, str], registry: Registry, project: str, + full_feature_names: bool = False, ) -> RetrievalJob: pass diff --git a/sdk/python/tests/test_e2e_local.py b/sdk/python/tests/test_e2e_local.py index d61d8caa7b..6057226b73 100644 --- a/sdk/python/tests/test_e2e_local.py +++ b/sdk/python/tests/test_e2e_local.py @@ -32,6 +32,7 @@ def _assert_online_features( "driver_hourly_stats:avg_daily_trips", ], entity_rows=[{"driver_id": 1001}], + full_feature_names=True, ) assert "driver_hourly_stats__avg_daily_trips" in result.to_dict() diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py index 992a2c8524..3a708c7503 100644 --- a/sdk/python/tests/test_historical_retrieval.py +++ b/sdk/python/tests/test_historical_retrieval.py @@ -17,8 +17,9 @@ from feast import RepoConfig, errors, utils from feast.data_source import BigQuerySource, FileSource from feast.entity import Entity +from feast.errors import FeatureNameCollisionError from feast.feature import Feature -from feast.feature_store import FeatureStore +from feast.feature_store import FeatureStore, _validate_feature_refs from feast.feature_view import FeatureView from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig @@ -30,7 +31,7 @@ PROJECT_NAME = "default" -def generate_entities(date, infer_event_timestamp_col): +def generate_entities(date, infer_event_timestamp_col, order_count: int = 1000): end_date = date before_start_date = end_date - timedelta(days=365) start_date = end_date - timedelta(days=7) @@ -42,7 +43,7 @@ def generate_entities(date, infer_event_timestamp_col): drivers=driver_entities, start_date=before_start_date, end_date=after_end_date, - 
order_count=1000, + order_count=order_count, infer_event_timestamp_col=infer_event_timestamp_col, ) return customer_entities, driver_entities, end_date, orders_df, start_date @@ -108,6 +109,7 @@ def create_customer_daily_profile_feature_view(source): Feature(name="current_balance", dtype=ValueType.FLOAT), Feature(name="avg_passenger_count", dtype=ValueType.FLOAT), Feature(name="lifetime_trip_count", dtype=ValueType.INT32), + Feature(name="avg_daily_trips", dtype=ValueType.INT32), ], input=source, ttl=timedelta(days=2), @@ -139,6 +141,7 @@ def get_expected_training_df( driver_fv: FeatureView, orders_df: pd.DataFrame, event_timestamp: str, + full_feature_names: bool = False, ): # Convert all pandas dataframes into records with UTC timestamps order_records = convert_timestamp_records_to_utc( @@ -169,15 +172,21 @@ def get_expected_training_df( filter_key="customer_id", filter_value=order_record["customer_id"], ) + order_record.update( { - f"driver_stats__{k}": driver_record.get(k, None) + (f"driver_stats__{k}" if full_feature_names else k): driver_record.get( + k, None + ) for k in ("conv_rate", "avg_daily_trips") } ) + order_record.update( { - f"customer_profile__{k}": customer_record.get(k, None) + ( + f"customer_profile__{k}" if full_feature_names else k + ): customer_record.get(k, None) for k in ( "current_balance", "avg_passenger_count", @@ -195,12 +204,21 @@ def get_expected_training_df( expected_df = expected_df[[event_timestamp] + current_cols] # Cast some columns to expected types, since we lose information when converting pandas DFs into Python objects. - expected_column_types = { - "order_is_success": "int32", - "driver_stats__conv_rate": "float32", - "customer_profile__current_balance": "float32", - "customer_profile__avg_passenger_count": "float32", - } + if full_feature_names: + expected_column_types = { + "order_is_success": "int32", + "driver_stats__conv_rate": "float32", + "customer_profile__current_balance": "float32", + "customer_profile__avg_passenger_count": "float32", + } + else: + expected_column_types = { + "order_is_success": "int32", + "conv_rate": "float32", + "current_balance": "float32", + "avg_passenger_count": "float32", + } + for col, typ in expected_column_types.items(): expected_df[col] = expected_df[col].astype(typ) @@ -244,7 +262,12 @@ def __exit__(self, exc_type, exc_value, exc_traceback): @pytest.mark.parametrize( "infer_event_timestamp_col", [False, True], ) -def test_historical_features_from_parquet_sources(infer_event_timestamp_col): +@pytest.mark.parametrize( + "full_feature_names", [False, True], +) +def test_historical_features_from_parquet_sources( + infer_event_timestamp_col, full_feature_names +): start_date = datetime.now().replace(microsecond=0, second=0, minute=0) ( customer_entities, @@ -292,6 +315,7 @@ def test_historical_features_from_parquet_sources(infer_event_timestamp_col): "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", ], + full_feature_names=full_feature_names, ) actual_df = job.to_df() @@ -301,7 +325,13 @@ def test_historical_features_from_parquet_sources(infer_event_timestamp_col): else "e_ts" ) expected_df = get_expected_training_df( - customer_df, customer_fv, driver_df, driver_fv, orders_df, event_timestamp, + customer_df, + customer_fv, + driver_df, + driver_fv, + orders_df, + event_timestamp, + full_feature_names=full_feature_names, ) assert_frame_equal( expected_df.sort_values( @@ -320,8 +350,11 @@ def test_historical_features_from_parquet_sources(infer_event_timestamp_col): 
@pytest.mark.parametrize( "infer_event_timestamp_col", [False, True], ) +@pytest.mark.parametrize( + "full_feature_names", [False, True], +) def test_historical_features_from_bigquery_sources( - provider_type, infer_event_timestamp_col, capsys + provider_type, infer_event_timestamp_col, capsys, full_feature_names ): start_date = datetime.now().replace(microsecond=0, second=0, minute=0) ( @@ -425,7 +458,13 @@ def test_historical_features_from_bigquery_sources( else "e_ts" ) expected_df = get_expected_training_df( - customer_df, customer_fv, driver_df, driver_fv, orders_df, event_timestamp, + customer_df, + customer_fv, + driver_df, + driver_fv, + orders_df, + event_timestamp, + full_feature_names, ) job_from_sql = store.get_historical_features( @@ -437,6 +476,7 @@ def test_historical_features_from_bigquery_sources( "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", ], + full_feature_names=full_feature_names, ) start_time = datetime.utcnow() @@ -500,6 +540,7 @@ def test_historical_features_from_bigquery_sources( "customer_profile:avg_passenger_count", "customer_profile:lifetime_trip_count", ], + full_feature_names=full_feature_names, ) # Rename the join key; this should now raise an error. @@ -547,7 +588,56 @@ def test_historical_features_from_bigquery_sources( .reset_index(drop=True), check_dtype=False, ) + table_from_df_entities = job_from_df.to_arrow() assert_frame_equal( actual_df_from_df_entities, table_from_df_entities.to_pandas() ) + + +def test_feature_name_collision_on_historical_retrieval(): + + # _validate_feature_refs is the function that checks for colliding feature names + # check when feature names collide and 'full_feature_names=False' + with pytest.raises(FeatureNameCollisionError) as error: + _validate_feature_refs( + feature_refs=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips", + "customer_profile:current_balance", + "customer_profile:avg_passenger_count", + "customer_profile:lifetime_trip_count", + "customer_profile:avg_daily_trips", + ], + full_feature_names=False, + ) + + expected_error_message = ( + "Duplicate features named avg_daily_trips found.\n" + "To resolve this collision, either use the full feature name by setting " + "'full_feature_names=True', or ensure that the features in question have different names." + ) + + assert str(error.value) == expected_error_message + + # check when feature names collide and 'full_feature_names=True' + with pytest.raises(FeatureNameCollisionError) as error: + _validate_feature_refs( + feature_refs=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips", + "driver_stats:avg_daily_trips", + "customer_profile:current_balance", + "customer_profile:avg_passenger_count", + "customer_profile:lifetime_trip_count", + "customer_profile:avg_daily_trips", + ], + full_feature_names=True, + ) + + expected_error_message = ( + "Duplicate features named driver_stats__avg_daily_trips found.\n" + "To resolve this collision, please ensure that the features in question " + "have different names." 
+ ) + assert str(error.value) == expected_error_message diff --git a/sdk/python/tests/test_offline_online_store_consistency.py b/sdk/python/tests/test_offline_online_store_consistency.py index 0fcc368b22..3e1cb1a9c7 100644 --- a/sdk/python/tests/test_offline_online_store_consistency.py +++ b/sdk/python/tests/test_offline_online_store_consistency.py @@ -228,16 +228,25 @@ def check_offline_and_online_features( driver_id: int, event_timestamp: datetime, expected_value: Optional[float], + full_feature_names: bool, ) -> None: # Check online store response_dict = fs.get_online_features( - [f"{fv.name}:value"], [{"driver": driver_id}] + [f"{fv.name}:value"], + [{"driver": driver_id}], + full_feature_names=full_feature_names, ).to_dict() - if expected_value: - assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + if full_feature_names: + if expected_value: + assert abs(response_dict[f"{fv.name}__value"][0] - expected_value) < 1e-6 + else: + assert response_dict[f"{fv.name}__value"][0] is None else: - assert response_dict[f"{fv.name}__value"][0] is None + if expected_value: + assert abs(response_dict["value"][0] - expected_value) < 1e-6 + else: + assert response_dict["value"][0] is None # Check offline store df = fs.get_historical_features( @@ -245,16 +254,23 @@ def check_offline_and_online_features( {"driver_id": [driver_id], "event_timestamp": [event_timestamp]} ), feature_refs=[f"{fv.name}:value"], + full_feature_names=full_feature_names, ).to_df() - if expected_value: - assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6 + if full_feature_names: + if expected_value: + assert abs(df.to_dict()[f"{fv.name}__value"][0] - expected_value) < 1e-6 + else: + assert math.isnan(df.to_dict()[f"{fv.name}__value"][0]) else: - assert math.isnan(df.to_dict()[f"{fv.name}__value"][0]) + if expected_value: + assert abs(df.to_dict()["value"][0] - expected_value) < 1e-6 + else: + assert math.isnan(df.to_dict()["value"][0]) def run_offline_online_store_consistency_test( - fs: FeatureStore, fv: FeatureView + fs: FeatureStore, fv: FeatureView, full_feature_names: bool ) -> None: now = datetime.utcnow() # Run materialize() @@ -265,16 +281,31 @@ def run_offline_online_store_consistency_test( # check result of materialize() check_offline_and_online_features( - fs=fs, fv=fv, driver_id=1, event_timestamp=end_date, expected_value=0.3 + fs=fs, + fv=fv, + driver_id=1, + event_timestamp=end_date, + expected_value=0.3, + full_feature_names=full_feature_names, ) check_offline_and_online_features( - fs=fs, fv=fv, driver_id=2, event_timestamp=end_date, expected_value=None + fs=fs, + fv=fv, + driver_id=2, + event_timestamp=end_date, + expected_value=None, + full_feature_names=full_feature_names, ) # check prior value for materialize_incremental() check_offline_and_online_features( - fs=fs, fv=fv, driver_id=3, event_timestamp=end_date, expected_value=4 + fs=fs, + fv=fv, + driver_id=3, + event_timestamp=end_date, + expected_value=4, + full_feature_names=full_feature_names, ) # run materialize_incremental() @@ -282,7 +313,12 @@ def run_offline_online_store_consistency_test( # check result of materialize_incremental() check_offline_and_online_features( - fs=fs, fv=fv, driver_id=3, event_timestamp=now, expected_value=5 + fs=fs, + fv=fv, + driver_id=3, + event_timestamp=now, + expected_value=5, + full_feature_names=full_feature_names, ) @@ -290,23 +326,29 @@ def run_offline_online_store_consistency_test( @pytest.mark.parametrize( "bq_source_type", ["query", "table"], ) -def 
test_bq_offline_online_store_consistency(bq_source_type: str): +@pytest.mark.parametrize("full_feature_names", [True, False]) +def test_bq_offline_online_store_consistency( + bq_source_type: str, full_feature_names: bool +): with prep_bq_fs_and_fv(bq_source_type) as (fs, fv): - run_offline_online_store_consistency_test(fs, fv) + run_offline_online_store_consistency_test(fs, fv, full_feature_names) +@pytest.mark.parametrize("full_feature_names", [True, False]) @pytest.mark.integration -def test_redis_offline_online_store_consistency(): +def test_redis_offline_online_store_consistency(full_feature_names: bool): with prep_redis_fs_and_fv() as (fs, fv): - run_offline_online_store_consistency_test(fs, fv) + run_offline_online_store_consistency_test(fs, fv, full_feature_names) +@pytest.mark.parametrize("full_feature_names", [True, False]) @pytest.mark.integration -def test_dynamodb_offline_online_store_consistency(): +def test_dynamodb_offline_online_store_consistency(full_feature_names: bool): with prep_dynamodb_fs_and_fv() as (fs, fv): - run_offline_online_store_consistency_test(fs, fv) + run_offline_online_store_consistency_test(fs, fv, full_feature_names) -def test_local_offline_online_store_consistency(): +@pytest.mark.parametrize("full_feature_names", [True, False]) +def test_local_offline_online_store_consistency(full_feature_names: bool): with prep_local_fs_and_fv() as (fs, fv): - run_offline_online_store_consistency_test(fs, fv) + run_offline_online_store_consistency_test(fs, fv, full_feature_names) diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index d9939871e9..3d0da04e18 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -101,30 +101,34 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}, {"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() - assert "driver_locations__lon" in result - assert "customer_profile__avg_orders_day" in result - assert "customer_profile__name" in result + assert "lon" in result + assert "avg_orders_day" in result + assert "name" in result assert result["driver"] == [1, 1] assert result["customer"] == [5, 5] - assert result["driver_locations__lon"] == ["1.0", "1.0"] - assert result["customer_profile__avg_orders_day"] == [1.0, 1.0] - assert result["customer_profile__name"] == ["John", "John"] - assert result["customer_driver_combined__trips"] == [7, 7] + assert result["lon"] == ["1.0", "1.0"] + assert result["avg_orders_day"] == [1.0, 1.0] + assert result["name"] == ["John", "John"] + assert result["trips"] == [7, 7] # Ensure features are still in result when keys not found result = store.get_online_features( feature_refs=["customer_driver_combined:trips"], entity_rows=[{"driver": 0, "customer": 0}], + full_feature_names=False, ).to_dict() - assert "customer_driver_combined__trips" in result + assert "trips" in result # invalid table reference with pytest.raises(FeatureViewNotFoundException): store.get_online_features( - feature_refs=["driver_locations_bad:lon"], entity_rows=[{"driver": 1}], + feature_refs=["driver_locations_bad:lon"], + entity_rows=[{"driver": 1}], + full_feature_names=False, ) # Create new FeatureStore object with fast cache invalidation @@ -149,9 +153,10 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() - assert result["driver_locations__lon"] == ["1.0"] - assert 
result["customer_driver_combined__trips"] == [7] + assert result["lon"] == ["1.0"] + assert result["trips"] == [7] # Rename the registry.db so that it cant be used for refreshes os.rename(store.config.registry, store.config.registry + "_fake") @@ -169,6 +174,7 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() # Restore registry.db so that we can see if it actually reloads registry @@ -183,9 +189,10 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() - assert result["driver_locations__lon"] == ["1.0"] - assert result["customer_driver_combined__trips"] == [7] + assert result["lon"] == ["1.0"] + assert result["trips"] == [7] # Create a registry with infinite cache (for users that want to manually refresh the registry) fs_infinite_ttl = FeatureStore( @@ -208,9 +215,10 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() - assert result["driver_locations__lon"] == ["1.0"] - assert result["customer_driver_combined__trips"] == [7] + assert result["lon"] == ["1.0"] + assert result["trips"] == [7] # Wait a bit so that an arbitrary TTL would take effect time.sleep(2) @@ -227,9 +235,10 @@ def test_online() -> None: "customer_driver_combined:trips", ], entity_rows=[{"driver": 1, "customer": 5}], + full_feature_names=False, ).to_dict() - assert result["driver_locations__lon"] == ["1.0"] - assert result["customer_driver_combined__trips"] == [7] + assert result["lon"] == ["1.0"] + assert result["trips"] == [7] # Force registry reload (should fail because file is missing) with pytest.raises(FileNotFoundError): @@ -269,7 +278,7 @@ def test_online_to_df(): for (d, c) in zip(driver_ids, customer_ids): """ driver table: - driver driver_locations__lon driver_locations__lat + lon lat 1 1.0 0.1 2 2.0 0.2 3 3.0 0.3 @@ -296,10 +305,10 @@ def test_online_to_df(): """ customer table - customer customer_profile__avg_orders_day customer_profile__name customer_profile__age - 4 4.0 foo4 40 - 5 5.0 foo5 50 - 6 6.0 foo6 60 + customer avg_orders_day name age + 4 4.0 foo4 40 + 5 5.0 foo5 50 + 6 6.0 foo6 60 """ customer_key = EntityKeyProto( join_keys=["customer"], entity_values=[ValueProto(int64_val=c)] @@ -325,10 +334,10 @@ def test_online_to_df(): ) """ customer_driver_combined table - customer driver customer_driver_combined__trips - 4 1 4 - 5 2 10 - 6 3 18 + customer driver trips + 4 1 4 + 5 2 10 + 6 3 18 """ combo_keys = EntityKeyProto( join_keys=["customer", "driver"], @@ -366,35 +375,31 @@ def test_online_to_df(): ).to_df() """ Construct the expected dataframe with reversed row order like so: - driver customer driver_locations__lon driver_locations__lat customer_profile__avg_orders_day customer_profile__name customer_profile__age customer_driver_combined__trips - 3 6 3.0 0.3 6.0 foo6 60 18 - 2 5 2.0 0.2 5.0 foo5 50 10 - 1 4 1.0 0.1 4.0 foo4 40 4 + driver customer lon lat avg_orders_day name age trips + 3 6 3.0 0.3 6.0 foo6 60 18 + 2 5 2.0 0.2 5.0 foo5 50 10 + 1 4 1.0 0.1 4.0 foo4 40 4 """ df_dict = { "driver": driver_ids, "customer": customer_ids, - "driver_locations__lon": [str(d * lon_multiply) for d in driver_ids], - "driver_locations__lat": [d * lat_multiply for d in driver_ids], - "customer_profile__avg_orders_day": [ - c * avg_order_day_multiply for c in customer_ids - ], - "customer_profile__name": [name + str(c) for 
c in customer_ids], - "customer_profile__age": [c * age_multiply for c in customer_ids], - "customer_driver_combined__trips": [ - d * c for (d, c) in zip(driver_ids, customer_ids) - ], + "lon": [str(d * lon_multiply) for d in driver_ids], + "lat": [d * lat_multiply for d in driver_ids], + "avg_orders_day": [c * avg_order_day_multiply for c in customer_ids], + "name": [name + str(c) for c in customer_ids], + "age": [c * age_multiply for c in customer_ids], + "trips": [d * c for (d, c) in zip(driver_ids, customer_ids)], } # Requested column order ordered_column = [ "driver", "customer", - "driver_locations__lon", - "driver_locations__lat", - "customer_profile__avg_orders_day", - "customer_profile__name", - "customer_profile__age", - "customer_driver_combined__trips", + "lon", + "lat", + "avg_orders_day", + "name", + "age", + "trips", ] expected_df = pd.DataFrame({k: reversed(v) for (k, v) in df_dict.items()}) assert_frame_equal(result_df[ordered_column], expected_df)
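
Usage sketch (illustrative, not part of the patch). The new `full_feature_names` flag, default `False`, controls whether retrieved columns come back as `feature` or `feature_view__feature`. The repo path, feature view name (`driver_hourly_stats`), and entity values below are assumptions for the example, not fixtures from this diff:

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes an already-applied feature repo

entity_df = pd.DataFrame(
    {"driver_id": [1001], "event_timestamp": [pd.Timestamp.now(tz="UTC")]}
)

# New default: plain feature names -> column "conv_rate"
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["driver_hourly_stats:conv_rate"],
).to_df()

# Opt in to the previous prefixed naming -> column "driver_hourly_stats__conv_rate"
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["driver_hourly_stats:conv_rate"],
    full_feature_names=True,
).to_df()

# The same flag applies to online retrieval.
feature_vector = store.get_online_features(
    feature_refs=["driver_hourly_stats:conv_rate"],
    entity_rows=[{"driver_id": 1001}],
    full_feature_names=True,
).to_dict()  # keys: "driver_id", "driver_hourly_stats__conv_rate"
```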
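The collision rule enforced by `_validate_feature_refs` can be stated compactly: with short names, two refs collide whenever they share a feature name across views; with full names, only exact duplicate refs collide. A minimal standalone sketch of that rule, where `find_collisions` is a hypothetical helper mirroring the `Counter`-based logic in the patch:

```python
from collections import Counter
from typing import List


def find_collisions(feature_refs: List[str], full_feature_names: bool) -> List[str]:
    """Return the refs whose output column names would collide."""
    if full_feature_names:
        # Prefixed names ("view__feature") only collide on exact duplicate refs.
        return [ref for ref, n in Counter(feature_refs).items() if n > 1]
    # Short names collide whenever two views expose the same feature name.
    names = [ref.split(":")[1] for ref in feature_refs]
    duplicated = {name for name, n in Counter(names).items() if n > 1}
    return [ref for ref in feature_refs if ref.split(":")[1] in duplicated]


refs = ["driver_stats:avg_daily_trips", "customer_profile:avg_daily_trips"]
print(find_collisions(refs, full_feature_names=False))  # both refs are reported
print(find_collisions(refs, full_feature_names=True))   # [] -- prefixes disambiguate
```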
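The BigQuery template change can also be sanity-checked in isolation: rendering just the aliasing fragment from the patched query template with jinja2 shows how `full_feature_names` toggles the SQL column alias (the feature view and feature names below are made up):

```python
from jinja2 import BaseLoader, Environment

fragment = (
    "{{ feature }} as "
    "{% if full_feature_names %}{{ name }}__{{ feature }}"
    "{% else %}{{ feature }}{% endif %}"
)
template = Environment(loader=BaseLoader()).from_string(fragment)

print(template.render(name="driver_stats", feature="conv_rate", full_feature_names=False))
# conv_rate as conv_rate
print(template.render(name="driver_stats", feature="conv_rate", full_feature_names=True))
# conv_rate as driver_stats__conv_rate
```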