feat: Write logged features to an offline store (Python API) #2574

Merged: 16 commits, Apr 26, 2022
46 changes: 46 additions & 0 deletions protos/feast/core/FeatureService.proto
@@ -5,6 +5,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureServiceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";

@@ -35,6 +36,9 @@ message FeatureServiceSpec {

  // Owner of the feature service.
  string owner = 6;

  // (optional) If provided, logging will be enabled for this feature service.
  LoggingConfig logging_config = 7;
}


@@ -46,3 +50,45 @@ message FeatureServiceMeta {
  google.protobuf.Timestamp last_updated_timestamp = 2;

}


message LoggingConfig {
  float sample_rate = 1;
  google.protobuf.Duration partition_interval = 2;

  oneof destination {
    FileDestination file_destination = 3;
    BigQueryDestination bigquery_destination = 4;
    RedshiftDestination redshift_destination = 5;
    SnowflakeDestination snowflake_destination = 6;
    CustomDestination custom_destination = 7;
  }

  message FileDestination {
    string path = 1;
    string s3_endpoint_override = 2;

    // Column names to use for partitioning.
    repeated string partition_by = 3;
  }

  message BigQueryDestination {
    // Full table reference in the form of [project:dataset.table].
    string table_ref = 1;
  }

  message RedshiftDestination {
    // Destination table name. Cluster ID and database are taken from the offline store config.
    string table_name = 1;
  }

  message SnowflakeDestination {
    // Destination table name. Schema and database are taken from the offline store config.
    string table_name = 1;
  }

  message CustomDestination {
    string kind = 1;
    map<string, string> config = 2;
  }
}
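
[Editor's note] As a quick sanity check of the message's shape, here is a minimal sketch of building a LoggingConfig through the generated Python bindings; the bucket path and values are made up:

from google.protobuf.duration_pb2 import Duration

from feast.protos.feast.core.FeatureService_pb2 import LoggingConfig

# Log roughly 10% of requests, partition hourly, and write to a file destination.
config = LoggingConfig(
    sample_rate=0.1,
    partition_interval=Duration(seconds=3600),
    file_destination=LoggingConfig.FileDestination(
        path="s3://example-bucket/feature-logs",  # made-up path
        partition_by=["__log_date"],
    ),
)

# Only one destination in the oneof can be set at a time.
assert config.WhichOneof("destination") == "file_destination"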
164 changes: 164 additions & 0 deletions sdk/python/feast/feature_logging.py
@@ -0,0 +1,164 @@
import abc
from typing import TYPE_CHECKING, Dict, Optional, Type, cast

import pyarrow as pa
from pytz import UTC

from feast.data_source import DataSource
from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
from feast.errors import (
    FeastObjectNotFoundException,
    FeatureViewNotFoundException,
    OnDemandFeatureViewNotFoundException,
)
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
from feast.types import from_value_type

if TYPE_CHECKING:
    from feast import FeatureService
    from feast.registry import Registry


REQUEST_ID_FIELD = "__request_id"
LOG_TIMESTAMP_FIELD = "__log_timestamp"
LOG_DATE_FIELD = "__log_date"


class LoggingSource:
[Review comment, Collaborator]
To check my understanding: users should be able to instantiate LoggingSources that could be wrappers of existing data sources, right? E.g., we append to an already existing BigQuery table that already has the equivalent of these features?

[Reply, @pyalex (Collaborator, Author), Apr 21, 2022]
Not exactly.

  1. The user doesn't instantiate a logging source; it is created mostly internally, depending on where logs are coming from.
  2. Currently the only available logging source is the feature server. Other examples could be a materialization job or a streaming job.
  3. We do not append to feature sources (like BigQueryDataSource). Instead, logs are written to a LoggingDestination. This is defined by the user as part of the LoggingConfig in the feature service (see the changes to the proto).
  4. A LoggingDestination can be converted to a DataSource (see LoggingDestination.to_data_source) when logs are loaded from the offline store.

[Reply, @pyalex (Collaborator, Author)]
I added some docstring.
"""
Logging source describes object that produces logs (eg, feature service produces logs of served features).
It should be able to provide schema of produced logs table and additional metadata that describes logs data.
"""

@abc.abstractmethod
def get_schema(self, registry: "Registry") -> pa.Schema:
""" Generate schema for logs destination. """
raise NotImplementedError

@abc.abstractmethod
def get_log_timestamp_column(self) -> str:
""" Return timestamp column that must exist in generated schema. """
raise NotImplementedError


class FeatureServiceLoggingSource(LoggingSource):
    def __init__(self, feature_service: "FeatureService", project: str):
        self._feature_service = feature_service
        self._project = project

    def get_schema(self, registry: "Registry") -> pa.Schema:
        fields: Dict[str, pa.DataType] = {}

        for projection in self._feature_service.feature_view_projections:
            for feature in projection.features:
                fields[
                    f"{projection.name_to_use()}__{feature.name}"
                ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
                fields[
                    f"{projection.name_to_use()}__{feature.name}__timestamp"
                ] = PA_TIMESTAMP_TYPE
                fields[
                    f"{projection.name_to_use()}__{feature.name}__status"
                ] = pa.int32()

            try:
                feature_view = registry.get_feature_view(projection.name, self._project)
            except FeatureViewNotFoundException:
                try:
                    on_demand_feature_view = registry.get_on_demand_feature_view(
                        projection.name, self._project
                    )
                except OnDemandFeatureViewNotFoundException:
                    raise FeastObjectNotFoundException(
                        f"Can't recognize feature view with a name {projection.name}"
                    )

                for (
                    request_source
                ) in on_demand_feature_view.source_request_sources.values():
                    for field in request_source.schema:
                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]

            else:
                for entity_name in feature_view.entities:
                    entity = registry.get_entity(entity_name, self._project)
                    join_key = projection.join_key_map.get(
                        entity.join_key, entity.join_key
                    )
                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[
                        from_value_type(entity.value_type)
                    ]

        # system columns
        fields[REQUEST_ID_FIELD] = pa.string()
        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)

        return pa.schema(
            [pa.field(name, data_type) for name, data_type in fields.items()]
        )

    def get_log_timestamp_column(self) -> str:
        return LOG_TIMESTAMP_FIELD
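
[Editor's note] To make the naming convention in get_schema() concrete, here is a purely illustrative schema for a hypothetical service serving one feature "conv_rate" from a view "driver_stats", joined on an int64 entity key "driver_id" (the names are invented, and PA_TIMESTAMP_TYPE is assumed to be microsecond precision):

import pyarrow as pa
from pytz import UTC

expected_schema = pa.schema(
    [
        pa.field("driver_stats__conv_rate", pa.float32()),  # feature value
        pa.field("driver_stats__conv_rate__timestamp", pa.timestamp("us")),
        pa.field("driver_stats__conv_rate__status", pa.int32()),  # serving status
        pa.field("driver_id", pa.int64()),  # entity join key
        pa.field("__request_id", pa.string()),  # system column
        pa.field("__log_timestamp", pa.timestamp("us", tz=UTC)),  # system column
    ]
)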


class _DestinationRegistry(type):
    classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}

    def __new__(cls, name, bases, dct):
        kls = type.__new__(cls, name, bases, dct)
        if dct.get("_proto_attr_name"):
            cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls
        return kls


class LoggingDestination:
"""
Logging destination contains details about where exactly logs should be written inside an offline store.
It is implementation specific - each offline store must implement LoggingDestination subclass.

Kind of logging destination will be determined by matching attribute name in LoggingConfig protobuf message
and "_proto_kind" property of each subclass.
"""

_proto_kind: str

@classmethod
@abc.abstractmethod
def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
raise NotImplementedError

@abc.abstractmethod
def to_proto(self) -> LoggingConfigProto:
raise NotImplementedError

@abc.abstractmethod
def to_data_source(self) -> DataSource:
"""
Convert this object into a data source to read logs from an offline store.
"""
raise NotImplementedError

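[Editor's note] For illustration, a concrete destination might hook into the registry roughly as follows. This class is a sketch, not part of this PR; the name and the explicit metaclass opt-in are assumptions:

class SketchFileDestination(LoggingDestination, metaclass=_DestinationRegistry):
    # The registration key must match the oneof field name in the LoggingConfig proto.
    _proto_attr_name = "file_destination"

    def __init__(self, path: str):
        self.path = path

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
        return cls(path=config_proto.file_destination.path)

    def to_proto(self) -> LoggingConfigProto:
        proto = LoggingConfigProto()
        proto.file_destination.path = self.path
        return proto

    def to_data_source(self) -> DataSource:
        # A real implementation would return e.g. a FileSource pointing at self.path.
        raise NotImplementedError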

class LoggingConfig:
    destination: LoggingDestination

    def __init__(self, destination: LoggingDestination):
        self.destination = destination

    @classmethod
    def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
        proto_kind = cast(str, config_proto.WhichOneof("destination"))
        if proto_kind is None:
            return None

        if proto_kind == "custom_destination":
            proto_kind = config_proto.custom_destination.kind

        destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind]
        return LoggingConfig(destination=destination_class.from_proto(config_proto))

    def to_proto(self) -> LoggingConfigProto:
        return self.destination.to_proto()
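
[Editor's note] A sketch of the dispatch above, assuming some destination class (like the sketch earlier) has registered itself under "file_destination":

proto = LoggingConfigProto()
proto.file_destination.path = "s3://example-bucket/feature-logs"  # made-up path

# WhichOneof("destination") returns "file_destination", so the class registered
# under that key is used to build config.destination.
config = LoggingConfig.from_proto(proto)
assert config is not None and config.to_proto() == proto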
10 changes: 10 additions & 0 deletions sdk/python/feast/feature_service.py
@@ -5,6 +5,7 @@
from google.protobuf.json_format import MessageToJson

from feast.base_feature_view import BaseFeatureView
from feast.feature_logging import LoggingConfig
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import OnDemandFeatureView
@@ -44,6 +45,7 @@ class FeatureService:
    owner: str
    created_timestamp: Optional[datetime] = None
    last_updated_timestamp: Optional[datetime] = None
    logging_config: Optional[LoggingConfig] = None

    @log_exceptions
    def __init__(
@@ -54,6 +56,7 @@ def __init__(
        tags: Dict[str, str] = None,
        description: str = "",
        owner: str = "",
        logging_config: Optional[LoggingConfig] = None,
    ):
        """
        Creates a FeatureService object.
@@ -106,6 +109,7 @@ def __init__(
        self.owner = owner
        self.created_timestamp = None
        self.last_updated_timestamp = None
        self.logging_config = logging_config

    def __repr__(self):
        items = (f"{k} = {v}" for k, v in self.__dict__.items())
@@ -152,6 +156,9 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto):
            tags=dict(feature_service_proto.spec.tags),
            description=feature_service_proto.spec.description,
            owner=feature_service_proto.spec.owner,
            logging_config=LoggingConfig.from_proto(
                feature_service_proto.spec.logging_config
            ),
        )
        fs.feature_view_projections.extend(
            [
@@ -192,6 +199,9 @@ def to_proto(self) -> FeatureServiceProto:
            tags=self.tags,
            description=self.description,
            owner=self.owner,
            logging_config=self.logging_config.to_proto()
            if self.logging_config
            else None,
        )

        return FeatureServiceProto(spec=spec, meta=meta)
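[Editor's note] Putting the definition-side pieces together, enabling logging on a feature service might look like the following sketch. The feature view and the destination instance are assumed to be defined elsewhere; this diff only adds the logging_config plumbing:

from feast import FeatureService
from feast.feature_logging import LoggingConfig

driver_service = FeatureService(
    name="driver_activity",  # hypothetical service name
    features=[driver_stats_view],  # assumed to be defined in the repo
    logging_config=LoggingConfig(
        destination=file_logging_destination  # assumed LoggingDestination instance
    ),
)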
20 changes: 20 additions & 0 deletions sdk/python/feast/feature_store.py
@@ -34,6 +34,7 @@
)

import pandas as pd
import pyarrow as pa
from colorama import Fore, Style
from google.protobuf.timestamp_pb2 import Timestamp
from tqdm import tqdm
@@ -1976,6 +1977,25 @@ def serve_transformations(self, port: int) -> None:
    def _teardown_go_server(self):
        self._go_server = None

    def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]):
        """
        Write logs produced by a source (currently only a feature service is
        supported as a source) to an offline store.
        """
        if not isinstance(source, FeatureService):
            raise ValueError("Only feature service is currently supported as a source")

        assert (
            source.logging_config is not None
        ), "Feature service must be configured with logging config in order to use this functionality"

        self._get_provider().write_feature_service_logs(
            feature_service=source,
            logs=logs,
            config=self.config,
            registry=self._registry,
        )
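
[Editor's note] On the caller side, a sketch of the new API; the service name and the log-collection helper below are placeholders, not part of this PR:

import pyarrow as pa

from feast import FeatureStore

store = FeatureStore(repo_path=".")
service = store.get_feature_service("driver_activity")  # must have logging_config set

# `logs` must be a pyarrow Table matching the schema produced by
# FeatureServiceLoggingSource.get_schema() for this service.
logs: pa.Table = collect_served_feature_logs()  # placeholder helper

store.write_logged_features(logs=logs, source=service)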


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
    set_of_row_lengths = {len(v) for v in join_key_values.values()}