feat: Update stream fcos to have watermark and sliding interval #2765

Merged
7 commits merged on Jun 8, 2022
1 change: 1 addition & 0 deletions protos/feast/core/Aggregation.proto
@@ -11,4 +11,5 @@ message Aggregation {
string column = 1;
string function = 2;
google.protobuf.Duration time_window = 3;
google.protobuf.Duration slide_interval = 4;
}
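
For orientation (not part of the diff): a minimal sketch of how the new slide_interval field surfaces through the Python SDK's Aggregation class, which is updated below in sdk/python/feast/aggregation.py. The column and window values are illustrative.

from datetime import timedelta

from feast.aggregation import Aggregation

# Hourly sum recomputed every 15 minutes. Omitting slide_interval
# would default it to time_window, i.e. a tumbling window.
agg = Aggregation(
    column="trips",
    function="sum",
    time_window=timedelta(hours=1),
    slide_interval=timedelta(minutes=15),
)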
2 changes: 2 additions & 0 deletions protos/feast/core/DataSource.proto
@@ -22,6 +22,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "DataSourceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";
import "feast/core/Feature.proto";
@@ -135,6 +136,7 @@ message DataSource {
// Defines the stream data format encoding feature/entity data in Kafka messages.
StreamFormat message_format = 3;

google.protobuf.Duration watermark = 4;
}

// Defines options for DataSource that sources features from Kinesis records.
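The watermark travels on the wire as a google.protobuf.Duration. A small sketch of the conversion the generated Python bindings rely on, assuming the standard protobuf well-known-type helpers:

from datetime import timedelta

from google.protobuf.duration_pb2 import Duration

# A timedelta round-trips through Duration as seconds + nanos.
watermark = Duration()
watermark.FromTimedelta(timedelta(days=1))
assert watermark.ToTimedelta() == timedelta(days=1)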
24 changes: 23 additions & 1 deletion sdk/python/feast/aggregation.py
@@ -14,30 +14,45 @@ class Aggregation:
column: str # Column name of the feature we are aggregating.
function: str # Provided built-in aggregation function: sum, max, min, count, or mean.
time_window: timedelta # The time window for this aggregation.
slide_interval: timedelta # The sliding interval for this aggregation; defaults to time_window.
"""

column: str
function: str
time_window: Optional[timedelta]
slide_interval: Optional[timedelta]

def __init__(
self,
column: Optional[str] = "",
function: Optional[str] = "",
time_window: Optional[timedelta] = None,
slide_interval: Optional[timedelta] = None,
):
self.column = column or ""
self.function = function or ""
self.time_window = time_window
if not slide_interval:
self.slide_interval = self.time_window
else:
self.slide_interval = slide_interval

def to_proto(self) -> AggregationProto:
window_duration = None
if self.time_window is not None:
window_duration = Duration()
window_duration.FromTimedelta(self.time_window)

slide_interval_duration = None
if self.slide_interval is not None:
slide_interval_duration = Duration()
slide_interval_duration.FromTimedelta(self.slide_interval)

return AggregationProto(
column=self.column, function=self.function, time_window=window_duration
column=self.column,
function=self.function,
time_window=window_duration,
slide_interval=slide_interval_duration,
)

@classmethod
@@ -48,10 +63,16 @@ def from_proto(cls, agg_proto: AggregationProto):
else agg_proto.time_window.ToTimedelta()
)

slide_interval = (
timedelta(days=0)
if agg_proto.slide_interval.ToNanoseconds() == 0
else agg_proto.slide_interval.ToTimedelta()
)
aggregation = cls(
column=agg_proto.column,
function=agg_proto.function,
time_window=time_window,
slide_interval=slide_interval,
)
return aggregation

@@ -63,6 +84,7 @@ def __eq__(self, other):
self.column != other.column
or self.function != other.function
or self.time_window != other.time_window
or self.slide_interval != other.slide_interval
):
return False

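A quick sketch of the defaulting and round-trip behavior introduced above, assuming the Aggregation class exactly as diffed: an omitted slide_interval falls back to time_window, and from_proto(to_proto(...)) preserves both fields.

from datetime import timedelta

from feast.aggregation import Aggregation

agg = Aggregation(column="fare", function="mean", time_window=timedelta(days=1))
assert agg.slide_interval == timedelta(days=1)  # defaulted to time_window

# __eq__ now compares slide_interval as well, so the round trip is lossless.
assert Aggregation.from_proto(agg.to_proto()) == agg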
59 changes: 57 additions & 2 deletions sdk/python/feast/data_source.py
@@ -15,8 +15,10 @@
import enum
import warnings
from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union

from google.protobuf.duration_pb2 import Duration
from google.protobuf.json_format import MessageToJson

from feast import type_map
@@ -47,11 +49,16 @@ class KafkaOptions:
"""

def __init__(
self, bootstrap_servers: str, message_format: StreamFormat, topic: str,
self,
bootstrap_servers: str,
message_format: StreamFormat,
topic: str,
watermark: Optional[timedelta] = None,
):
self.bootstrap_servers = bootstrap_servers
self.message_format = message_format
self.topic = topic
self.watermark = watermark or None

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
@@ -64,11 +71,18 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Returns:
Returns a KafkaOptions object based on the kafka_options protobuf
"""

watermark = None
if kafka_options_proto.HasField("watermark"):
watermark = (
timedelta(days=0)
if kafka_options_proto.watermark.ToNanoseconds() == 0
else kafka_options_proto.watermark.ToTimedelta()
)
kafka_options = cls(
bootstrap_servers=kafka_options_proto.bootstrap_servers,
message_format=StreamFormat.from_proto(kafka_options_proto.message_format),
topic=kafka_options_proto.topic,
watermark=watermark,
)

return kafka_options
@@ -80,11 +94,16 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:
Returns:
KafkaOptionsProto protobuf
"""
watermark_duration = None
if self.watermark is not None:
watermark_duration = Duration()
watermark_duration.FromTimedelta(self.watermark)

kafka_options_proto = DataSourceProto.KafkaOptions(
bootstrap_servers=self.bootstrap_servers,
message_format=self.message_format.to_proto(),
topic=self.topic,
watermark=watermark_duration,
)

return kafka_options_proto
@@ -369,7 +388,32 @@ def __init__(
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
batch_source: Optional[DataSource] = None,
watermark: Optional[timedelta] = None,
):
"""
Creates a KafkaSource stream source object.
Args:
name: str. Name of data source, which should be unique within a project
event_timestamp_column (optional): str. (Deprecated) Event timestamp column used for point in time
joins of feature values.
bootstrap_servers: str. The servers of the kafka broker in the form "localhost:9092".
message_format: StreamFormat. StreamFormat of serialized messages.
topic: str. The name of the topic to read from in the kafka source.
created_timestamp_column (optional): str. Timestamp column indicating when the row
was created, used for deduplicating rows.
field_mapping (optional): dict(str, str). A dictionary mapping of column names in this data
source to feature names in a feature table or view. Only used for feature
columns, not entity or timestamp columns.
date_partition_column (optional): str. Timestamp column used for partitioning.
description (optional): str. A human-readable description.
tags (optional): dict(str, str). A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): str. The owner of the data source, typically the email of the primary
maintainer.
timestamp_field (optional): str. Event timestamp field used for point
in time joins of feature values.
batch_source: DataSource. The datasource that acts as a batch source.
watermark (optional): timedelta. The watermark for stream data, i.e. the maximum delay with which stream data can arrive before it is discarded.
"""
positional_attributes = [
"name",
"event_timestamp_column",
@@ -425,10 +469,12 @@ def __init__(
timestamp_field=timestamp_field,
)
self.batch_source = batch_source

self.kafka_options = KafkaOptions(
bootstrap_servers=_bootstrap_servers,
message_format=_message_format,
topic=_topic,
watermark=watermark,
)

def __eq__(self, other):
@@ -445,6 +491,7 @@ def __eq__(self, other):
!= other.kafka_options.bootstrap_servers
or self.kafka_options.message_format != other.kafka_options.message_format
or self.kafka_options.topic != other.kafka_options.topic
or self.kafka_options.watermark != other.kafka_options.watermark
):
return False

@@ -455,6 +502,13 @@ def __hash__(self):

@staticmethod
def from_proto(data_source: DataSourceProto):
watermark = None
if data_source.kafka_options.HasField("watermark"):
watermark = (
timedelta(days=0)
if data_source.kafka_options.watermark.ToNanoseconds() == 0
else data_source.kafka_options.watermark.ToTimedelta()
)
return KafkaSource(
name=data_source.name,
event_timestamp_column=data_source.timestamp_field,
@@ -463,6 +517,7 @@ def from_proto(data_source: DataSourceProto):
message_format=StreamFormat.from_proto(
data_source.kafka_options.message_format
),
watermark=watermark,
topic=data_source.kafka_options.topic,
created_timestamp_column=data_source.created_timestamp_column,
timestamp_field=data_source.timestamp_field,
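Putting the data_source.py changes to use: a hedged sketch of declaring a KafkaSource with a watermark, mirroring the tests below. Names, servers, and paths are illustrative.

from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import AvroFormat

# Events arriving more than a day late are dropped by the stream engine.
source = KafkaSource(
    name="driver_stats_stream",
    bootstrap_servers="localhost:9092",
    message_format=AvroFormat(""),
    topic="driver_stats",
    batch_source=FileSource(path="driver_stats.parquet"),
    watermark=timedelta(days=1),
)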
@@ -319,6 +319,7 @@ def simple_udf(x: int):
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
watermark=timedelta(days=1),
)

sfv = StreamFeatureView(
@@ -27,6 +27,7 @@ def test_apply_stream_feature_view(environment) -> None:
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="test_path", timestamp_field="event_timestamp"),
watermark=timedelta(days=1),
)

@stream_feature_view(
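
Finally, a sketch of the two additions working together in a stream feature view, loosely following the test above. The decorator argument names (entities, ttl, mode, schema, timestamp_field, source, aggregations) and the Entity/Field constructors are assumptions about the surrounding StreamFeatureView API and are not confirmed by this diff.

from datetime import timedelta

from feast import Entity, Field
from feast.aggregation import Aggregation
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32

driver = Entity(name="driver", join_keys=["driver_id"])  # assumed Entity API

@stream_feature_view(
    entities=[driver],
    ttl=timedelta(days=30),
    mode="spark",
    schema=[Field(name="fare", dtype=Float32)],
    timestamp_field="event_timestamp",
    source=source,  # the watermarked KafkaSource from the earlier sketch
    aggregations=[
        Aggregation(
            column="fare",
            function="mean",
            time_window=timedelta(hours=1),
            slide_interval=timedelta(minutes=15),
        )
    ],
)
def fare_stats(df):
    return df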