
historical_field_mappings2 merge for one sign off commit (#2252)
Signed-off-by: Michelle Rascati <michelle.rascati@sailpoint.com>
michelle-rascati-sp authored Jan 28, 2022
1 parent 5fc0b52 commit 592af75
Showing 9 changed files with 102 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -50,7 +50,7 @@ Setting up your development environment for Feast Python SDK / CLI:
3. _Recommended:_ Create a virtual environment to isolate development dependencies to be installed
```sh
# create & activate a virtual environment
-python -v venv venv/
+python -m venv venv/
source venv/bin/activate
```

26 changes: 26 additions & 0 deletions sdk/python/feast/driver_test_data.py
@@ -264,3 +264,29 @@ def create_global_daily_stats_df(start_date, end_date) -> pd.DataFrame:
# TODO: Remove created timestamp in order to test whether it's really optional
df_daily["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
return df_daily


def create_field_mapping_df(start_date, end_date) -> pd.DataFrame:
"""
Example df generated by this function:
| event_timestamp | column_name | created |
|------------------+-------------+------------------|
| 2021-03-17 19:00 | 99 | 2021-03-24 19:38 |
| 2021-03-17 19:00 | 22 | 2021-03-24 19:38 |
| 2021-03-17 19:00 | 7 | 2021-03-24 19:38 |
| 2021-03-17 19:00 | 45 | 2021-03-24 19:38 |
"""
size = 10
df = pd.DataFrame()
df["column_name"] = np.random.randint(1, 100, size=size).astype(np.int32)
df[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = [
_convert_event_timestamp(
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms"),
EventTimestampType(idx % 4),
)
for idx, dt in enumerate(
pd.date_range(start=start_date, end=end_date, periods=size)
)
]
df["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms"))
return df
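
A minimal usage sketch of the generator above (the date window here is illustrative, not taken from the test suite): note that the raw column is still named `column_name`; the data source's `field_mapping`, defined later in `repo_configuration.py`, is what exposes it as `feature_name`.

```python
from datetime import datetime, timedelta

from feast.driver_test_data import create_field_mapping_df

# Assumed window; any start/end range works.
start = datetime(2021, 3, 1)
df = create_field_mapping_df(start, start + timedelta(days=14))

# The dataframe carries the *source* column name; the rename to
# "feature_name" happens via the data source's field_mapping.
assert "column_name" in df.columns
assert df["column_name"].dtype == "int32"
```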
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -598,7 +598,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
-{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
+{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
@@ -699,7 +699,7 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
SELECT
{{featureview.name}}__entity_row_unique_id
{% for feature in featureview.features %}
-,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
+,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING ({{featureview.name}}__entity_row_unique_id)
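
The effect of the template change above, expressed as plain Python: the feature is still selected by its raw source column name, but it is aliased through the feature view's `field_mapping` so the output column carries the user-facing name. This is a hedged sketch of the aliasing rule, not the actual Jinja rendering:

```python
# Sketch of the aliasing rule the template now applies.
# `field_mapping` maps source column -> feature name.
def output_alias(feature: str, field_mapping: dict,
                 fv_name: str, full_feature_names: bool) -> str:
    mapped = field_mapping.get(feature, feature)
    return f"{fv_name}__{mapped}" if full_feature_names else mapped

# With the mapping used by the new integration tests:
mapping = {"column_name": "feature_name"}
assert output_alias("column_name", mapping, "field_mapping", False) == "feature_name"
assert output_alias("column_name", mapping, "field_mapping", True) == "field_mapping__feature_name"
```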
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
@@ -85,6 +85,7 @@ class FeatureViewQueryContext:
ttl: int
entities: List[str]
features: List[str] # feature reference format
field_mapping: Dict[str, str]
event_timestamp_column: str
created_timestamp_column: Optional[str]
table_subquery: str
@@ -144,7 +145,10 @@ def get_feature_view_query_context(
name=feature_view.projection.name_to_use(),
ttl=ttl_seconds,
entities=join_keys,
-features=features,
+features=[
+    reverse_field_mapping.get(feature, feature) for feature in features
+],
+field_mapping=feature_view.input.field_mapping,
event_timestamp_column=reverse_field_mapping.get(
event_timestamp_column, event_timestamp_column
),
@@ -175,7 +179,11 @@ def build_point_in_time_query(
final_output_feature_names = list(entity_df_columns)
final_output_feature_names.extend(
[
(f"{fv.name}__{feature}" if full_feature_names else feature)
(
f"{fv.name}__{fv.field_mapping.get(feature, feature)}"
if full_feature_names
else fv.field_mapping.get(feature, feature)
)
for fv in feature_view_query_contexts
for feature in fv.features
]
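
Taken together, the two changes above make the mapping round trip explicit: `features` is reverse-mapped so the generated SQL selects raw source columns, while `field_mapping` rides along on the query context so the final projection restores the user-facing names. A small self-contained sketch of that round trip, with the mapping assumed from the tests:

```python
# Assumed mapping from the integration tests: source column -> feature name.
field_mapping = {"column_name": "feature_name"}
reverse_field_mapping = {v: k for k, v in field_mapping.items()}

features = ["feature_name"]                                   # user-facing refs
select_columns = [reverse_field_mapping.get(f, f) for f in features]
assert select_columns == ["column_name"]                      # what the SQL reads

output_columns = [field_mapping.get(c, c) for c in select_columns]
assert output_columns == ["feature_name"]                     # what the user sees
```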
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -563,7 +563,7 @@ def _get_entity_df_event_timestamp_range(
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
-{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
+{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
@@ -664,7 +664,7 @@ def _get_entity_df_event_timestamp_range(
SELECT
{{featureview.name}}__entity_row_unique_id
{% for feature in featureview.features %}
-,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
+,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
{% endfor %}
FROM {{ featureview.name }}__cleaned
) USING ({{featureview.name}}__entity_row_unique_id)
2 changes: 1 addition & 1 deletion sdk/python/setup.py
@@ -132,7 +132,7 @@
+ AWS_REQUIRED
)

DEV_REQUIRED = ["mypy-protobuf==1.*", "grpcio-testing==1.*"] + CI_REQUIRED
DEV_REQUIRED = ["mypy-protobuf>=1.*", "grpcio-testing==1.*"] + CI_REQUIRED

# Get git repo root directory
repo_root = str(pathlib.Path(__file__).resolve().parent.parent.parent)
14 changes: 14 additions & 0 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
@@ -35,6 +35,7 @@
create_customer_daily_profile_feature_view,
create_driver_age_request_feature_view,
create_driver_hourly_stats_feature_view,
create_field_mapping_feature_view,
create_global_stats_feature_view,
create_location_stats_feature_view,
create_order_feature_view,
@@ -126,6 +127,7 @@ def construct_universal_datasets(
order_count=20,
)
global_df = driver_test_data.create_global_daily_stats_df(start_time, end_time)
field_mapping_df = driver_test_data.create_field_mapping_df(start_time, end_time)
entity_df = orders_df[
[
"customer_id",
@@ -143,6 +145,7 @@
"location": location_df,
"orders": orders_df,
"global": global_df,
"field_mapping": field_mapping_df,
"entity": entity_df,
}

@@ -180,12 +183,20 @@ def construct_universal_data_sources(
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
field_mapping_ds = data_source_creator.create_data_source(
datasets["field_mapping"],
destination_name="field_mapping",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
field_mapping={"column_name": "feature_name"},
)
return {
"customer": customer_ds,
"driver": driver_ds,
"location": location_ds,
"orders": orders_ds,
"global": global_ds,
"field_mapping": field_mapping_ds,
}
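
Outside the test harness, the equivalent user-facing declaration would attach the same mapping to a batch source. A hypothetical example using a `FileSource` (the path and column names here are assumed for illustration):

```python
# Hypothetical equivalent of the fixture above: a file-backed source
# whose raw "column_name" column is served as the "feature_name" feature.
from feast import FileSource

field_mapping_source = FileSource(
    path="data/field_mapping.parquet",       # assumed location
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
    field_mapping={"column_name": "feature_name"},
)
```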


@@ -210,6 +221,9 @@ def construct_universal_feature_views(
"driver_age_request_fv": create_driver_age_request_feature_view(),
"order": create_order_feature_view(data_sources["orders"]),
"location": create_location_stats_feature_view(data_sources["location"]),
"field_mapping": create_field_mapping_feature_view(
data_sources["field_mapping"]
),
}


10 changes: 10 additions & 0 deletions sdk/python/tests/integration/feature_repos/universal/feature_views.py
@@ -217,3 +217,13 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
ttl=timedelta(days=2),
)
return location_stats_feature_view


def create_field_mapping_feature_view(source):
return FeatureView(
name="field_mapping",
entities=[],
features=[Feature(name="feature_name", dtype=ValueType.INT32)],
batch_source=source,
ttl=timedelta(days=2),
)
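
With this view registered, the mapped feature is requested by its mapped name; the integration tests below exercise exactly this reference. A hedged retrieval sketch, assuming an applied repo and a minimal entity dataframe (the view has no join keys, only event timestamps):

```python
import pandas as pd

from feast import FeatureStore

# Assumed: a repo with the "field_mapping" view applied.
entity_df = pd.DataFrame({"event_timestamp": [pd.Timestamp.utcnow()]})

store = FeatureStore(repo_path=".")           # assumed repo path
training_df = store.get_historical_features(
    entity_df=entity_df,
    # Addressed by the mapped feature name, never by "column_name".
    features=["field_mapping:feature_name"],
).to_df()
```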
36 changes: 36 additions & 0 deletions sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
@@ -82,6 +82,8 @@ def get_expected_training_df(
location_fv: FeatureView,
global_df: pd.DataFrame,
global_fv: FeatureView,
field_mapping_df: pd.DataFrame,
field_mapping_fv: FeatureView,
entity_df: pd.DataFrame,
event_timestamp: str,
full_feature_names: bool = False,
@@ -102,6 +104,10 @@ def get_expected_training_df(
global_records = convert_timestamp_records_to_utc(
global_df.to_dict("records"), global_fv.batch_source.event_timestamp_column
)
field_mapping_records = convert_timestamp_records_to_utc(
field_mapping_df.to_dict("records"),
field_mapping_fv.batch_source.event_timestamp_column,
)
entity_rows = convert_timestamp_records_to_utc(
entity_df.to_dict("records"), event_timestamp
)
@@ -156,6 +162,13 @@ def get_expected_training_df(
ts_end=order_record[event_timestamp],
)

field_mapping_record = find_asof_record(
field_mapping_records,
ts_key=field_mapping_fv.batch_source.event_timestamp_column,
ts_start=order_record[event_timestamp] - field_mapping_fv.ttl,
ts_end=order_record[event_timestamp],
)

entity_row.update(
{
(
@@ -197,6 +210,16 @@ def get_expected_training_df(
}
)

# get field_mapping_record by column name, but label by feature name
entity_row.update(
{
(
f"field_mapping__{feature}" if full_feature_names else feature
): field_mapping_record.get(column, None)
for (column, feature) in field_mapping_fv.input.field_mapping.items()
}
)

# Convert records back to pandas dataframe
expected_df = pd.DataFrame(entity_rows)

@@ -213,6 +236,7 @@ def get_expected_training_df(
"customer_profile__current_balance": "float32",
"customer_profile__avg_passenger_count": "float32",
"global_stats__avg_ride_length": "float32",
"field_mapping__feature_name": "int32",
}
else:
expected_column_types = {
@@ -221,6 +245,7 @@ def get_expected_training_df(
"current_balance": "float32",
"avg_passenger_count": "float32",
"avg_ride_length": "float32",
"feature_name": "int32",
}

for col, typ in expected_column_types.items():
@@ -311,6 +336,8 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
entity_df_with_request_data,
event_timestamp,
full_feature_names,
@@ -336,6 +363,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n
"global_stats:num_rides",
"global_stats:avg_ride_length",
"driver_age:driver_age",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
@@ -404,6 +432,7 @@ def test_historical_features_with_missing_request_data(
"conv_rate_plus_100:conv_rate_plus_val_to_add",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
@@ -419,6 +448,7 @@ def test_historical_features_with_missing_request_data(
"driver_age:driver_age",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
@@ -452,6 +482,7 @@ def test_historical_features_with_entities_from_query(
"order:order_is_success",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
@@ -477,6 +508,8 @@ def test_historical_features_with_entities_from_query(
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
datasets["entity"],
event_timestamp,
full_feature_names,
@@ -538,6 +571,7 @@ def test_historical_features_persisting(
"order:order_is_success",
"global_stats:num_rides",
"global_stats:avg_ride_length",
"field_mapping:feature_name",
],
full_feature_names=full_feature_names,
)
@@ -561,6 +595,8 @@ def test_historical_features_persisting(
feature_views["location"],
datasets["global"],
feature_views["global"],
datasets["field_mapping"],
feature_views["field_mapping"],
entity_df,
event_timestamp,
full_feature_names,
