Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change internal references from input to batch_source #1729

Merged
merged 4 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ driver_stats_fv = FeatureView(
Feature(name="trips_today", dtype=ValueType.INT64),
Feature(name="rating", dtype=ValueType.FLOAT),
],
input=BigQuerySource(
batch_source=BigQuerySource(
table_ref="feast-oss.demo_data.driver_activity"
)
)
Expand Down
4 changes: 2 additions & 2 deletions docs/concepts/feature-views.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ driver_stats_fv = FeatureView(
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],

# Inputs are used to find feature values. In the case of this feature
# Batch sources are used to find feature values. In the case of this feature
# view we will query a source table on BigQuery for driver statistics
# features
input=driver_stats_source,
batch_source=driver_stats_source,

# Tags are user defined key/value pairs that are attached to each
# feature view
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/feature-repository.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ driver_locations = FeatureView(
Feature(name="lat", dtype=ValueType.FLOAT),
Feature(name="lon", dtype=ValueType.STRING),
],
input=driver_locations_source,
batch_source=driver_locations_source,
)
```
{% endcode %}
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/feature-repository/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ driver_locations = FeatureView(
Feature(name="lat", dtype=ValueType.FLOAT),
Feature(name="lon", dtype=ValueType.STRING),
],
input=driver_locations_source,
batch_source=driver_locations_source,
)
```
{% endcode %}
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def apply(
>>> name="customer_fv",
>>> entities=["customer"],
>>> features=[Feature(name="age", dtype=ValueType.INT64)],
>>> input=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
>>> batch_source=FileSource(path="file.parquet", event_timestamp_column="timestamp"),
>>> ttl=timedelta(days=1)
>>> )
>>> fs.apply([customer_entity, customer_feature_view])
Expand All @@ -284,11 +284,11 @@ def apply(
)

update_data_sources_with_inferred_event_timestamp_col(
[view.input for view in views_to_update], self.config
[view.batch_source for view in views_to_update], self.config
)

for view in views_to_update:
view.infer_features_from_input_source(self.config)
view.infer_features_from_batch_source(self.config)

if len(views_to_update) + len(entities_to_update) + len(
services_to_update
Expand Down
48 changes: 29 additions & 19 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union

Expand All @@ -38,6 +39,8 @@
from feast.usage import log_exceptions
from feast.value_type import ValueType

warnings.simplefilter("once", DeprecationWarning)


class FeatureView:
"""
Expand All @@ -51,7 +54,7 @@ class FeatureView:
ttl: Optional[timedelta]
online: bool
input: DataSource
batch_source: Optional[DataSource] = None
batch_source: DataSource
stream_source: Optional[DataSource] = None
created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
Expand All @@ -63,13 +66,21 @@ def __init__(
name: str,
entities: List[str],
ttl: Optional[Union[Duration, timedelta]],
input: DataSource,
input: Optional[DataSource] = None,
batch_source: Optional[DataSource] = None,
stream_source: Optional[DataSource] = None,
features: List[Feature] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
warnings.warn(
Copy link
Contributor

@tedhtchang tedhtchang Jul 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixwang9817 Should we warn users only when the input is assigned?

Copy link
Collaborator Author

@felixwang9817 felixwang9817 Jul 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedhtchang That's a good point, I think ideally we would have done that. However, given that (a) input was a required parameter before this change, so almost all users will continue to use input, and (b) releases are fairly frequent so this deprecation warning will soon become outdated, I think we can leave this change as is.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedhtchang btw, I managed to insert this change into a separate PR (#1746). Thanks for pointing this one out!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixwang9817 Thanks. Looks good.

(
"The argument 'input' is being deprecated. Please use 'batch_source' "
"instead. Feast 0.12 and onwards will not support the argument 'input'."
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
),
DeprecationWarning,
)

_input = input or batch_source
assert _input is not None

Expand Down Expand Up @@ -139,7 +150,7 @@ def __eq__(self, other):
return False
if sorted(self.features) != sorted(other.features):
return False
if self.input != other.input:
if self.batch_source != other.batch_source:
return False
if self.stream_source != other.stream_source:
return False
Expand Down Expand Up @@ -182,10 +193,8 @@ def to_proto(self) -> FeatureViewProto:
ttl_duration = Duration()
ttl_duration.FromTimedelta(self.ttl)

batch_source_proto = self.input.to_proto()
batch_source_proto.data_source_class_type = (
f"{self.input.__class__.__module__}.{self.input.__class__.__name__}"
)
batch_source_proto = self.batch_source.to_proto()
batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}"

stream_source_proto = None
if self.stream_source:
Expand Down Expand Up @@ -217,7 +226,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
Returns a FeatureViewProto object based on the feature view protobuf
"""

_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source)
stream_source = (
DataSource.from_proto(feature_view_proto.spec.stream_source)
if feature_view_proto.spec.HasField("stream_source")
Expand All @@ -242,8 +251,8 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
and feature_view_proto.spec.ttl.nanos == 0
else feature_view_proto.spec.ttl
),
input=_input,
batch_source=_input,
input=batch_source,
batch_source=batch_source,
stream_source=stream_source,
)

Expand All @@ -265,29 +274,30 @@ def most_recent_end_time(self) -> Optional[datetime]:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_input_source(self, config: RepoConfig):
def infer_features_from_batch_source(self, config: RepoConfig):
if not self.features:
columns_to_exclude = {
self.input.event_timestamp_column,
self.input.created_timestamp_column,
self.batch_source.event_timestamp_column,
self.batch_source.created_timestamp_column,
} | set(self.entities)

for col_name, col_datatype in self.input.get_table_column_names_and_types(
config
):
for (
col_name,
col_datatype,
) in self.batch_source.get_table_column_names_and_types(config):
if col_name not in columns_to_exclude and not re.match(
"^__|__$",
col_name, # double underscores often signal an internal-use column
):
feature_name = (
self.input.field_mapping[col_name]
if col_name in self.input.field_mapping.keys()
self.batch_source.field_mapping[col_name]
if col_name in self.batch_source.field_mapping.keys()
else col_name
)
self.features.append(
Feature(
feature_name,
self.input.source_datatype_to_feast_value_type()(
self.batch_source.source_datatype_to_feast_value_type()(
col_datatype
),
)
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def update_entities_with_inferred_types_from_feature_views(
entities: List[Entity], feature_views: List[FeatureView], config: RepoConfig
) -> None:
"""
Infer entity value type by examining schema of feature view input sources
Infer entity value type by examining schema of feature view batch sources
"""
incomplete_entities = {
entity.name: entity
Expand All @@ -26,22 +26,22 @@ def update_entities_with_inferred_types_from_feature_views(
if not (incomplete_entities_keys & set(view.entities)):
continue # skip if view doesn't contain any entities that need inference

col_names_and_types = view.input.get_table_column_names_and_types(config)
col_names_and_types = view.batch_source.get_table_column_names_and_types(config)
for entity_name in view.entities:
if entity_name in incomplete_entities:
# get entity information from information extracted from the view input source
# get entity information from information extracted from the view batch source
extracted_entity_name_type_pairs = list(
filter(lambda tup: tup[0] == entity_name, col_names_and_types)
)
if len(extracted_entity_name_type_pairs) == 0:
# Doesn't mention inference error because would also be an error without inferencing
raise ValueError(
f"""No column in the input source for the {view.name} feature view matches
f"""No column in the batch source for the {view.name} feature view matches
its entity's name."""
)

entity = incomplete_entities[entity_name]
inferred_value_type = view.input.source_datatype_to_feast_value_type()(
inferred_value_type = view.batch_source.source_datatype_to_feast_value_type()(
extracted_entity_name_type_pairs[0][1]
)

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def materialize_single_feature_view(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=config,
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -110,8 +110,8 @@ def materialize_single_feature_view(

table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def materialize_single_feature_view(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=config,
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -112,8 +112,8 @@ def materialize_single_feature_view(
)
table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def materialize_single_feature_view(
) = _get_column_names(feature_view, entities)

offline_job = self.offline_store.pull_latest_from_table_or_query(
data_source=feature_view.input,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
event_timestamp_column=event_timestamp_column,
Expand All @@ -111,8 +111,8 @@ def materialize_single_feature_view(
)
table = offline_job.to_arrow()

if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(table, feature_view.batch_source.field_mapping)

join_keys = [entity.join_key for entity in entities]
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
Expand Down
16 changes: 11 additions & 5 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,21 @@ def evaluate_historical_retrieval():

# Load feature view data from sources and join them incrementally
for feature_view, features in feature_views_to_features.items():
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column
event_timestamp_column = (
feature_view.batch_source.event_timestamp_column
)
created_timestamp_column = (
feature_view.batch_source.created_timestamp_column
)

# Read offline parquet data in pyarrow format
table = pyarrow.parquet.read_table(feature_view.input.path)
table = pyarrow.parquet.read_table(feature_view.batch_source.path)

# Rename columns by the field mapping dictionary if it exists
if feature_view.input.field_mapping is not None:
table = _run_field_mapping(table, feature_view.input.field_mapping)
if feature_view.batch_source.field_mapping is not None:
table = _run_field_mapping(
table, feature_view.batch_source.field_mapping
)

# Convert pyarrow table to pandas dataframe
df_to_join = table.to_pandas()
Expand Down
14 changes: 7 additions & 7 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ def _get_column_names(
the query to the offline store.
"""
# if we have mapped fields, use the original field names in the call to the offline store
event_timestamp_column = feature_view.input.event_timestamp_column
event_timestamp_column = feature_view.batch_source.event_timestamp_column
feature_names = [feature.name for feature in feature_view.features]
created_timestamp_column = feature_view.input.created_timestamp_column
created_timestamp_column = feature_view.batch_source.created_timestamp_column
join_keys = [entity.join_key for entity in entities]
if feature_view.input.field_mapping is not None:
if feature_view.batch_source.field_mapping is not None:
reverse_field_mapping = {
v: k for k, v in feature_view.input.field_mapping.items()
v: k for k, v in feature_view.batch_source.field_mapping.items()
}
event_timestamp_column = (
reverse_field_mapping[event_timestamp_column]
Expand Down Expand Up @@ -292,13 +292,13 @@ def _coerce_datetime(ts):
value = python_value_to_proto_value(row[idx], feature.dtype)
feature_dict[feature.name] = value
event_timestamp_idx = table.column_names.index(
feature_view.input.event_timestamp_column
feature_view.batch_source.event_timestamp_column
)
event_timestamp = _coerce_datetime(row[event_timestamp_idx])

if feature_view.input.created_timestamp_column:
if feature_view.batch_source.created_timestamp_column:
created_timestamp_idx = table.column_names.index(
feature_view.input.created_timestamp_column
feature_view.batch_source.created_timestamp_column
)
created_timestamp = _coerce_datetime(row[created_timestamp_idx])
else:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
data_sources = [t.input for t in repo.feature_views]
data_sources = [t.batch_source for t in repo.feature_views]

if not skip_source_validation:
# Make sure the data source used by this feature view is supported by Feast
Expand All @@ -175,7 +175,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
)
update_data_sources_with_inferred_event_timestamp_col(data_sources, repo_config)
for view in repo.feature_views:
view.infer_features_from_input_source(repo_config)
view.infer_features_from_batch_source(repo_config)

repo_table_names = set(t.name for t in repo.feature_tables)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/aws/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
batch_source=driver_hourly_stats,
tags={},
)
4 changes: 2 additions & 2 deletions sdk/python/feast/templates/gcp/driver_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
# Inputs are used to find feature values. In the case of this feature
# Batch sources are used to find feature values. In the case of this feature
# view we will query a source table on BigQuery for driver statistics
# features
input=driver_stats_source,
batch_source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/templates/local/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
batch_source=driver_hourly_stats,
tags={},
)
Loading