diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bfc93e7f53..eb34d947fe 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1210,7 +1210,7 @@ def _augment_response_with_on_demand_transforms( for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = all_on_demand_feature_views[odfv_name] transformed_features_df = odfv.get_transformed_features_df( - full_feature_names, initial_response_df + initial_response_df ) for row_idx in range(len(result_rows)): result_row = result_rows[row_idx] diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 48c70f9d6c..0ab1de6a13 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -47,7 +47,7 @@ def to_df(self) -> pd.DataFrame: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: features_df = features_df.join( - odfv.get_transformed_features_df(self.full_feature_names, features_df) + odfv.get_transformed_features_df(features_df) ) return features_df @@ -69,7 +69,7 @@ def to_arrow(self) -> pyarrow.Table: features_df = self._to_df_internal() for odfv in self.on_demand_feature_views: features_df = features_df.join( - odfv.get_transformed_features_df(self.full_feature_names, features_df) + odfv.get_transformed_features_df(features_df) ) return pyarrow.Table.from_pandas(features_df) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 95832fa143..c89d122dde 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -164,21 +164,21 @@ def get_request_data_schema(self) -> Dict[str, ValueType]: return schema def get_transformed_features_df( - self, full_feature_names: bool, df_with_features: pd.DataFrame + self, df_with_features: pd.DataFrame ) -> pd.DataFrame: # Apply on demand transformations - # TODO(adchia): Include only the feature values from the specified input FVs in the ODFV. - # Copy over un-prefixed features even if not requested since transform may need it columns_to_cleanup = [] - if full_feature_names: - for input_fv in self.input_feature_views.values(): - for feature in input_fv.features: - full_feature_ref = f"{input_fv.name}__{feature.name}" - if full_feature_ref in df_with_features.keys(): - df_with_features[feature.name] = df_with_features[ - full_feature_ref - ] - columns_to_cleanup.append(feature.name) + for input_fv in self.input_feature_views.values(): + for feature in input_fv.features: + full_feature_ref = f"{input_fv.name}__{feature.name}" + if full_feature_ref in df_with_features.keys(): + # Make sure the partial feature name is always present + df_with_features[feature.name] = df_with_features[full_feature_ref] + columns_to_cleanup.append(feature.name) + elif feature.name in df_with_features.keys(): + # Make sure the full feature name is always present + df_with_features[full_feature_ref] = df_with_features[feature.name] + columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row df_with_transformed_features = self.udf.__call__(df_with_features) diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 6793db5ccb..c7e61f9d17 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -47,7 +47,7 @@ def TransformFeatures(self, request, context): df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() - result_df = odfv.get_transformed_features_df(True, df) + result_df = odfv.get_transformed_features_df(df) result_arrow = pa.Table.from_pandas(result_df) sink = pa.BufferOutputStream() writer = pa.ipc.new_file(sink, result_arrow.schema)