Skip to content

Commit

Permalink
Remove references to event_timestamp_column
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Apr 24, 2022
1 parent 4bffc5c commit 0a0cc58
Show file tree
Hide file tree
Showing 22 changed files with 143 additions and 173 deletions.
2 changes: 1 addition & 1 deletion go/cmd/server/logging/feature_repo/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# for more info.
driver_hourly_stats = FileSource(
path="driver_stats.parquet",
event_timestamp_column="event_timestamp",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def update_entities_with_inferred_types_from_feature_views(
def update_data_sources_with_inferred_event_timestamp_col(
data_sources: List[DataSource], config: RepoConfig
) -> None:
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
ERROR_MSG_PREFIX = "Unable to infer DataSource timestamp_field"

for data_source in data_sources:
if isinstance(data_source, RequestSource):
Expand Down
22 changes: 11 additions & 11 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def pull_latest_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
Expand All @@ -96,7 +96,7 @@ def pull_latest_from_table_or_query(
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
timestamps = [timestamp_field]
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
Expand All @@ -114,7 +114,7 @@ def pull_latest_from_table_or_query(
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
)
WHERE _feast_row = 1
"""
Expand All @@ -131,7 +131,7 @@ def pull_all_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
Expand All @@ -143,12 +143,12 @@ def pull_all_from_table_or_query(
location=config.offline_store.location,
)
field_string = ", ".join(
join_key_columns + feature_name_columns + [event_timestamp_column]
join_key_columns + feature_name_columns + [timestamp_field]
)
query = f"""
SELECT {field_string}
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
"""
return BigQueryRetrievalJob(
query=query, client=client, config=config, full_feature_names=False,
Expand Down Expand Up @@ -583,9 +583,9 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
Expand All @@ -596,16 +596,16 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.timestamp_field }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def pull_latest_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
Expand All @@ -68,7 +68,7 @@ def pull_latest_from_table_or_query(
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
timestamps = [timestamp_field]
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(_append_alias(timestamps, "a")) + " DESC"
Expand All @@ -87,7 +87,7 @@ def pull_latest_from_table_or_query(
SELECT {a_field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM ({from_expression}) a
WHERE a."{event_timestamp_column}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz
WHERE a."{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz
) b
WHERE _feast_row = 1
"""
Expand Down Expand Up @@ -191,15 +191,15 @@ def pull_all_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
assert isinstance(data_source, PostgreSQLSource)
from_expression = data_source.get_table_query_string()

field_string = ", ".join(
join_key_columns + feature_name_columns + [event_timestamp_column]
join_key_columns + feature_name_columns + [timestamp_field]
)

start_date = start_date.astimezone(tz=utc)
Expand All @@ -208,7 +208,7 @@ def pull_all_from_table_or_query(
query = f"""
SELECT {field_string}
FROM {from_expression}
WHERE "{event_timestamp_column}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz
WHERE "{timestamp_field}" BETWEEN '{start_date}'::timestamptz AND '{end_date}'::timestamptz
"""

return PostgreSQLRetrievalJob(
Expand Down Expand Up @@ -415,9 +415,9 @@ def build_point_in_time_query(
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
Expand All @@ -428,16 +428,16 @@ def build_point_in_time_query(
"{{ featureview.name }}__subquery" AS (
SELECT
"{{ featureview.event_timestamp_column }}" as event_timestamp,
"{{ featureview.timestamp_field }}" as event_timestamp,
{{ '"' ~ featureview.created_timestamp_column ~ '" as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }} AS sub
WHERE "{{ featureview.event_timestamp_column }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
{% if featureview.ttl == 0 %}{% else %}
AND "{{ featureview.event_timestamp_column }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second
AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second
{% endif %}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def pull_latest_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
Expand All @@ -76,7 +76,7 @@ def pull_latest_from_table_or_query(
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
timestamps = [timestamp_field]
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
Expand All @@ -92,7 +92,7 @@ def pull_latest_from_table_or_query(
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_
FROM {from_expression} t1
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}')
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}')
) t2
WHERE feast_row_ = 1
"""
Expand Down Expand Up @@ -190,12 +190,12 @@ def pull_all_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
"""
Note that join_key_columns, feature_name_columns, event_timestamp_column, and
Note that join_key_columns, feature_name_columns, timestamp_field, and
created_timestamp_column have all already been mapped to column names of the
source table and those column names are the values passed into this function.
"""
Expand All @@ -210,17 +210,15 @@ def pull_all_from_table_or_query(
store_config=config.offline_store
)

fields = ", ".join(
join_key_columns + feature_name_columns + [event_timestamp_column]
)
fields = ", ".join(join_key_columns + feature_name_columns + [timestamp_field])
from_expression = data_source.get_table_query_string()
start_date = start_date.astimezone(tz=utc)
end_date = end_date.astimezone(tz=utc)

query = f"""
SELECT {fields}
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
"""

return SparkRetrievalJob(
Expand Down Expand Up @@ -422,9 +420,9 @@ def _format_datetime(t: datetime) -> str:
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
Expand All @@ -435,16 +433,16 @@ def _format_datetime(t: datetime) -> str:
{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.timestamp_field }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def pull_latest_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime,
Expand All @@ -177,7 +177,7 @@ def pull_latest_from_table_or_query(
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamps = [event_timestamp_column]
timestamps = [timestamp_field]
if created_timestamp_column:
timestamps.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamps) + " DESC"
Expand All @@ -195,7 +195,7 @@ def pull_latest_from_table_or_query(
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
)
WHERE _feast_row = 1
"""
Expand Down Expand Up @@ -302,7 +302,7 @@ def pull_all_from_table_or_query(
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
event_timestamp_column: str,
timestamp_field: str,
start_date: datetime,
end_date: datetime,
user: str = "user",
Expand All @@ -319,12 +319,12 @@ def pull_all_from_table_or_query(
config=config, user=user, auth=auth, http_scheme=http_scheme
)
field_string = ", ".join(
join_key_columns + feature_name_columns + [event_timestamp_column]
join_key_columns + feature_name_columns + [timestamp_field]
)
query = f"""
SELECT {field_string}
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
"""
return TrinoRetrievalJob(
query=query, client=client, config=config, full_feature_names=False,
Expand Down Expand Up @@ -458,9 +458,9 @@ def _get_entity_df_event_timestamp_range(
to the provided entity table.
1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
- If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher the the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously
Expand All @@ -469,16 +469,16 @@ def _get_entity_df_event_timestamp_range(
*/
{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.event_timestamp_column }} as event_timestamp,
{{ featureview.timestamp_field }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}')
WHERE {{ featureview.timestamp_field }} <= from_iso8601_timestamp('{{ featureview.max_event_timestamp }}')
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}')
AND {{ featureview.timestamp_field }} >= from_iso8601_timestamp('{{ featureview.min_event_timestamp }}')
{% endif %}
),
{{ featureview.name }}__base AS (
Expand Down
Loading

0 comments on commit 0a0cc58

Please sign in to comment.