Skip to content

Commit

Permalink
Add streaming sources to the FeatureView API (#1664)
Browse files Browse the repository at this point in the history
* Add a streaming source to the FeatureView API

This diff only updates the API. It is currently up to the providers to actually use this information to spin up resources to consume events from the stream sources.

Signed-off-by: Achal Shah <achals@gmail.com>

* remove stuff from rebase

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* Update protos

Signed-off-by: Achal Shah <achals@gmail.com>

* lint

Signed-off-by: Achal Shah <achals@gmail.com>

* format

Signed-off-by: Achal Shah <achals@gmail.com>

* CR

Signed-off-by: Achal Shah <achals@gmail.com>

* fix test

Signed-off-by: Achal Shah <achals@gmail.com>

* lint

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Jun 28, 2021
1 parent b9dd955 commit 51fe128
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 10 deletions.
4 changes: 3 additions & 1 deletion protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ message FeatureViewSpec {
google.protobuf.Duration ttl = 6;

// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource input = 7;
DataSource batch_source = 7;
// Streaming DataSource from where this view can consume "online" feature data.
DataSource stream_source = 9;

// Whether these features should be served online or not
bool online = 8;
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,9 @@ def __init__(
)

def __eq__(self, other):
if other is None:
return False

if not isinstance(other, KinesisSource):
raise TypeError(
"Comparisons should only involve KinesisSource class objects."
Expand Down
39 changes: 33 additions & 6 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class FeatureView:
ttl: Optional[timedelta]
online: bool
input: DataSource

batch_source: Optional[DataSource] = None
stream_source: Optional[DataSource] = None
created_timestamp: Optional[Timestamp] = None
last_updated_timestamp: Optional[Timestamp] = None
materialization_intervals: List[Tuple[datetime, datetime]]
Expand All @@ -62,15 +63,22 @@ def __init__(
entities: List[str],
ttl: Optional[Union[Duration, timedelta]],
input: DataSource,
batch_source: Optional[DataSource] = None,
stream_source: Optional[DataSource] = None,
features: List[Feature] = [],
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
_input = input or batch_source
assert _input is not None

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
if _input.field_mapping is not None and col in _input.field_mapping.keys():
raise ValueError(
f"The field {col} is mapped to {input.field_mapping[col]} for this data source. Please either remove this field mapping or use {input.field_mapping[col]} as the Entity or Feature name."
f"The field {col} is mapped to {_input.field_mapping[col]} for this data source. "
f"Please either remove this field mapping or use {_input.field_mapping[col]} as the "
f"Entity or Feature name."
)

self.name = name
Expand All @@ -84,7 +92,9 @@ def __init__(
self.ttl = ttl

self.online = online
self.input = input
self.input = _input
self.batch_source = _input
self.stream_source = stream_source

self.materialization_intervals = []

Expand Down Expand Up @@ -118,6 +128,8 @@ def __eq__(self, other):
return False
if self.input != other.input:
return False
if self.stream_source != other.stream_source:
return False

return True

Expand Down Expand Up @@ -157,14 +169,21 @@ def to_proto(self) -> FeatureViewProto:
ttl_duration = Duration()
ttl_duration.FromTimedelta(self.ttl)

print(f"Stream soruce: {self.stream_source}, {type(self.stream_source)}")

spec = FeatureViewSpecProto(
name=self.name,
entities=self.entities,
features=[feature.to_proto() for feature in self.features],
tags=self.tags,
ttl=(ttl_duration if ttl_duration is not None else None),
online=self.online,
input=self.input.to_proto(),
batch_source=self.input.to_proto(),
stream_source=(
self.stream_source.to_proto()
if self.stream_source is not None
else None
),
)

return FeatureViewProto(spec=spec, meta=meta)
Expand All @@ -181,6 +200,12 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
Returns a FeatureViewProto object based on the feature view protobuf
"""

_input = DataSource.from_proto(feature_view_proto.spec.batch_source)
stream_source = (
DataSource.from_proto(feature_view_proto.spec.stream_source)
if feature_view_proto.spec.HasField("stream_source")
else None
)
feature_view = cls(
name=feature_view_proto.spec.name,
entities=[entity for entity in feature_view_proto.spec.entities],
Expand All @@ -200,7 +225,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
and feature_view_proto.spec.ttl.nanos == 0
else feature_view_proto.spec.ttl
),
input=DataSource.from_proto(feature_view_proto.spec.input),
input=_input,
batch_source=_input,
stream_source=stream_source,
)

feature_view.created_timestamp = feature_view_proto.meta.created_timestamp
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tensorflow_metadata/proto/v0/path_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/python/tensorflow_metadata/proto/v0/schema_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/python/tensorflow_metadata/proto/v0/statistics_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 51fe128

Please sign in to comment.