diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto
index 2295677583..c04fa97507 100644
--- a/protos/feast/core/FeatureService.proto
+++ b/protos/feast/core/FeatureService.proto
@@ -5,6 +5,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
 option java_outer_classname = "FeatureServiceProto";
 option java_package = "feast.proto.core";
 
+import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
 
 import "feast/core/FeatureViewProjection.proto";
@@ -35,6 +36,9 @@ message FeatureServiceSpec {
 
   // Owner of the feature service.
   string owner = 6;
+
+  // (optional) if provided, logging will be enabled for this feature service.
+  LoggingConfig logging_config = 7;
 }
 
 
@@ -46,3 +50,45 @@ message FeatureServiceMeta {
   google.protobuf.Timestamp last_updated_timestamp = 2;
 
 }
+
+
+message LoggingConfig {
+  float sample_rate = 1;
+  google.protobuf.Duration partition_interval = 2;
+
+  oneof destination {
+    FileDestination file_destination = 3;
+    BigQueryDestination bigquery_destination = 4;
+    RedshiftDestination redshift_destination = 5;
+    SnowflakeDestination snowflake_destination = 6;
+    CustomDestination custom_destination = 7;
+  }
+
+  message FileDestination {
+    string path = 1;
+    string s3_endpoint_override = 2;
+
+    // column names to use for partitioning
+    repeated string partition_by = 3;
+  }
+
+  message BigQueryDestination {
+    // Full table reference in the form of [project:dataset.table]
+    string table_ref = 1;
+  }
+
+  message RedshiftDestination {
+    // Destination table name. ClusterId and database will be taken from an offline store config
+    string table_name = 1;
+  }
+
+  message SnowflakeDestination {
+    // Destination table name. Schema and database will be taken from an offline store config
+    string table_name = 1;
+  }
+
+  message CustomDestination {
+    string kind = 1;
+    map<string, string> config = 2;
+  }
+}
diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py
new file mode 100644
index 0000000000..e35dbdd9c7
--- /dev/null
+++ b/sdk/python/feast/feature_logging.py
@@ -0,0 +1,164 @@
+import abc
+from typing import TYPE_CHECKING, Dict, Optional, Type, cast
+
+import pyarrow as pa
+from pytz import UTC
+
+from feast.data_source import DataSource
+from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
+from feast.errors import (
+    FeastObjectNotFoundException,
+    FeatureViewNotFoundException,
+    OnDemandFeatureViewNotFoundException,
+)
+from feast.protos.feast.core.FeatureService_pb2 import (
+    LoggingConfig as LoggingConfigProto,
+)
+from feast.types import from_value_type
+
+if TYPE_CHECKING:
+    from feast import FeatureService
+    from feast.registry import Registry
+
+
+REQUEST_ID_FIELD = "__request_id"
+LOG_TIMESTAMP_FIELD = "__log_timestamp"
+LOG_DATE_FIELD = "__log_date"
+
+
+class LoggingSource:
+    """
+    A logging source describes an object that produces logs (e.g., a feature service
+    produces logs of served features). It should be able to provide the schema of the
+    produced logs table and additional metadata that describes the log data.
+    """
+
+    @abc.abstractmethod
+    def get_schema(self, registry: "Registry") -> pa.Schema:
+        """ Generate a schema for the logs destination. """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def get_log_timestamp_column(self) -> str:
+        """ Return the timestamp column that must exist in the generated schema. """
+        raise NotImplementedError
+
+
+class FeatureServiceLoggingSource(LoggingSource):
+    def __init__(self, feature_service: "FeatureService", project: str):
+        self._feature_service = feature_service
+        self._project = project
+
+    def get_schema(self, registry: "Registry") -> pa.Schema:
+        fields: Dict[str, pa.DataType] = {}
+
+        for projection in self._feature_service.feature_view_projections:
+            for feature in projection.features:
+                fields[
+                    f"{projection.name_to_use()}__{feature.name}"
+                ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype]
+                fields[
+                    f"{projection.name_to_use()}__{feature.name}__timestamp"
+                ] = PA_TIMESTAMP_TYPE
+                fields[
+                    f"{projection.name_to_use()}__{feature.name}__status"
+                ] = pa.int32()
+
+            try:
+                feature_view = registry.get_feature_view(projection.name, self._project)
+            except FeatureViewNotFoundException:
+                try:
+                    on_demand_feature_view = registry.get_on_demand_feature_view(
+                        projection.name, self._project
+                    )
+                except OnDemandFeatureViewNotFoundException:
+                    raise FeastObjectNotFoundException(
+                        f"Can't recognize feature view with a name {projection.name}"
+                    )
+
+                for (
+                    request_source
+                ) in on_demand_feature_view.source_request_sources.values():
+                    for field in request_source.schema:
+                        fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype]
+
+            else:
+                for entity_name in feature_view.entities:
+                    entity = registry.get_entity(entity_name, self._project)
+                    join_key = projection.join_key_map.get(
+                        entity.join_key, entity.join_key
+                    )
+                    fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[
+                        from_value_type(entity.value_type)
+                    ]
+
+        # system columns
+        fields[REQUEST_ID_FIELD] = pa.string()
+        fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC)
+
+        return pa.schema(
+            [pa.field(name, data_type) for name, data_type in fields.items()]
+        )
+
+    def get_log_timestamp_column(self) -> str:
+        return LOG_TIMESTAMP_FIELD
+
+
+class _DestinationRegistry(type):
+    # Maps the oneof attribute name from the LoggingConfig proto (or a custom kind)
+    # to the LoggingDestination subclass that handles it.
+    classes_by_proto_attr_name: Dict[str, Type["LoggingDestination"]] = {}
+
+    def __new__(cls, name, bases, dct):
+        kls = type.__new__(cls, name, bases, dct)
+        if dct.get("_proto_kind"):
+            cls.classes_by_proto_attr_name[dct["_proto_kind"]] = kls
+        return kls
+
+
+class LoggingDestination(metaclass=_DestinationRegistry):
+    """
+    A logging destination contains details about where exactly logs should be written
+    inside an offline store. It is implementation specific: each offline store must
+    implement its own LoggingDestination subclass.
+
+    The kind of logging destination is determined by matching the attribute name in the
+    LoggingConfig protobuf message against the "_proto_kind" property of each subclass.
+    """
+
+    _proto_kind: str
+
+    @classmethod
+    @abc.abstractmethod
+    def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination":
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def to_proto(self) -> LoggingConfigProto:
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def to_data_source(self) -> DataSource:
+        """
+        Convert this object into a data source to read logs from an offline store.
+ """ + raise NotImplementedError + + +class LoggingConfig: + destination: LoggingDestination + + def __init__(self, destination: LoggingDestination): + self.destination = destination + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]: + proto_kind = cast(str, config_proto.WhichOneof("destination")) + if proto_kind is None: + return + + if proto_kind == "custom_destination": + proto_kind = config_proto.custom_destination.kind + + destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind] + return LoggingConfig(destination=destination_class.from_proto(config_proto)) + + def to_proto(self) -> LoggingConfigProto: + proto = self.destination.to_proto() + return proto diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 492d31a809..bfa48b3bf4 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,6 +5,7 @@ from google.protobuf.json_format import MessageToJson from feast.base_feature_view import BaseFeatureView +from feast.feature_logging import LoggingConfig from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.on_demand_feature_view import OnDemandFeatureView @@ -44,6 +45,7 @@ class FeatureService: owner: str created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None + logging_config: Optional[LoggingConfig] = None @log_exceptions def __init__( @@ -54,6 +56,7 @@ def __init__( tags: Dict[str, str] = None, description: str = "", owner: str = "", + logging_config: Optional[LoggingConfig] = None, ): """ Creates a FeatureService object. @@ -106,6 +109,7 @@ def __init__( self.owner = owner self.created_timestamp = None self.last_updated_timestamp = None + self.logging_config = logging_config def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) @@ -152,6 +156,9 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto): tags=dict(feature_service_proto.spec.tags), description=feature_service_proto.spec.description, owner=feature_service_proto.spec.owner, + logging_config=LoggingConfig.from_proto( + feature_service_proto.spec.logging_config + ), ) fs.feature_view_projections.extend( [ @@ -192,6 +199,9 @@ def to_proto(self) -> FeatureServiceProto: tags=self.tags, description=self.description, owner=self.owner, + logging_config=self.logging_config.to_proto() + if self.logging_config + else None, ) return FeatureServiceProto(spec=spec, meta=meta) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4f456be384..95b1458849 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -34,6 +34,7 @@ ) import pandas as pd +import pyarrow as pa from colorama import Fore, Style from google.protobuf.timestamp_pb2 import Timestamp from tqdm import tqdm @@ -1976,6 +1977,25 @@ def serve_transformations(self, port: int) -> None: def _teardown_go_server(self): self._go_server = None + def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]): + """ + Write logs produced by a source (currently only feature service is supported as a source) + to an offline store. 
+ """ + if not isinstance(source, FeatureService): + raise ValueError("Only feature service is currently supported as a source") + + assert ( + source.logging_config is not None + ), "Feature service must be configured with logging config in order to use this functionality" + + self._get_provider().write_feature_service_logs( + feature_service=source, + logs=logs, + config=self.config, + registry=self._registry, + ) + def _validate_entity_values(join_key_values: Dict[str, List[Value]]): set_of_row_lengths = {len(v) for v in join_key_values.values()} diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 29d0e029d9..ee024d4d40 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -1,4 +1,5 @@ import contextlib +import tempfile import uuid from datetime import date, datetime, timedelta from typing import ( @@ -28,6 +29,7 @@ FeastProviderLoginError, InvalidEntityType, ) +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -41,13 +43,18 @@ from ...saved_dataset import SavedDatasetStorage from ...usage import log_exceptions_and_usage -from .bigquery_source import BigQuerySource, SavedDatasetBigQueryStorage +from .bigquery_source import ( + BigQueryLoggingDestination, + BigQuerySource, + SavedDatasetBigQueryStorage, +) try: from google.api_core.exceptions import NotFound from google.auth.exceptions import DefaultCredentialsError from google.cloud import bigquery - from google.cloud.bigquery import Client, Table + from google.cloud.bigquery import Client, SchemaField, Table + from google.cloud.bigquery._pandas_helpers import ARROW_SCALAR_IDS_TO_BQ except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -248,6 +255,42 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + destination = logging_config.destination + assert isinstance(destination, BigQueryLoggingDestination) + + client = _get_bigquery_client( + project=config.offline_store.project_id, + location=config.offline_store.location, + ) + + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.PARQUET, + schema=arrow_schema_to_bq_schema(source.get_schema(registry)), + time_partitioning=bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, + field=source.get_log_timestamp_column(), + ), + ) + + with tempfile.TemporaryFile() as parquet_temp_file: + pyarrow.parquet.write_table(table=data, where=parquet_temp_file) + + parquet_temp_file.seek(0) + + client.load_table_from_file( + file_obj=parquet_temp_file, + destination=destination.table, + job_config=job_config, + ) + class BigQueryRetrievalJob(RetrievalJob): def __init__( @@ -513,7 +556,9 @@ def _get_entity_df_event_timestamp_range( return entity_df_event_timestamp_range -def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] = None): +def _get_bigquery_client( + project: Optional[str] = None, location: Optional[str] = None +) -> bigquery.Client: try: client = bigquery.Client(project=project, location=location) except DefaultCredentialsError as e: @@ -533,6 +578,24 @@ def _get_bigquery_client(project: 
Optional[str] = None, location: Optional[str] return client +def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]: + bq_schema = [] + + for field in arrow_schema: + if pyarrow.types.is_list(field.type): + detected_mode = "REPEATED" + detected_type = ARROW_SCALAR_IDS_TO_BQ[field.type.value_type.id] + else: + detected_mode = "NULLABLE" + detected_type = ARROW_SCALAR_IDS_TO_BQ[field.type.id] + + bq_schema.append( + SchemaField(name=field.name, field_type=detected_type, mode=detected_mode) + ) + + return bq_schema + + # TODO: Optimizations # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 001576c98f..f66b2066bd 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -4,7 +4,11 @@ from feast import type_map from feast.data_source import DataSource from feast.errors import DataSourceNotFoundException +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -253,3 +257,28 @@ def to_proto(self) -> SavedDatasetStorageProto: def to_data_source(self) -> DataSource: return BigQuerySource(table=self.bigquery_options.table) + + +class BigQueryLoggingDestination(LoggingDestination): + _proto_kind = "bigquery_destination" + + table: str + + def __init__(self, *, table_ref): + self.table = table_ref + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return BigQueryLoggingDestination( + table_ref=config_proto.bigquery_destination.table_ref, + ) + + def to_data_source(self) -> DataSource: + return BigQuerySource(table=self.table) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + bigquery_destination=LoggingConfigProto.BigQueryDestination( + table_ref=self.table + ) + ) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 052d546748..a85cd880b1 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -4,14 +4,19 @@ import dask.dataframe as dd import pandas as pd import pyarrow +import pyarrow.parquet import pytz from pydantic.typing import Literal from feast import FileSource, OnDemandFeatureView from feast.data_source import DataSource from feast.errors import FeastJoinKeysDuringMaterialization +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView -from feast.infra.offline_stores.file_source import SavedDatasetFileStorage +from feast.infra.offline_stores.file_source import ( + FileLoggingDestination, + SavedDatasetFileStorage, +) from feast.infra.offline_stores.offline_store import ( OfflineStore, RetrievalJob, @@ -367,6 +372,28 @@ def pull_all_from_table_or_query( end_date=end_date, ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + destination = 
logging_config.destination + assert isinstance(destination, FileLoggingDestination) + + filesystem, path = FileSource.create_filesystem_and_path( + destination.path, destination.s3_endpoint_override, + ) + + pyarrow.parquet.write_to_dataset( + data, + root_path=path, + partition_cols=destination.partition_by, + filesystem=filesystem, + ) + def _get_entity_df_event_timestamp_range( entity_df: Union[pd.DataFrame, str], entity_df_event_timestamp_col: str, diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index a6fc7a1600..68c1835cb0 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -1,5 +1,5 @@ import warnings -from typing import Callable, Dict, Iterable, Optional, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Tuple from pyarrow._fs import FileSystem from pyarrow._s3fs import S3FileSystem @@ -8,7 +8,11 @@ from feast import type_map from feast.data_format import FileFormat, ParquetFormat from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -290,3 +294,48 @@ def to_data_source(self) -> DataSource: file_format=self.file_options.file_format, s3_endpoint_override=self.file_options.s3_endpoint_override, ) + + +class FileLoggingDestination(LoggingDestination): + _proto_kind = "file_destination" + + path: str + s3_endpoint_override: str + partition_by: Optional[List[str]] + + def __init__( + self, + *, + path: str, + s3_endpoint_override="", + partition_by: Optional[List[str]] = None, + ): + self.path = path + self.s3_endpoint_override = s3_endpoint_override + self.partition_by = partition_by + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return FileLoggingDestination( + path=config_proto.file_destination.path, + s3_endpoint_override=config_proto.file_destination.s3_endpoint_override, + partition_by=list(config_proto.file_destination.partition_by) + if config_proto.file_destination.partition_by + else None, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + file_destination=LoggingConfigProto.FileDestination( + path=self.path, + s3_endpoint_override=self.s3_endpoint_override, + partition_by=self.partition_by, + ) + ) + + def to_data_source(self) -> DataSource: + return FileSource( + path=self.path, + file_format=ParquetFormat(), + s3_endpoint_override=self.s3_endpoint_override, + ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 83f20bb3e5..ed545ed5ad 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -21,6 +21,7 @@ from feast.data_source import DataSource from feast.dqm.errors import ValidationFailed +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry @@ -241,3 +242,28 @@ def pull_all_from_table_or_query( end_date: Ending date of query """ pass + + @staticmethod + def write_logged_features( + config: RepoConfig, + 
data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + """ + Write logged features to a specified destination (taken from logging_config) in the offline store. + Data can be appended to an existing table (destination) or a new one will be created automatically + (if it doesn't exist). + Hence, this function can be called repeatedly with the same destination to flush logs in chunks. + + Args: + config: Repo configuration object + data: Arrow table produced by logging source. + source: Logging source that provides schema and some additional metadata. + logging_config: used to determine destination + registry: Feast registry + + This is an optional method that could be supported only be some stores. + """ + raise NotImplementedError() diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index cd309c92b2..fb5759cbbb 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pyarrow import pyarrow as pa from dateutil import parser from pydantic import StrictStr @@ -23,6 +24,7 @@ from feast import OnDemandFeatureView, RedshiftSource from feast.data_source import DataSource from feast.errors import InvalidEntityType +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -30,7 +32,10 @@ RetrievalJob, RetrievalMetadata, ) -from feast.infra.offline_stores.redshift_source import SavedDatasetRedshiftStorage +from feast.infra.offline_stores.redshift_source import ( + RedshiftLoggingDestination, + SavedDatasetRedshiftStorage, +) from feast.infra.utils import aws_utils from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -257,6 +262,37 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + destination = logging_config.destination + assert isinstance(destination, RedshiftLoggingDestination) + + redshift_client = aws_utils.get_redshift_data_client( + config.offline_store.region + ) + s3_resource = aws_utils.get_s3_resource(config.offline_store.region) + s3_path = f"{config.offline_store.s3_staging_location}/logged_features/{uuid.uuid4()}.parquet" + + aws_utils.upload_arrow_table_to_redshift( + table=data, + redshift_data_client=redshift_client, + cluster_id=config.offline_store.cluster_id, + database=config.offline_store.database, + user=config.offline_store.user, + s3_resource=s3_resource, + s3_path=s3_path, + iam_role=config.offline_store.iam_role, + table_name=destination.table_name, + schema=source.get_schema(registry), + fail_if_exists=False, + ) + class RedshiftRetrievalJob(RetrievalJob): def __init__( diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 00af8c1abf..93d811998a 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -4,7 +4,11 @@ from feast import type_map from feast.data_source import DataSource from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError +from 
feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -327,3 +331,28 @@ def to_proto(self) -> SavedDatasetStorageProto: def to_data_source(self) -> DataSource: return RedshiftSource(table=self.redshift_options.table) + + +class RedshiftLoggingDestination(LoggingDestination): + _proto_kind = "redshift_destination" + + table_name: str + + def __init__(self, *, table_name: str): + self.table_name = table_name + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return RedshiftLoggingDestination( + table_name=config_proto.redshift_destination.table_name, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + redshift_destination=LoggingConfigProto.RedshiftDestination( + table_name=self.table_name + ) + ) + + def to_data_source(self) -> DataSource: + return RedshiftSource(table=self.table_name) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index a07f7a57c6..4cf6716c5e 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd +import pyarrow import pyarrow as pa from pydantic import Field from pydantic.typing import Literal @@ -24,6 +25,7 @@ from feast import OnDemandFeatureView from feast.data_source import DataSource from feast.errors import InvalidEntityType +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -33,6 +35,7 @@ ) from feast.infra.offline_stores.snowflake_source import ( SavedDatasetSnowflakeStorage, + SnowflakeLoggingDestination, SnowflakeSource, ) from feast.infra.utils.snowflake_utils import ( @@ -274,6 +277,25 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + assert isinstance(logging_config.destination, SnowflakeLoggingDestination) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + write_pandas( + snowflake_conn, + data.to_pandas(), + table_name=logging_config.destination.table_name, + auto_create_table=True, + ) + class SnowflakeRetrievalJob(RetrievalJob): def __init__( diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 904fc48043..ef3ae52d28 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -3,7 +3,11 @@ from feast import type_map from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -329,3 +333,28 @@ def to_proto(self) -> SavedDatasetStorageProto: def 
to_data_source(self) -> DataSource: return SnowflakeSource(table=self.snowflake_options.table) + + +class SnowflakeLoggingDestination(LoggingDestination): + table_name: str + + _proto_kind = "snowflake_destination" + + def __init__(self, *, table_name: str): + self.table_name = table_name + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return SnowflakeLoggingDestination( + table_name=config_proto.snowflake_destination.table_name, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + snowflake_destination=LoggingConfigProto.SnowflakeDestination( + table_name=self.table_name, + ) + ) + + def to_data_source(self) -> DataSource: + return SnowflakeSource(table=self.table_name,) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 09ca98d86d..6364297b1e 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -2,10 +2,13 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas +import pyarrow import pyarrow as pa from tqdm import tqdm +from feast import FeatureService from feast.entity import Entity +from feast.feature_logging import FeatureServiceLoggingSource from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config @@ -214,3 +217,50 @@ def retrieve_saved_dataset( start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore ) + + def write_feature_service_logs( + self, + feature_service: FeatureService, + logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for the feature service before calling this function" + + self.offline_store.write_logged_features( + config=config, + data=logs, + source=FeatureServiceLoggingSource(feature_service, config.project), + logging_config=feature_service.logging_config, + registry=registry, + ) + + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + start_date: datetime, + end_date: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for the feature service before calling this function" + + logging_source = FeatureServiceLoggingSource(feature_service, config.project) + schema = logging_source.get_schema(registry) + logging_config = feature_service.logging_config + ts_column = logging_source.get_log_timestamp_column() + columns = list(set(schema.names) - {ts_column}) + + return self.offline_store.pull_all_from_table_or_query( + config=config, + data_source=logging_config.destination.to_data_source(), + join_key_columns=[], + feature_name_columns=columns, + timestamp_field=ts_column, + start_date=start_date, + end_date=end_date, + ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index a71bd6d2d0..f8c2a4482f 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -9,7 +9,7 @@ import pyarrow from tqdm import tqdm -from feast import errors +from feast import FeatureService, errors from feast.entity import Entity from feast.feature_view import DUMMY_ENTITY_ID, FeatureView from 
feast.importer import import_class @@ -186,6 +186,41 @@ def retrieve_saved_dataset( """ ... + @abc.abstractmethod + def write_feature_service_logs( + self, + feature_service: FeatureService, + logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): + """ + Write features and entities logged by a feature server to an offline store. + + Schema of logs table is being inferred from the provided feature service. + Only feature services with configured logging are accepted. + """ + ... + + @abc.abstractmethod + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + from_: datetime, + to: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: + """ + Read logged features from an offline store for a given time window [from, to). + Target table is determined based on logging configuration from the feature service. + + Returns: + RetrievalJob object, which wraps the query to the offline store. + + """ + ... + def get_feature_server_endpoint(self) -> Optional[str]: """Returns endpoint for the feature server, if it exists.""" return None diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index fe5eed774e..d73c484b29 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Iterator, Optional, Tuple import pandas as pd +import pyarrow import pyarrow as pa import pyarrow.parquet as pq from tenacity import ( @@ -194,13 +195,6 @@ def upload_df_to_redshift( The caller is responsible for deleting the table when no longer necessary. - Here's how the upload process works: - 1. Pandas DataFrame is converted to PyArrow Table - 2. PyArrow Table is serialized into a Parquet format on local disk - 3. The Parquet file is uploaded to S3 - 4. The S3 file is uploaded to Redshift as a new table through COPY command - 5. The local disk & s3 paths are cleaned up - Args: redshift_data_client: Redshift Data API Service client cluster_id: Redshift Cluster Identifier @@ -216,10 +210,6 @@ def upload_df_to_redshift( Raises: RedshiftTableNameTooLong: The specified table name is too long. """ - if len(table_name) > REDSHIFT_TABLE_NAME_MAX_LENGTH: - raise RedshiftTableNameTooLong(table_name) - - bucket, key = get_bucket_and_key(s3_path) # Drop the index so that we dont have unnecessary columns df.reset_index(drop=True, inplace=True) @@ -231,35 +221,92 @@ def upload_df_to_redshift( # More details at: # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing table = pa.Table.from_pandas(df) - column_names, column_types = [], [] - for field in table.schema: - column_names.append(field.name) - column_types.append(pa_to_redshift_value_type(field.type)) + upload_arrow_table_to_redshift( + table, + redshift_data_client, + cluster_id=cluster_id, + database=database, + user=user, + s3_resource=s3_resource, + iam_role=iam_role, + s3_path=s3_path, + table_name=table_name, + ) + + +def upload_arrow_table_to_redshift( + table: pyarrow.Table, + redshift_data_client, + cluster_id: str, + database: str, + user: str, + s3_resource, + iam_role: str, + s3_path: str, + table_name: str, + schema: Optional[pyarrow.Schema] = None, + fail_if_exists: bool = True, +): + """Uploads an Arrow Table to Redshift to a new or existing table. + + Here's how the upload process works: + 1. PyArrow Table is serialized into a Parquet format on local disk + 2. The Parquet file is uploaded to S3 + 3. 
The S3 file is uploaded to Redshift as a new table through COPY command + 4. The local disk & s3 paths are cleaned up + + Args: + redshift_data_client: Redshift Data API Service client + cluster_id: Redshift Cluster Identifier + database: Redshift Database Name + user: Redshift username + s3_resource: S3 Resource object + s3_path: S3 path where the Parquet file is temporarily uploaded + iam_role: IAM Role for Redshift to assume during the COPY command. + The role must grant permission to read the S3 location. + table_name: The name of the new Redshift table where we copy the dataframe + table: The Arrow Table to upload + schema: (Optionally) client may provide arrow Schema which will be converted into redshift table schema + fail_if_exists: fail if table with such name exists or append data to existing table + + Raises: + RedshiftTableNameTooLong: The specified table name is too long. + """ + if len(table_name) > REDSHIFT_TABLE_NAME_MAX_LENGTH: + raise RedshiftTableNameTooLong(table_name) + + bucket, key = get_bucket_and_key(s3_path) + + schema = schema or table.schema column_query_list = ", ".join( - [ - f"{column_name} {column_type}" - for column_name, column_type in zip(column_names, column_types) - ] + [f"{field.name} {pa_to_redshift_value_type(field.type)}" for field in schema] ) # Write the PyArrow Table on disk in Parquet format and upload it to S3 - with tempfile.TemporaryDirectory() as temp_dir: - file_path = f"{temp_dir}/{uuid.uuid4()}.parquet" - pq.write_table(table, file_path) - s3_resource.Object(bucket, key).put(Body=open(file_path, "rb")) + with tempfile.TemporaryFile(suffix=".parquet") as parquet_temp_file: + pq.write_table(table, parquet_temp_file) + parquet_temp_file.seek(0) + s3_resource.Object(bucket, key).put(Body=parquet_temp_file) - # Create the table with the desired schema and - # copy the Parquet file contents to the Redshift table - create_and_copy_query = ( - f"CREATE TABLE {table_name}({column_query_list}); " - + f"COPY {table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET" + copy_query = ( + f"COPY {table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET" ) - execute_redshift_statement( - redshift_data_client, cluster_id, database, user, create_and_copy_query + create_query = ( + f"CREATE TABLE {'IF NOT EXISTS' if not fail_if_exists else ''}" + f" {table_name}({column_query_list})" ) - # Clean up S3 temporary data - s3_resource.Object(bucket, key).delete() + try: + execute_redshift_statement( + redshift_data_client, + cluster_id, + database, + user, + f"{create_query}; {copy_query}", + ) + finally: + # Clean up S3 temporary data + s3_resource.Object(bucket, key).delete() @contextlib.contextmanager diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 1d4ce7d6cb..bccf7931b5 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -2,9 +2,10 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas +import pyarrow from tqdm import tqdm -from feast import Entity, FeatureView, RepoConfig +from feast import Entity, FeatureService, FeatureView, RepoConfig from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -79,3 +80,22 @@ def online_read( def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset): pass + + def write_feature_service_logs( + self, + feature_service: FeatureService, + 
logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): + pass + + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + from_: datetime, + to: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: + pass diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index ba36f8e89b..b36af0db47 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -4,6 +4,7 @@ import pandas as pd from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.repo_config import FeastConfigBaseModel from feast.saved_dataset import SavedDatasetStorage @@ -51,6 +52,9 @@ def create_offline_store_config(self) -> FeastConfigBaseModel: def create_saved_dataset_destination(self) -> SavedDatasetStorage: ... + def create_logged_features_destination(self) -> LoggingDestination: + pass + @abstractmethod def teardown(self): ... diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 881f547617..620f444159 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -7,8 +7,12 @@ from feast import BigQuerySource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig -from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage +from feast.infra.offline_stores.bigquery_source import ( + BigQueryLoggingDestination, + SavedDatasetBigQueryStorage, +) from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) @@ -86,5 +90,11 @@ def create_saved_dataset_destination(self) -> SavedDatasetBigQueryStorage: ) return SavedDatasetBigQueryStorage(table=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" + ) + return BigQueryLoggingDestination(table_ref=table) + def get_prefixed_table_name(self, suffix: str) -> str: return f"{self.client.project}.{self.project_name}.{suffix}" diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 64c3aeacf3..ccc1544bb8 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -1,3 +1,5 @@ +import os.path +import shutil import tempfile import uuid from typing import Any, Dict, List, Optional @@ -10,8 +12,12 @@ from feast import FileSource from feast.data_format import ParquetFormat from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.file import FileOfflineStoreConfig -from feast.infra.offline_stores.file_source import SavedDatasetFileStorage +from feast.infra.offline_stores.file_source import ( + FileLoggingDestination, + SavedDatasetFileStorage, +) from feast.repo_config import FeastConfigBaseModel from 
tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, @@ -20,10 +26,12 @@ class FileDataSourceCreator(DataSourceCreator): files: List[Any] + dirs: List[Any] def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) self.files = [] + self.dirs = [] def create_data_source( self, @@ -53,6 +61,7 @@ def create_data_source( def create_saved_dataset_destination(self) -> SavedDatasetFileStorage: d = tempfile.mkdtemp(prefix=self.project_name) + self.dirs.append(d) return SavedDatasetFileStorage( path=d, file_format=ParquetFormat(), s3_endpoint_override=None ) @@ -63,10 +72,20 @@ def get_prefixed_table_name(self, suffix: str) -> str: def create_offline_store_config(self) -> FeastConfigBaseModel: return FileOfflineStoreConfig() + def create_logged_features_destination(self) -> LoggingDestination: + d = tempfile.mkdtemp(prefix=self.project_name) + self.dirs.append(d) + return FileLoggingDestination(path=d) + def teardown(self): for f in self.files: f.close() + for d in self.dirs: + if not os.path.exists(d): + continue + shutil.rmtree(d) + class S3FileDataSourceCreator(DataSourceCreator): f: Any @@ -143,6 +162,15 @@ def create_saved_dataset_destination(self) -> SavedDatasetFileStorage: s3_endpoint_override=f"http://{host}:{port}", ) + def create_logged_features_destination(self) -> LoggingDestination: + port = self.minio.get_exposed_port("9000") + host = self.minio.get_container_host_ip() + + return FileLoggingDestination( + path=f"s3://{self.bucket}/logged_features/{str(uuid.uuid4())}", + s3_endpoint_override=f"http://{host}:{port}", + ) + def get_prefixed_table_name(self, suffix: str) -> str: return f"{suffix}" diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index 7e305fee80..3b2794393f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -5,8 +5,12 @@ from feast import RedshiftSource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig -from feast.infra.offline_stores.redshift_source import SavedDatasetRedshiftStorage +from feast.infra.offline_stores.redshift_source import ( + RedshiftLoggingDestination, + SavedDatasetRedshiftStorage, +) from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( @@ -74,6 +78,14 @@ def create_saved_dataset_destination(self) -> SavedDatasetRedshiftStorage: return SavedDatasetRedshiftStorage(table_ref=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"persisted_ds_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.tables.append(table) + + return RedshiftLoggingDestination(table_name=table) + def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 3942444f32..23466bc00c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -6,8 +6,12 @@ 
from feast import SnowflakeSource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig -from feast.infra.offline_stores.snowflake_source import SavedDatasetSnowflakeStorage +from feast.infra.offline_stores.snowflake_source import ( + SavedDatasetSnowflakeStorage, + SnowflakeLoggingDestination, +) from feast.infra.utils.snowflake_utils import get_snowflake_conn, write_pandas from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( @@ -66,6 +70,14 @@ def create_saved_dataset_destination(self) -> SavedDatasetSnowflakeStorage: return SavedDatasetSnowflakeStorage(table_ref=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.tables.append(table) + + return SnowflakeLoggingDestination(table_name=table) + def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py new file mode 100644 index 0000000000..f15eb8a849 --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -0,0 +1,124 @@ +import datetime +import uuid + +import numpy as np +import pandas as pd +import pyarrow as pa +import pytest +from google.api_core.exceptions import NotFound + +from feast.feature_logging import ( + LOG_DATE_FIELD, + LOG_TIMESTAMP_FIELD, + REQUEST_ID_FIELD, + FeatureServiceLoggingSource, + LoggingConfig, +) +from feast.feature_service import FeatureService +from feast.protos.feast.serving.ServingService_pb2 import FieldStatus +from feast.wait import wait_retry_backoff +from tests.integration.feature_repos.repo_configuration import ( + UniversalDatasets, + construct_universal_feature_views, +) +from tests.integration.feature_repos.universal.entities import driver +from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 + + +@pytest.mark.integration +@pytest.mark.universal +def test_feature_service_logging(environment, universal_data_sources): + store = environment.feature_store + + (_, datasets, data_sources) = universal_data_sources + + feature_views = construct_universal_feature_views(data_sources) + store.apply([driver(), *feature_views.values()]) + + logs_df = prepare_logs(datasets) + + feature_service = FeatureService( + name="test_service", + features=[ + feature_views.driver[["conv_rate", "avg_daily_trips"]], + feature_views.driver_odfv[ + ["conv_rate_plus_val_to_add", "conv_rate_plus_100_rounded"] + ], + ], + logging_config=LoggingConfig( + destination=environment.data_source_creator.create_logged_features_destination() + ), + ) + + num_rows = logs_df.shape[0] + first_batch = logs_df.iloc[: num_rows // 2, :] + second_batch = logs_df.iloc[num_rows // 2 :, :] + + schema = FeatureServiceLoggingSource( + feature_service=feature_service, project=store.project + ).get_schema(store._registry) + + store.write_logged_features( + source=feature_service, logs=pa.Table.from_pandas(first_batch, schema=schema), + ) + + store.write_logged_features( + source=feature_service, logs=pa.Table.from_pandas(second_batch, schema=schema), + ) + expected_columns = list(set(logs_df.columns) - {LOG_DATE_FIELD}) + + def retrieve(): + retrieval_job = 
store._get_provider().retrieve_feature_service_logs( + feature_service=feature_service, + start_date=logs_df[LOG_TIMESTAMP_FIELD].min(), + end_date=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1), + config=store.config, + registry=store._registry, + ) + try: + df = retrieval_job.to_df() + except NotFound: + # Table was not created yet + return None, False + + return df, df.shape[0] == logs_df.shape[0] + + persisted_logs = wait_retry_backoff( + retrieve, timeout_secs=60, timeout_msg="Logs retrieval failed" + ) + + persisted_logs = persisted_logs[expected_columns] + logs_df = logs_df[expected_columns] + pd.testing.assert_frame_equal( + logs_df.sort_values(REQUEST_ID_FIELD).reset_index(drop=True), + persisted_logs.sort_values(REQUEST_ID_FIELD).reset_index(drop=True), + check_dtype=False, + ) + + +def prepare_logs(datasets: UniversalDatasets) -> pd.DataFrame: + driver_df = datasets.driver_df + driver_df["val_to_add"] = 50 + driver_df = driver_df.join(conv_rate_plus_100(driver_df)) + num_rows = driver_df.shape[0] + + logs_df = driver_df[["driver_id", "val_to_add"]] + logs_df[REQUEST_ID_FIELD] = [str(uuid.uuid4()) for _ in range(num_rows)] + logs_df[LOG_TIMESTAMP_FIELD] = pd.Series( + np.random.randint(0, 7 * 24 * 3600, num_rows) + ).map(lambda secs: pd.Timestamp.utcnow() - datetime.timedelta(seconds=secs)) + logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date + + for view, features in ( + ("driver_stats", ("conv_rate", "avg_daily_trips")), + ( + "conv_rate_plus_100", + ("conv_rate_plus_val_to_add", "conv_rate_plus_100_rounded"), + ), + ): + for feature in features: + logs_df[f"{view}__{feature}"] = driver_df[feature] + logs_df[f"{view}__{feature}__timestamp"] = driver_df["event_timestamp"] + logs_df[f"{view}__{feature}__status"] = FieldStatus.PRESENT + + return logs_df
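
Usage sketch: a minimal end-to-end example of the logging API introduced in this change, mirroring the integration test above. It shows a FeatureService configured with a LoggingConfig and a FileLoggingDestination, logs being appended via FeatureStore.write_logged_features, and retrieval through the provider. The feature view name, service name, repo path, and the logs_df DataFrame are illustrative placeholders; FeatureStore._registry and _get_provider() are the same internal accessors the test uses, since this change does not add a public read API on FeatureStore.

import datetime

import pyarrow as pa

from feast import FeatureService, FeatureStore
from feast.feature_logging import FeatureServiceLoggingSource, LoggingConfig
from feast.infra.offline_stores.file_source import FileLoggingDestination

store = FeatureStore(repo_path=".")  # hypothetical local feature repo

# Attach a LoggingConfig with a concrete destination (file, BigQuery,
# Redshift, or Snowflake) to enable logging for a feature service.
driver_stats_fv = store.get_feature_view("driver_hourly_stats")  # hypothetical view
feature_service = FeatureService(
    name="driver_service",  # hypothetical name
    features=[driver_stats_fv],
    logging_config=LoggingConfig(
        destination=FileLoggingDestination(path="data/logged_features")
    ),
)
store.apply([feature_service])

# Build an Arrow table that matches the schema expected for this feature
# service: per-feature value/timestamp/status columns, entity join keys,
# plus the __request_id and __log_timestamp system columns.
logging_source = FeatureServiceLoggingSource(feature_service, store.project)
schema = logging_source.get_schema(store._registry)

# logs_df stands in for a pandas DataFrame of served features collected by the
# caller; see prepare_logs() in the test above for how such a frame is shaped.
logs_table = pa.Table.from_pandas(logs_df, schema=schema)

# Append the logs to the destination configured on the feature service.
store.write_logged_features(logs=logs_table, source=feature_service)

# Read the logs back for a time window via the provider.
now = datetime.datetime.now(datetime.timezone.utc)
retrieval_job = store._get_provider().retrieve_feature_service_logs(
    feature_service=feature_service,
    start_date=now - datetime.timedelta(days=1),
    end_date=now,
    config=store.config,
    registry=store._registry,
)
print(retrieval_job.to_df().head())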