feat: Write logged features to an offline store (Python API) #2574

Merged: feast-ci-bot merged 16 commits into feast-dev:master from pyalex:offline-store-write-logs on Apr 26, 2022
Commits (16):
2043767  write logs to offline store (pyalex)
f3f9998  format (pyalex)
af58966  fix after rebase (pyalex)
2681b3c  fix tests (pyalex)
5637da6  handle table not found in tests (pyalex)
43c97c2  some api docs (pyalex)
c5c7fbe  fix import (pyalex)
eaea5dd  use predefined schema in tests (pyalex)
7ac0b83  address pr comments (pyalex)
40fe028  more api docs (pyalex)
65a1a77  add proto attr to snowflake dest (pyalex)
4b983d6  add prefixes to system fields (pyalex)
0fc3e5a  add custom destination (pyalex)
9d1eb1a  move partition columns to destination config (pyalex)
ede26b2  after rebase (pyalex)
b2ef41f  allow data source creator implementations w/o logging destination (pyalex)
@@ -0,0 +1,164 @@
import abc
from typing import TYPE_CHECKING, Dict, Optional, Type, cast

import pyarrow as pa
from pytz import UTC

from feast.data_source import DataSource
from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
from feast.errors import (
    FeastObjectNotFoundException,
    FeatureViewNotFoundException,
    OnDemandFeatureViewNotFoundException,
)
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
from feast.types import from_value_type

if TYPE_CHECKING:
    from feast import FeatureService
    from feast.registry import Registry


REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"


class LoggingSource:
    """
    A logging source describes an object that produces logs (e.g., a feature service
    produces logs of served features). It should be able to provide the schema of the
    produced logs table and additional metadata that describes the log data.
    """

    @abc.abstractmethod
    def get_schema(self, registry: "Registry") -> pa.Schema:
        """Generate the schema for the logs destination."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_log_timestamp_column(self) -> str:
        """Return the name of the timestamp column that must exist in the generated schema."""
        raise NotImplementedError


class FeatureServiceLoggingSource(LoggingSource):
    def __init__(self, feature_service: "FeatureService", project: str):
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "Registry") -> pa.Schema:
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            for feature in projection.features:
                fields[
                    f"{projection.name_to_use()}__{feature.name}"
                ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                fields[
                    f"{projection.name_to_use()}__{feature.name}__timestamp"
                ] = PA_TIMESTAMP_TYPE
                fields[
                    f"{projection.name_to_use()}__{feature.name}__status"
                ] = pa.int32()

            try:
                feature_view = registry.get_feature_view(projection.name, self._project)
            except FeatureViewNotFoundException:
                try:
                    on_demand_feature_view = registry.get_on_demand_feature_view(
                        projection.name, self._project
                    )
                except OnDemandFeatureViewNotFoundException:
                    raise FeastObjectNotFoundException(
                        f"Can't recognize feature view with a name {projection.name}"
                    )

                for (
                    request_source
                ) in on_demand_feature_view.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]

            else:
                for entity_name in feature_view.entities:
                    entity = registry.get_entity(entity_name, self._project)
                    join_key = projection.join_key_map.get(
                        entity.join_key, entity.join_key
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[
                        from_value_type(entity.value_type)
                    ]

        # system columns
        fields[REQUEST_ID_FIELD] = pa.string()
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )

    def get_log_timestamp_column(self) -> str:
        return LOG_TIMESTAMP_FIELD
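For illustration (not part of the diff): given a hypothetical feature service with a single projection driver_stats exposing one Float32 feature conv_rate, joined on an Int64 entity key driver_id, get_schema would produce roughly the following. Field names follow the code above; the exact arrow types are assumptions standing in for FEAST_TYPE_TO_ARROW_TYPE and PA_TIMESTAMP_TYPE.

import pyarrow as pa
from pytz import UTC

# Hypothetical output of FeatureServiceLoggingSource.get_schema(); feature and
# entity names are invented for this sketch, not taken from the PR.
expected_schema = pa.schema(
    [
        pa.field("driver_stats__conv_rate", pa.float32()),                    # feature value
        pa.field("driver_stats__conv_rate__timestamp", pa.timestamp("us")),   # per-feature event timestamp
        pa.field("driver_stats__conv_rate__status", pa.int32()),              # serving status
        pa.field("driver_id", pa.int64()),                                    # entity join key
        pa.field("__request_id", pa.string()),                                # system column
        pa.field("__log_timestamp", pa.timestamp("us", tz=UTC)),              # system column
    ]
)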

class _DestinationRegistry(type):
    classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}

    def __new__(cls, name, bases, dct):
        kls = type.__new__(cls, name, bases, dct)
        if dct.get("_proto_attr_name"):
            cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls
        return kls


class LoggingDestination(metaclass=_DestinationRegistry):
    """
    A logging destination contains details about where exactly logs should be written
    inside an offline store. It is implementation specific: each offline store must
    implement a LoggingDestination subclass.

    The kind of logging destination is determined by matching an attribute name in the
    LoggingConfig protobuf message against the "_proto_attr_name" property of each subclass.
    """

    _proto_attr_name: str

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        raise NotImplementedError

    @abc.abstractmethod
    def to_proto(self) -> LoggingConfigProto:
        raise NotImplementedError

    @abc.abstractmethod
    def to_data_source(self) -> DataSource:
        """
        Convert this object into a data source to read logs from an offline store.
        """
        raise NotImplementedError


class LoggingConfig:
    destination: LoggingDestination

    def __init__(self, destination: LoggingDestination):
        self.destination = destination

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
        proto_kind = cast(str, config_proto.WhichOneof("destination"))
        if proto_kind is None:
            return None

        if proto_kind == "custom_destination":
            proto_kind = config_proto.custom_destination.kind

        destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind]
        return LoggingConfig(destination=destination_class.from_proto(config_proto))

    def to_proto(self) -> LoggingConfigProto:
        proto = self.destination.to_proto()
        return proto
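For illustration (not part of the diff): the metaclass registry means that merely defining a subclass with _proto_attr_name set makes it discoverable by LoggingConfig.from_proto. A minimal sketch follows; the class name and the proto field wiring are invented for the example, and real implementations live in the offline store modules.

# Hypothetical destination, for illustration only. Setting _proto_attr_name is
# what inserts the class into _DestinationRegistry.classes_by_proto_attr_name.
class ExampleFileDestination(LoggingDestination):
    _proto_attr_name = "file_destination"  # assumed to match a oneof field in LoggingConfigProto

    def __init__(self, path: str):
        self.path = path

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        # Assumes the proto's file_destination message carries a `path` field.
        return cls(path=config_proto.file_destination.path)

    def to_proto(self) -> LoggingConfigProto:
        proto = LoggingConfigProto()
        proto.file_destination.path = self.path
        return proto

    def to_data_source(self) -> DataSource:
        raise NotImplementedError  # would return a file-based DataSource reading self.path

# Dispatch: LoggingConfig.from_proto reads WhichOneof("destination"), resolving
# "file_destination" to ExampleFileDestination via the registry dict above.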
Reviewer: To check my understanding, users should be able to instantiate LoggingSources that are wrappers of existing data sources, right? E.g., we append to an already existing BigQuery table that already has the equivalent of these features?
pyalex: Not exactly. What the user declares is a LoggingDestination. This one is defined by the user as part of LoggingConfig in the feature service (see the changes to the proto).
pyalex: I added some doc strings.
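To connect the reply above to the user-facing API, here is a rough sketch of declaring logging on a feature service. It assumes a FileLoggingDestination provided by the file offline store; the import path, the logging_config parameter, and the constructor arguments are best-effort assumptions rather than confirmed by this PR's diff.

from feast import FeatureService
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination  # assumed location

driver_activity = FeatureService(
    name="driver_activity",
    features=[driver_stats_view],  # feature view(s) defined elsewhere
    logging_config=LoggingConfig(
        destination=FileLoggingDestination(path="data/feature_logs"),
    ),
)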