Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create stream and batch feature view abstractions #2559

Merged
merged 3 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The services with containerized replacements currently implemented are:
- Datastore
- DynamoDB
- Redis
- Trino

You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.

Expand Down
58 changes: 58 additions & 0 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_BATCH_SOURCES = {
"BigQuerySource",
"FileSource",
"RedshiftSource",
"SnowflakeSource",
"SparkSource",
"TrinoSource",
}


class BatchFeatureView(FeatureView):
def __init__(
self,
*,
achals marked this conversation as resolved.
Show resolved Hide resolved
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[timedelta] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_BATCH_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
54 changes: 54 additions & 0 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_STREAM_SOURCES = {
"KafkaSource",
"KinesisSource",
}


class StreamFeatureView(FeatureView):
def __init__(
self,
*,
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[timedelta] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_STREAM_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
70 changes: 70 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from datetime import timedelta

import pytest

from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource
from feast.infra.offline_stores.file_source import FileSource
from feast.stream_feature_view import StreamFeatureView


def test_create_batch_feature_view():
batch_source = FileSource(path="some path")
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=batch_source,
)

with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
)
with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)


def test_create_stream_feature_view():
stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
)
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=FileSource(path="some path"),
)