diff --git a/Makefile b/Makefile
index 30ac86e891..3eb92c263f 100644
--- a/Makefile
+++ b/Makefile
@@ -218,7 +218,28 @@ test-python-universal-postgres-offline:
 			not gcs_registry and \
 			not s3_registry and \
 			not test_snowflake and \
-			not test_universal_types" \
+			not test_spark" \
+		sdk/python/tests
+
+test-python-universal-clickhouse-offline:
+	PYTHONPATH='.' \
+	FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.clickhouse_repo_configuration \
+	PYTEST_PLUGINS=sdk.python.feast.infra.offline_stores.contrib.clickhouse_offline_store.tests \
+	python -m pytest -v -n 8 --integration \
+		-k "not test_historical_retrieval_with_validation and \
+			not test_historical_features_persisting and \
+			not test_universal_cli and \
+			not test_go_feature_server and \
+			not test_feature_logging and \
+			not test_reorder_columns and \
+			not test_logged_features_validation and \
+			not test_lambda_materialization_consistency and \
+			not test_offline_write and \
+			not test_push_features_to_offline_store and \
+			not gcs_registry and \
+			not s3_registry and \
+			not test_snowflake and \
+			not test_spark" \
 		sdk/python/tests
 
 test-python-universal-postgres-online:
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index b7faf526c2..9d062bfd97 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -93,6 +93,7 @@
     * [PostgreSQL (contrib)](reference/offline-stores/postgres.md)
     * [Trino (contrib)](reference/offline-stores/trino.md)
     * [Azure Synapse + Azure SQL (contrib)](reference/offline-stores/mssql.md)
+    * [Clickhouse (contrib)](reference/offline-stores/clickhouse.md)
     * [Remote Offline](reference/offline-stores/remote-offline-store.md)
   * [Online stores](reference/online-stores/README.md)
     * [Overview](reference/online-stores/overview.md)
diff --git a/docs/reference/data-sources/README.md b/docs/reference/data-sources/README.md
index e69fbab8e3..d0031e84c9 100644
--- a/docs/reference/data-sources/README.md
+++ b/docs/reference/data-sources/README.md
@@ -49,3 +49,7 @@ Please see [Data Source](../../getting-started/concepts/data-ingestion.md) for a
 {% content-ref url="mssql.md" %}
 [mssql.md](mssql.md)
 {% endcontent-ref %}
+
+{% content-ref url="clickhouse.md" %}
+[clickhouse.md](clickhouse.md)
+{% endcontent-ref %}
diff --git a/docs/reference/data-sources/clickhouse.md b/docs/reference/data-sources/clickhouse.md
new file mode 100644
index 0000000000..7630d5dd14
--- /dev/null
+++ b/docs/reference/data-sources/clickhouse.md
@@ -0,0 +1,36 @@
+# Clickhouse source (contrib)
+
+## Description
+
+Clickhouse data sources are Clickhouse tables or views.
+These can be specified either by a table reference or a SQL query.
+
+## Disclaimer
+
+The Clickhouse data source does not achieve full test coverage.
+Please do not assume complete stability.
+
+## Examples
+
+Defining a Clickhouse source:
+
+```python
+from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import (
+    ClickhouseSource,
+)
+
+driver_stats_source = ClickhouseSource(
+    name="feast_driver_hourly_stats",
+    query="SELECT * FROM feast_driver_hourly_stats",
+    timestamp_field="event_timestamp",
+    created_timestamp_column="created",
+)
+```
+
+The full set of configuration options is available [here](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source.ClickhouseSource).
+
+## Supported Types
+
+Clickhouse data sources support all eight primitive types and their corresponding array types.
+Support for the Clickhouse `Decimal` type is achieved by converting it to `double`.
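+
+As an illustrative sketch of that conversion (the `trip_stats` feature view, its `fare_amount` feature, and the backing `Decimal(10, 2)` column are all hypothetical; assumes a configured repo):
+
+```python
+from datetime import datetime
+
+import pandas as pd
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+
+entity_df = pd.DataFrame(
+    {"trip_id": [1, 2], "event_timestamp": [datetime.utcnow()] * 2}
+)
+
+table = store.get_historical_features(
+    entity_df=entity_df,
+    features=["trip_stats:fare_amount"],
+).to_arrow()
+# The Decimal(10, 2) column arrives as a float64 (double) Arrow column.
+```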
+For a comparison against other batch data sources, please see [here](overview.md#functionality-matrix).
diff --git a/docs/reference/offline-stores/clickhouse.md b/docs/reference/offline-stores/clickhouse.md
new file mode 100644
index 0000000000..317d6e23e1
--- /dev/null
+++ b/docs/reference/offline-stores/clickhouse.md
@@ -0,0 +1,69 @@
+# Clickhouse offline store (contrib)
+
+## Description
+
+The Clickhouse offline store provides support for reading [ClickhouseSource](../data-sources/clickhouse.md).
+* Entity dataframes can be provided as a SQL query or as a Pandas dataframe. Pandas dataframes are uploaded to Clickhouse as a table (a temporary table by default) in order to complete join operations.
+
+## Disclaimer
+
+The Clickhouse offline store does not achieve full test coverage.
+Please do not assume complete stability.
+
+## Getting started
+In order to use this offline store, you'll need to run `pip install 'feast[clickhouse]'`.
+
+## Example
+
+{% code title="feature_store.yaml" %}
+```yaml
+project: my_project
+registry: data/registry.db
+provider: local
+offline_store:
+  type: feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore
+  host: DB_HOST
+  port: DB_PORT
+  database: DB_NAME
+  user: DB_USERNAME
+  password: DB_PASSWORD
+  use_temporary_tables_for_entity_df: true
+online_store:
+  path: data/online_store.db
+```
+{% endcode %}
+
+Note that `use_temporary_tables_for_entity_df` is an optional parameter.
+The full set of configuration options is available in [ClickhouseOfflineStoreConfig](https://rtd.feast.dev/en/master/#feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore).
+
+## Functionality Matrix
+
+The set of functionality supported by offline stores is described in detail [here](overview.md#functionality).
+Below is a matrix indicating which functionality is supported by the Clickhouse offline store.
+
+|                                                                     | Clickhouse |
+| :------------------------------------------------------------------ | :--------- |
+| `get_historical_features` (point-in-time correct join)             | yes        |
+| `pull_latest_from_table_or_query` (retrieve latest feature values) | yes        |
+| `pull_all_from_table_or_query` (retrieve a saved dataset)          | no         |
+| `offline_write_batch` (persist dataframes to offline store)        | no         |
+| `write_logged_features` (persist logged features to offline store) | no         |
+
+Below is a matrix indicating which functionality is supported by `ClickhouseRetrievalJob`.
+
+|                                                        | Clickhouse |
+| ------------------------------------------------------ | ---------- |
+| export to dataframe                                    | yes        |
+| export to arrow table                                  | yes        |
+| export to arrow batches                                | no         |
+| export to SQL                                          | yes        |
+| export to data lake (S3, GCS, etc.)                    | yes        |
+| export to data warehouse                               | yes        |
+| export as Spark dataframe                              | no         |
+| local execution of Python-based on-demand transforms   | yes        |
+| remote execution of Python-based on-demand transforms  | no         |
+| persist results in the offline store                   | yes        |
+| preview the query plan before execution                | yes        |
+| read partitioned data                                  | yes        |
+
+To compare this set of functionality against other offline stores, please see the full [functionality matrix](overview.md#functionality-matrix).
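+
+## Example usage
+
+A minimal retrieval sketch against the store configured above (assumes a repo with a `driver_hourly_stats` feature view already applied; the view and entity names are illustrative):
+
+```python
+from datetime import datetime, timedelta
+
+import pandas as pd
+from feast import FeatureStore
+
+store = FeatureStore(repo_path=".")
+
+# The entity dataframe is uploaded to Clickhouse (a temporary table by
+# default) so that the point-in-time join runs inside Clickhouse itself.
+entity_df = pd.DataFrame(
+    {
+        "driver_id": [1001, 1002],
+        "event_timestamp": [datetime.utcnow() - timedelta(hours=1)] * 2,
+    }
+)
+
+training_df = store.get_historical_features(
+    entity_df=entity_df,
+    features=[
+        "driver_hourly_stats:conv_rate",
+        "driver_hourly_stats:acc_rate",
+    ],
+).to_df()
+```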
diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.rst new file mode 100644 index 0000000000..b593da3115 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.rst @@ -0,0 +1,37 @@ +feast.infra.offline\_stores.contrib.clickhouse\_offline\_store package +====================================================================== + +Subpackages +----------- + +.. toctree:: + :maxdepth: 4 + + feast.infra.offline_stores.contrib.clickhouse_offline_store.tests + +Submodules +---------- + +feast.infra.offline\_stores.contrib.clickhouse\_offline\_store.clickhouse module +-------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse + :members: + :undoc-members: + :show-inheritance: + +feast.infra.offline\_stores.contrib.clickhouse\_offline\_store.clickhouse\_source module +---------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.clickhouse_offline_store + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.tests.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.tests.rst new file mode 100644 index 0000000000..0b1265e438 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.clickhouse_offline_store.tests.rst @@ -0,0 +1,21 @@ +feast.infra.offline\_stores.contrib.clickhouse\_offline\_store.tests package +============================================================================ + +Submodules +---------- + +feast.infra.offline\_stores.contrib.clickhouse\_offline\_store.tests.data\_source module +---------------------------------------------------------------------------------------- + +.. automodule:: feast.infra.offline_stores.contrib.clickhouse_offline_store.tests.data_source + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.offline_stores.contrib.clickhouse_offline_store.tests + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst index ec74ddab05..63a8f65ea6 100644 --- a/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.offline_stores.contrib.rst @@ -8,6 +8,7 @@ Subpackages :maxdepth: 4 feast.infra.offline_stores.contrib.athena_offline_store + feast.infra.offline_stores.contrib.clickhouse_offline_store feast.infra.offline_stores.contrib.mssql_offline_store feast.infra.offline_stores.contrib.postgres_offline_store feast.infra.offline_stores.contrib.spark_offline_store @@ -24,6 +25,14 @@ feast.infra.offline\_stores.contrib.athena\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.offline\_stores.contrib.clickhouse\_repo\_configuration module +-------------------------------------------------------------------------- + +.. 
automodule:: feast.infra.offline_stores.contrib.clickhouse_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.offline\_stores.contrib.mssql\_repo\_configuration module --------------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst index 2403b5b8d4..c89bec323d 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.contrib.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.contrib.rst @@ -40,18 +40,10 @@ feast.infra.online\_stores.contrib.elasticsearch\_repo\_configuration module :undoc-members: :show-inheritance: -feast.infra.online\_stores.contrib.qdrant module -------------------------------------------------------- +feast.infra.online\_stores.contrib.faiss\_online\_store module +-------------------------------------------------------------- -.. automodule:: feast.infra.online_stores.contrib.qdrant - :members: - :undoc-members: - :show-inheritance: - -feast.infra.online\_stores.contrib.qdrant\_repo\_configuration module ----------------------------------------------------------------------------- - -.. automodule:: feast.infra.online_stores.contrib.qdrant_repo_configuration +.. automodule:: feast.infra.online_stores.contrib.faiss_online_store :members: :undoc-members: :show-inheritance: @@ -104,6 +96,22 @@ feast.infra.online\_stores.contrib.postgres\_repo\_configuration module :undoc-members: :show-inheritance: +feast.infra.online\_stores.contrib.qdrant module +------------------------------------------------ + +.. automodule:: feast.infra.online_stores.contrib.qdrant + :members: + :undoc-members: + :show-inheritance: + +feast.infra.online\_stores.contrib.qdrant\_repo\_configuration module +--------------------------------------------------------------------- + +.. automodule:: feast.infra.online_stores.contrib.qdrant_repo_configuration + :members: + :undoc-members: + :show-inheritance: + feast.infra.online\_stores.contrib.singlestore\_repo\_configuration module -------------------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.infra.online_stores.rst b/sdk/python/docs/source/feast.infra.online_stores.rst index 801d187a7c..608e9fef6a 100644 --- a/sdk/python/docs/source/feast.infra.online_stores.rst +++ b/sdk/python/docs/source/feast.infra.online_stores.rst @@ -84,6 +84,14 @@ feast.infra.online\_stores.sqlite module :undoc-members: :show-inheritance: +feast.infra.online\_stores.vector\_store module +----------------------------------------------- + +.. automodule:: feast.infra.online_stores.vector_store + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/sdk/python/docs/source/feast.infra.rst b/sdk/python/docs/source/feast.infra.rst index b0046a2719..791a4ace83 100644 --- a/sdk/python/docs/source/feast.infra.rst +++ b/sdk/python/docs/source/feast.infra.rst @@ -51,6 +51,14 @@ feast.infra.provider module :undoc-members: :show-inheritance: +feast.infra.supported\_async\_methods module +-------------------------------------------- + +.. 
automodule:: feast.infra.supported_async_methods + :members: + :undoc-members: + :show-inheritance: + Module contents --------------- diff --git a/sdk/python/docs/source/feast.infra.utils.clickhouse.rst b/sdk/python/docs/source/feast.infra.utils.clickhouse.rst new file mode 100644 index 0000000000..4fa3e2a0cb --- /dev/null +++ b/sdk/python/docs/source/feast.infra.utils.clickhouse.rst @@ -0,0 +1,29 @@ +feast.infra.utils.clickhouse package +==================================== + +Submodules +---------- + +feast.infra.utils.clickhouse.clickhouse\_config module +------------------------------------------------------ + +.. automodule:: feast.infra.utils.clickhouse.clickhouse_config + :members: + :undoc-members: + :show-inheritance: + +feast.infra.utils.clickhouse.connection\_utils module +----------------------------------------------------- + +.. automodule:: feast.infra.utils.clickhouse.connection_utils + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.utils.clickhouse + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.utils.rst b/sdk/python/docs/source/feast.infra.utils.rst index 083259bfaa..6c23f320b8 100644 --- a/sdk/python/docs/source/feast.infra.utils.rst +++ b/sdk/python/docs/source/feast.infra.utils.rst @@ -7,6 +7,7 @@ Subpackages .. toctree:: :maxdepth: 4 + feast.infra.utils.clickhouse feast.infra.utils.postgres feast.infra.utils.snowflake diff --git a/sdk/python/docs/source/feast.permissions.client.rst b/sdk/python/docs/source/feast.permissions.client.rst index f346801210..84e58bdc2d 100644 --- a/sdk/python/docs/source/feast.permissions.client.rst +++ b/sdk/python/docs/source/feast.permissions.client.rst @@ -20,10 +20,10 @@ feast.permissions.client.auth\_client\_manager module :undoc-members: :show-inheritance: -feast.permissions.client.auth\_client\_manager\_factory module --------------------------------------------------------------- +feast.permissions.client.client\_auth\_token module +--------------------------------------------------- -.. automodule:: feast.permissions.client.auth_client_manager_factory +.. automodule:: feast.permissions.client.client_auth_token :members: :undoc-members: :show-inheritance: @@ -44,6 +44,14 @@ feast.permissions.client.http\_auth\_requests\_wrapper module :undoc-members: :show-inheritance: +feast.permissions.client.intra\_comm\_authentication\_client\_manager module +---------------------------------------------------------------------------- + +.. automodule:: feast.permissions.client.intra_comm_authentication_client_manager + :members: + :undoc-members: + :show-inheritance: + feast.permissions.client.kubernetes\_auth\_client\_manager module ----------------------------------------------------------------- diff --git a/sdk/python/docs/source/feast.protos.feast.core.rst b/sdk/python/docs/source/feast.protos.feast.core.rst index 9d079953c1..78398e54dc 100644 --- a/sdk/python/docs/source/feast.protos.feast.core.rst +++ b/sdk/python/docs/source/feast.protos.feast.core.rst @@ -244,6 +244,22 @@ feast.protos.feast.core.Policy\_pb2\_grpc module :undoc-members: :show-inheritance: +feast.protos.feast.core.Project\_pb2 module +------------------------------------------- + +.. automodule:: feast.protos.feast.core.Project_pb2 + :members: + :undoc-members: + :show-inheritance: + +feast.protos.feast.core.Project\_pb2\_grpc module +------------------------------------------------- + +.. 
automodule:: feast.protos.feast.core.Project_pb2_grpc + :members: + :undoc-members: + :show-inheritance: + feast.protos.feast.core.Registry\_pb2 module -------------------------------------------- diff --git a/sdk/python/docs/source/feast.rst b/sdk/python/docs/source/feast.rst index b8c04ebde6..ea34c3d8dd 100644 --- a/sdk/python/docs/source/feast.rst +++ b/sdk/python/docs/source/feast.rst @@ -28,6 +28,14 @@ feast.aggregation module :undoc-members: :show-inheritance: +feast.arrow\_error\_handler module +---------------------------------- + +.. automodule:: feast.arrow_error_handler + :members: + :undoc-members: + :show-inheritance: + feast.base\_feature\_view module -------------------------------- @@ -196,6 +204,14 @@ feast.flags\_helper module :undoc-members: :show-inheritance: +feast.grpc\_error\_interceptor module +------------------------------------- + +.. automodule:: feast.grpc_error_interceptor + :members: + :undoc-members: + :show-inheritance: + feast.importer module --------------------- @@ -244,6 +260,14 @@ feast.online\_response module :undoc-members: :show-inheritance: +feast.project module +-------------------- + +.. automodule:: feast.project + :members: + :undoc-members: + :show-inheritance: + feast.project\_metadata module ------------------------------ @@ -292,6 +316,14 @@ feast.repo\_operations module :undoc-members: :show-inheritance: +feast.rest\_error\_handler module +--------------------------------- + +.. automodule:: feast.rest_error_handler + :members: + :undoc-members: + :show-inheritance: + feast.saved\_dataset module --------------------------- diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py new file mode 100644 index 0000000000..e9b2789968 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse.py @@ -0,0 +1,576 @@ +import contextlib +import re +from dataclasses import asdict +from datetime import datetime +from typing import Iterator, List, Literal, Optional, Union, cast + +import numpy as np +import pandas as pd +import pyarrow as pa +from pyarrow.compute import cast as pa_cast + +from feast import FeatureView, OnDemandFeatureView, RepoConfig +from feast.data_source import DataSource +from feast.errors import EntitySQLEmptyResults, InvalidEntityType +from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL +from feast.infra.offline_stores import offline_utils +from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import ( + ClickhouseSource, + SavedDatasetClickhouseStorage, +) +from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import ( + PostgreSQLRetrievalJob, + build_point_in_time_query, +) +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig +from feast.infra.utils.clickhouse.connection_utils import get_client +from feast.saved_dataset import SavedDatasetStorage + + +class ClickhouseOfflineStoreConfig(ClickhouseConfig): + type: Literal[ + 
"feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore" + ] = "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore" + + +class ClickhouseOfflineStore(OfflineStore): + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) + for fv in feature_views: + assert isinstance(fv.batch_source, ClickhouseSource) + + entity_schema = _get_entity_schema(entity_df, config) + + entity_df_event_timestamp_col = ( + offline_utils.infer_event_timestamp_from_entity_df(entity_schema) + ) + + entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, + entity_df_event_timestamp_col, + config, + ) + + @contextlib.contextmanager + def query_generator() -> Iterator[str]: + table_name = offline_utils.get_temp_entity_table_name() + if ( + isinstance(entity_df, pd.DataFrame) + and not config.offline_store.use_temporary_tables_for_entity_df + ): + table_name = f"{config.offline_store.database}.{table_name}" + + _upload_entity_df( + config, + entity_df, + table_name, + entity_df_event_timestamp_col, + ) + + expected_join_keys = offline_utils.get_expected_join_keys( + project, feature_views, registry + ) + + offline_utils.assert_expected_columns_in_entity_df( + entity_schema, expected_join_keys, entity_df_event_timestamp_col + ) + + query_context = offline_utils.get_feature_view_query_context( + feature_refs, + feature_views, + registry, + project, + entity_df_event_timestamp_range, + ) + + query_context_dict = [asdict(context) for context in query_context] + # Hack for query_context.entity_selections to support uppercase in columns + for context in query_context_dict: + context["entity_selections"] = [ + f""""{entity_selection.replace(' AS ', '" AS "')}\"""" + for entity_selection in context["entity_selections"] + ] + + try: + query = build_point_in_time_query( + query_context_dict, + left_table_query_string=table_name, + entity_df_event_timestamp_col=entity_df_event_timestamp_col, + entity_df_columns=entity_schema.keys(), + query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN, + full_feature_names=full_feature_names, + ) + yield query + finally: + if ( + table_name + and not config.offline_store.use_temporary_tables_for_entity_df + ): + get_client(config.offline_store).command( + f"DROP TABLE IF EXISTS {table_name}" + ) + + return ClickhouseRetrievalJob( + query=query_generator, + config=config, + full_feature_names=full_feature_names, + on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs( + feature_refs, project, registry + ), + metadata=RetrievalMetadata( + features=feature_refs, + keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}), + min_event_timestamp=entity_df_event_timestamp_range[0], + max_event_timestamp=entity_df_event_timestamp_range[1], + ), + ) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ClickhouseOfflineStoreConfig) + assert isinstance(data_source, ClickhouseSource) + from_expression = 
data_source.get_table_query_string() + + partition_by_join_key_string = ", ".join(_append_alias(join_key_columns, "a")) + if partition_by_join_key_string != "": + partition_by_join_key_string = ( + "PARTITION BY " + partition_by_join_key_string + ) + timestamps = [timestamp_field] + if created_timestamp_column: + timestamps.append(created_timestamp_column) + timestamp_desc_string = " DESC, ".join(_append_alias(timestamps, "a")) + " DESC" + a_field_string = ", ".join( + _append_alias(join_key_columns + feature_name_columns + timestamps, "a") + ) + b_field_string = ", ".join( + _append_alias(join_key_columns + feature_name_columns + timestamps, "b") + ) + + query = f""" + SELECT + {b_field_string} + {f", {repr(DUMMY_ENTITY_VAL)} AS {DUMMY_ENTITY_ID}" if not join_key_columns else ""} + FROM ( + SELECT {a_field_string}, + ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row + FROM ({from_expression}) a + WHERE a."{timestamp_field}" + BETWEEN toDateTime64('{start_date.replace(tzinfo=None)!s}', 6, '{start_date.tzinfo!s}') + AND toDateTime64('{end_date.replace(tzinfo=None)!s}', 6, '{end_date.tzinfo!s}') + ) b + WHERE _feast_row = 1 + """ + + return ClickhouseRetrievalJob( + query=query, + config=config, + full_feature_names=False, + on_demand_feature_views=None, + ) + + +class ClickhouseRetrievalJob(PostgreSQLRetrievalJob): + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + with self._query_generator() as query: + results = get_client(self.config.offline_store).query_df(query) + return results + + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: + with self._query_generator() as query: + results: pa.Table = get_client(self.config.offline_store).query_arrow(query) + # Feast doesn't support native decimal types, so we must convert decimal columns to double + for col_index, (name, dtype) in enumerate( + zip(results.schema.names, results.schema.types) + ): + if pa.types.is_decimal(dtype): + results = results.set_column( + col_index, + name, + pa_cast(results[name], target_type=pa.float64()), + ) + return results + + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): + assert isinstance(storage, SavedDatasetClickhouseStorage) + + df_to_clickhouse_table( + config=self.config.offline_store, + df=self.to_df(), + table_name=storage.clickhouse_options._table, + entity_timestamp_col="event_timestamp", + ) + + +def _get_entity_schema( + entity_df: Union[pd.DataFrame, str], + config: RepoConfig, +) -> dict[str, np.dtype]: + if isinstance(entity_df, pd.DataFrame): + return dict(zip(entity_df.columns, entity_df.dtypes)) + elif isinstance(entity_df, str): + query = f"SELECT * FROM ({entity_df}) LIMIT 1" + df = get_client(config.offline_store).query_df(query) + return _get_entity_schema(df, config) + else: + raise InvalidEntityType(type(entity_df)) + + +def _get_entity_df_event_timestamp_range( + entity_df: Union[pd.DataFrame, str], + entity_df_event_timestamp_col: str, + config: RepoConfig, +) -> tuple[datetime, datetime]: + if isinstance(entity_df, pd.DataFrame): + entity_df_event_timestamp = entity_df.loc[ + :, entity_df_event_timestamp_col + ].infer_objects() + if pd.api.types.is_string_dtype(entity_df_event_timestamp): + entity_df_event_timestamp = pd.to_datetime( + entity_df_event_timestamp, utc=True + ) + entity_df_event_timestamp_range = ( + entity_df_event_timestamp.min().to_pydatetime(), + 
+            entity_df_event_timestamp.max().to_pydatetime(),
+        )
+    elif isinstance(entity_df, str):
+        # If the entity_df is a string (SQL query), determine the
+        # range from the table
+        query = f'SELECT MIN("{entity_df_event_timestamp_col}") AS "min_value", MAX("{entity_df_event_timestamp_col}") AS "max_value" FROM ({entity_df})'
+        results = get_client(config.offline_store).query(query).result_rows
+
+        entity_df_event_timestamp_range = cast(tuple[datetime, datetime], results[0])
+        if (
+            entity_df_event_timestamp_range[0] is None
+            or entity_df_event_timestamp_range[1] is None
+        ):
+            raise EntitySQLEmptyResults(entity_df)
+    else:
+        raise InvalidEntityType(type(entity_df))
+
+    return entity_df_event_timestamp_range
+
+
+def _upload_entity_df(
+    config: RepoConfig,
+    entity_df: Union[pd.DataFrame, str],
+    table_name: str,
+    entity_timestamp_col: str,
+) -> None:
+    if isinstance(entity_df, pd.DataFrame):
+        df_to_clickhouse_table(
+            config.offline_store, entity_df, table_name, entity_timestamp_col
+        )
+    elif isinstance(entity_df, str):
+        if config.offline_store.use_temporary_tables_for_entity_df:
+            query = f'CREATE TEMPORARY TABLE "{table_name}" AS ({entity_df})'
+        else:
+            query = f'CREATE TABLE "{table_name}" ENGINE = MergeTree() ORDER BY ({entity_timestamp_col}) AS ({entity_df})'
+        get_client(config.offline_store).command(query)
+    else:
+        raise InvalidEntityType(type(entity_df))
+
+
+def df_to_clickhouse_table(
+    config: ClickhouseConfig,
+    df: pd.DataFrame,
+    table_name: str,
+    entity_timestamp_col: str,
+) -> None:
+    table_schema = _df_to_create_table_schema(df)
+    if config.use_temporary_tables_for_entity_df:
+        query = f"CREATE TEMPORARY TABLE {table_name} ({table_schema})"
+    else:
+        query = f"""
+            CREATE TABLE {table_name} (
+                {table_schema}
+            )
+            ENGINE = MergeTree()
+            ORDER BY ({entity_timestamp_col})
+        """
+    get_client(config).command(query)
+    get_client(config).insert_df(table_name, df)
+
+
+def _df_to_create_table_schema(entity_df: pd.DataFrame) -> str:
+    pa_table = pa.Table.from_pandas(entity_df)
+    columns = [
+        f""""{f.name}" {arrow_to_ch_type(str(f.type), f.nullable)}"""
+        for f in pa_table.schema
+    ]
+    return ", ".join(columns)
+
+
+def arrow_to_ch_type(t_str: str, nullable: bool) -> str:
+    # Arrow renders list types as e.g. "list<item: int64>"; capture the item type.
+    list_pattern = r"list<item: (.+)>"
+    list_res = re.search(list_pattern, t_str)
+    if list_res is not None:
+        item_type_str = list_res.group(1)
+        return f"Array({arrow_to_ch_type(item_type_str, nullable)})"
+
+    if nullable:
+        return f"Nullable({arrow_to_ch_type(t_str, nullable=False)})"
+
+    try:
+        if t_str.startswith("timestamp"):
+            return _arrow_to_ch_timestamp_type(t_str)
+        return {
+            "bool": "Boolean",
+            "int8": "Int8",
+            "int16": "Int16",
+            "int32": "Int32",
+            "int64": "Int64",
+            "uint8": "UInt8",
+            "uint16": "UInt16",
+            "uint32": "UInt32",
+            "uint64": "UInt64",
+            "float": "Float32",
+            "double": "Float64",
+            "string": "String",
+        }[t_str]
+    except KeyError:
+        raise ValueError(f"Unsupported type: {t_str}")
+
+
+def _arrow_to_ch_timestamp_type(t_str: str) -> str:
+    _ARROW_PRECISION_TO_CH_PRECISION = {
+        "s": 0,
+        "ms": 3,
+        "us": 6,
+        "ns": 9,
+    }
+
+    unit, *rest = t_str.removeprefix("timestamp[").removesuffix("]").split(",")
+
+    unit = unit.strip()
+    precision = _ARROW_PRECISION_TO_CH_PRECISION[unit]
+
+    if len(rest):
+        tz = rest[0]
+        tz = (
+            tz.strip()
+            .removeprefix("tz=")
+            .translate(
+                str.maketrans(
+                    {  # type: ignore[arg-type]
+                        "'": None,
+                        '"': None,
+                    }
+                )
+            )
+        )
+    else:
+        tz = None
+
+    if precision > 0:
+        if tz is not None:
+            return f"DateTime64({precision}, '{tz}')"
+        else:
+            return f"DateTime64({precision})"
+    else:
+        if tz is not None:
+            return f"DateTime('{tz}')"
+        else:
+            return "DateTime"
+
+
+def _append_alias(field_names: List[str], alias: str) -> List[str]:
+    return [f'{alias}."{field_name}"' for field_name in field_names]
+
+
+MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
+/*
+ Compute a deterministic hash for the `left_table_query_string` that will be used throughout
+ all the logic as the field to GROUP BY the data
+*/
+WITH entity_dataframe AS (
+    SELECT *,
+        {{entity_df_event_timestamp_col}} AS entity_timestamp
+        {% for featureview in featureviews %}
+            {% if featureview.entities %}
+            ,(
+                {% for entity in featureview.entities %}
+                CAST("{{entity}}" as VARCHAR) ||
+                {% endfor %}
+                CAST("{{entity_df_event_timestamp_col}}" AS VARCHAR)
+            ) AS "{{featureview.name}}__entity_row_unique_id"
+            {% else %}
+            ,CAST("{{entity_df_event_timestamp_col}}" AS VARCHAR) AS "{{featureview.name}}__entity_row_unique_id"
+            {% endif %}
+        {% endfor %}
+    FROM {{ left_table_query_string }}
+),
+
+{% for featureview in featureviews %}
+
+"{{ featureview.name }}__entity_dataframe" AS (
+    SELECT
+        {% if featureview.entities %}"{{ featureview.entities | join('", "') }}",{% endif %}
+        entity_timestamp,
+        "{{featureview.name}}__entity_row_unique_id"
+    FROM entity_dataframe
+    GROUP BY
+        {% if featureview.entities %}"{{ featureview.entities | join('", "')}}",{% endif %}
+        entity_timestamp,
+        "{{featureview.name}}__entity_row_unique_id"
+),
+
+/*
+ This query template performs the point-in-time correctness join for a single feature set table
+ to the provided entity table.
+
+ 1. We first join the current feature_view to the entity dataframe that has been passed.
+ This JOIN has the following logic:
+    - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
+    is less than the one provided in the entity dataframe.
+    - If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
+    is higher than the one provided minus the TTL.
+    - For each row, join on the entity key and retrieve the `entity_row_unique_id` that has been
+    computed previously.
+
+ The output of this CTE will contain all the necessary information and will already have filtered out
+ most of the data that is not relevant.
+*/ + +"{{ featureview.name }}__subquery" AS ( + SELECT + "{{ featureview.timestamp_field }}" as event_timestamp, + {{ '"' ~ featureview.created_timestamp_column ~ '" as created_timestamp,' if featureview.created_timestamp_column else '' }} + {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} + {% for feature in featureview.features %} + "{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %} + {% endfor %} + FROM {{ featureview.table_subquery }} AS sub + WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe) + {% if featureview.ttl == 0 %}{% else %} + AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - interval {{ featureview.ttl }} second + {% endif %} +), + +"{{ featureview.name }}__base" AS ( + SELECT + subquery.*, + entity_dataframe.entity_timestamp, + entity_dataframe."{{featureview.name}}__entity_row_unique_id" + FROM "{{ featureview.name }}__subquery" AS subquery + INNER JOIN "{{ featureview.name }}__entity_dataframe" AS entity_dataframe + ON TRUE + {% for entity in featureview.entities %} + AND subquery."{{ entity }}" = entity_dataframe."{{ entity }}" + {% endfor %} + WHERE TRUE + AND subquery.event_timestamp <= entity_dataframe.entity_timestamp + + {% if featureview.ttl == 0 %}{% else %} + AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - interval {{ featureview.ttl }} second + {% endif %} +), + +/* + 2. If the `created_timestamp_column` has been set, we need to + deduplicate the data first. This is done by calculating the + `MAX(created_at_timestamp)` for each event_timestamp. + We then join the data on the next CTE +*/ +{% if featureview.created_timestamp_column %} +"{{ featureview.name }}__dedup" AS ( + SELECT + "{{featureview.name}}__entity_row_unique_id", + event_timestamp, + MAX(created_timestamp) as created_timestamp + FROM "{{ featureview.name }}__base" + GROUP BY "{{featureview.name}}__entity_row_unique_id", event_timestamp +), +{% endif %} + +/* + 3. The data has been filtered during the first CTE "*__base" + Thus we only need to compute the latest timestamp of each feature. +*/ +"{{ featureview.name }}__latest" AS ( + SELECT + event_timestamp, + {% if featureview.created_timestamp_column %}created_timestamp,{% endif %} + "{{featureview.name}}__entity_row_unique_id" + FROM + ( + SELECT *, + ROW_NUMBER() OVER( + PARTITION BY "{{featureview.name}}__entity_row_unique_id" + ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %} + ) AS row_number + FROM "{{ featureview.name }}__base" + {% if featureview.created_timestamp_column %} + INNER JOIN "{{ featureview.name }}__dedup" + USING ("{{featureview.name}}__entity_row_unique_id", event_timestamp, created_timestamp) + {% endif %} + ) AS sub + WHERE row_number = 1 +), + +/* + 4. 
Once we know the latest value of each feature for a given timestamp, + we can join again the data back to the original "base" dataset +*/ +"{{ featureview.name }}__cleaned" AS ( + SELECT base.* + FROM "{{ featureview.name }}__base" as base + INNER JOIN "{{ featureview.name }}__latest" + USING( + "{{featureview.name}}__entity_row_unique_id", + event_timestamp + {% if featureview.created_timestamp_column %} + ,created_timestamp + {% endif %} + ) +){% if loop.last %}{% else %}, {% endif %} + + +{% endfor %} +/* + Joins the outputs of multiple time travel joins to a single table. + The entity_dataframe dataset being our source of truth here. + */ + +SELECT "{{ final_output_feature_names | join('", "')}}" +FROM entity_dataframe +{% for featureview in featureviews %} +LEFT JOIN ( + SELECT + "{{featureview.name}}__entity_row_unique_id" + {% for feature in featureview.features %} + ,"{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}" + {% endfor %} + FROM "{{ featureview.name }}__cleaned" +) AS "{{featureview.name}}" USING ("{{featureview.name}}__entity_row_unique_id") +{% endfor %} +""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py new file mode 100644 index 0000000000..89d2f96222 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/clickhouse_source.py @@ -0,0 +1,211 @@ +import json +from typing import Any, Callable, Iterable, Optional, Tuple, Type + +from clickhouse_connect.datatypes.base import ClickHouseType +from clickhouse_connect.datatypes.container import Array +from clickhouse_connect.datatypes.numeric import ( + Boolean, + Decimal, + Float32, + Float64, + Int32, + Int64, +) +from clickhouse_connect.datatypes.registry import get_from_name +from clickhouse_connect.datatypes.string import String +from clickhouse_connect.datatypes.temporal import DateTime, DateTime64 + +from feast import RepoConfig, ValueType +from feast.data_source import DataSource +from feast.errors import DataSourceNoNameException +from feast.infra.utils.clickhouse.connection_utils import get_client +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.SavedDataset_pb2 import ( + SavedDatasetStorage as SavedDatasetStorageProto, +) +from feast.saved_dataset import SavedDatasetStorage + + +class ClickhouseOptions: + def __init__( + self, + name: Optional[str], + query: Optional[str], + table: Optional[str], + ): + self._name = name or "" + self._query = query or "" + self._table = table or "" + + @classmethod + def from_proto(cls, clickhouse_options_proto: DataSourceProto.CustomSourceOptions): + config = json.loads(clickhouse_options_proto.configuration.decode("utf8")) + postgres_options = cls( + name=config["name"], query=config["query"], table=config["table"] + ) + + return postgres_options + + def to_proto(self) -> DataSourceProto.CustomSourceOptions: + clickhouse_options_proto = DataSourceProto.CustomSourceOptions( + configuration=json.dumps( + {"name": self._name, "query": self._query, "table": self._table} + ).encode() + ) + return clickhouse_options_proto + + +class ClickhouseSource(DataSource): + def __init__( + self, + name: str | None = None, + query: str | None = None, + table: str | None = None, + timestamp_field: str | None = "", + 
created_timestamp_column: str | None = "", + field_mapping: dict[str, str] | None = None, + description: str | None = "", + tags: dict[str, str] | None = None, + owner: str | None = "", + ): + self._clickhouse_options = ClickhouseOptions( + name=name, query=query, table=table + ) + + if name is None and table is None: + raise DataSourceNoNameException() + name = name or table + assert name + + super().__init__( + name=name, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + description=description, + tags=tags, + owner=owner, + ) + + @staticmethod + def from_proto(data_source: DataSourceProto) -> Any: + assert data_source.HasField("custom_options") + + postgres_options = json.loads(data_source.custom_options.configuration) + + return ClickhouseSource( + name=postgres_options["name"], + query=postgres_options["query"], + table=postgres_options["table"], + field_mapping=dict(data_source.field_mapping), + timestamp_field=data_source.timestamp_field, + created_timestamp_column=data_source.created_timestamp_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + + def to_proto(self) -> DataSourceProto: + data_source_proto = DataSourceProto( + name=self.name, + type=DataSourceProto.CUSTOM_SOURCE, + data_source_class_type="feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source.ClickhouseSource", + field_mapping=self.field_mapping, + custom_options=self._clickhouse_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, + ) + + data_source_proto.timestamp_field = self.timestamp_field + data_source_proto.created_timestamp_column = self.created_timestamp_column + + return data_source_proto + + def validate(self, config: RepoConfig): + pass + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return ch_type_to_feast_value_type + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + with get_client(config.offline_store) as client: + result = client.query( + f"SELECT * FROM {self.get_table_query_string()} AS sub LIMIT 0" + ) + column_types = list(zip(result.column_names, result.column_types)) + return [ + (name, _ch_type_to_ch_type_str(type_)) for name, type_ in column_types + ] + + def get_table_query_string(self) -> str: + if self._clickhouse_options._table: + return f"{self._clickhouse_options._table}" + else: + return f"({self._clickhouse_options._query})" + + +class SavedDatasetClickhouseStorage(SavedDatasetStorage): + _proto_attr_name = "custom_storage" + + clickhouse_options: ClickhouseOptions + + def __init__(self, table_ref: str): + self.clickhouse_options = ClickhouseOptions( + table=table_ref, name=None, query=None + ) + + @staticmethod + def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage: + return SavedDatasetClickhouseStorage( + table_ref=ClickhouseOptions.from_proto(storage_proto.custom_storage)._table + ) + + def to_proto(self) -> SavedDatasetStorageProto: + return SavedDatasetStorageProto( + custom_storage=self.clickhouse_options.to_proto() + ) + + def to_data_source(self) -> DataSource: + return ClickhouseSource(table=self.clickhouse_options._table) + + +def ch_type_to_feast_value_type(type_str: str) -> ValueType: + type_obj = get_from_name(type_str) + type_cls = type(type_obj) + container_type = None + if isinstance(type_obj, Array): + container_type = Array + type_map: dict[ + 
tuple[Type[ClickHouseType] | None, Type[ClickHouseType]], ValueType + ] = { + (None, Boolean): ValueType.BOOL, + (None, String): ValueType.STRING, + (None, Float32): ValueType.FLOAT, + (None, Float64): ValueType.DOUBLE, + (None, Decimal): ValueType.DOUBLE, + (None, Int32): ValueType.INT32, + (None, Int64): ValueType.INT64, + (None, DateTime): ValueType.UNIX_TIMESTAMP, + (None, DateTime64): ValueType.UNIX_TIMESTAMP, + (Array, Boolean): ValueType.BOOL_LIST, + (Array, String): ValueType.STRING_LIST, + (Array, Float32): ValueType.FLOAT_LIST, + (Array, Float64): ValueType.DOUBLE_LIST, + (Array, Decimal): ValueType.DOUBLE_LIST, + (Array, Int32): ValueType.INT32_LIST, + (Array, Int64): ValueType.INT64_LIST, + (Array, DateTime): ValueType.UNIX_TIMESTAMP_LIST, + (Array, DateTime64): ValueType.UNIX_TIMESTAMP_LIST, + } + value = type_map.get((container_type, type_cls), ValueType.UNKNOWN) + if value == ValueType.UNKNOWN: + print("unknown type:", type_str) + return value + + +def _ch_type_to_ch_type_str(type_: ClickHouseType) -> str: + return type_.name diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/__init__.py new file mode 100644 index 0000000000..0dd9d78e88 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/__init__.py @@ -0,0 +1 @@ +from .data_source import clickhouse_container # noqa diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py new file mode 100644 index 0000000000..8ab4fdb540 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_offline_store/tests/data_source.py @@ -0,0 +1,116 @@ +import logging +from typing import Dict, Optional + +import pandas as pd +import pytest +from testcontainers.clickhouse import ClickHouseContainer +from testcontainers.core.waiting_utils import wait_for_logs + +from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination +from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse import ( + ClickhouseOfflineStoreConfig, + df_to_clickhouse_table, +) +from feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse_source import ( + ClickhouseSource, +) +from tests.integration.feature_repos.universal.data_source_creator import ( + DataSourceCreator, +) + +logger = logging.getLogger(__name__) + +CLICKHOUSE_USER = "default" +CLICKHOUSE_PASSWORD = "password" +CLICKHOUSE_OFFLINE_DB = "default" +CLICKHOUSE_ONLINE_DB = "default_online" + + +@pytest.fixture(scope="session") +def clickhouse_container(): + container = ClickHouseContainer( + username=CLICKHOUSE_USER, + password=CLICKHOUSE_PASSWORD, + dbname=CLICKHOUSE_OFFLINE_DB, + ) + container.start() + + log_string_to_wait_for = "Logging errors to" + waited = wait_for_logs( + container=container, + predicate=log_string_to_wait_for, + timeout=30, + interval=10, + ) + logger.info("Waited for %s seconds until clickhouse container was up", waited) + + yield container + container.stop() + + +class ClickhouseDataSourceCreator(DataSourceCreator): + def create_logged_features_destination(self) -> LoggingDestination: + return None # type: ignore + + def __init__( + self, project_name: str, fixture_request: pytest.FixtureRequest, **kwargs + ): + super().__init__( + project_name, + ) + + self.project_name = project_name + 
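+        # Resolve the session-scoped container fixture lazily, so the ClickHouse
+        # test container is only started when a test run actually uses this
+        # data source creator.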
+        self.container = fixture_request.getfixturevalue("clickhouse_container")
+        if not self.container:
+            raise RuntimeError(
+                "In order to use this data source "
+                "'feast.infra.offline_stores.contrib.clickhouse_offline_store.tests' "
+                "must be included in pytest plugins"
+            )
+
+        self.offline_store_config = ClickhouseOfflineStoreConfig(
+            type="feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore",
+            host="localhost",
+            port=self.container.get_exposed_port(8123),
+            database=CLICKHOUSE_OFFLINE_DB,
+            user=CLICKHOUSE_USER,
+            password=CLICKHOUSE_PASSWORD,
+        )
+
+    def create_data_source(
+        self,
+        df: pd.DataFrame,
+        destination_name: str,
+        created_timestamp_column="created_ts",
+        field_mapping: Optional[Dict[str, str]] = None,
+        timestamp_field: Optional[str] = "ts",
+    ) -> DataSource:
+        destination_name = self.get_prefixed_table_name(destination_name)
+
+        if self.offline_store_config:
+            if timestamp_field is None:
+                timestamp_field = "ts"
+            df_to_clickhouse_table(
+                self.offline_store_config, df, destination_name, timestamp_field
+            )
+        return ClickhouseSource(
+            name=destination_name,
+            query=f"SELECT * FROM {destination_name}",
+            timestamp_field=timestamp_field,
+            created_timestamp_column=created_timestamp_column,
+            field_mapping=field_mapping or {"ts_1": "ts"},
+        )
+
+    def create_offline_store_config(self) -> ClickhouseOfflineStoreConfig:
+        assert self.offline_store_config
+        return self.offline_store_config
+
+    def get_prefixed_table_name(self, suffix: str) -> str:
+        return f"{self.project_name}_{suffix}"
+
+    def create_saved_dataset_destination(self):
+        pass
+
+    def teardown(self):
+        pass
diff --git a/sdk/python/feast/infra/offline_stores/contrib/clickhouse_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_repo_configuration.py
new file mode 100644
index 0000000000..5c9d4461b1
--- /dev/null
+++ b/sdk/python/feast/infra/offline_stores/contrib/clickhouse_repo_configuration.py
@@ -0,0 +1,11 @@
+from feast.infra.offline_stores.contrib.clickhouse_offline_store.tests.data_source import (
+    ClickhouseDataSourceCreator,
+)
+from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG
+from tests.integration.feature_repos.universal.online_store.redis import (
+    RedisOnlineStoreCreator,
+)
+
+AVAILABLE_OFFLINE_STORES = [("local", ClickhouseDataSourceCreator)]
+
+AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)}
diff --git a/sdk/python/feast/infra/utils/clickhouse/__init__.py b/sdk/python/feast/infra/utils/clickhouse/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py
new file mode 100644
index 0000000000..1f163e0a81
--- /dev/null
+++ b/sdk/python/feast/infra/utils/clickhouse/clickhouse_config.py
@@ -0,0 +1,14 @@
+from pydantic import ConfigDict, StrictStr
+
+from feast.repo_config import FeastConfigBaseModel
+
+
+class ClickhouseConfig(FeastConfigBaseModel):
+    host: StrictStr
+    port: int = 8123
+    database: StrictStr
+    user: StrictStr
+    password: StrictStr
+    use_temporary_tables_for_entity_df: bool = True
+
+    model_config = ConfigDict(frozen=True)
diff --git a/sdk/python/feast/infra/utils/clickhouse/connection_utils.py b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py
new file mode 100644
index 0000000000..e60922e478
--- /dev/null
+++ b/sdk/python/feast/infra/utils/clickhouse/connection_utils.py
@@ -0,0 +1,18 @@
+from functools 
import cache + +import clickhouse_connect +from clickhouse_connect.driver import Client + +from feast.infra.utils.clickhouse.clickhouse_config import ClickhouseConfig + + +@cache +def get_client(config: ClickhouseConfig) -> Client: + client = clickhouse_connect.get_client( + host=config.host, + port=config.port, + user=config.user, + password=config.password, + database=config.database, + ) + return client diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 8c940ba84e..32367ee3ee 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.10 --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.10-ci-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.10-ci-requirements.txt aiobotocore==2.15.2 # via feast (setup.py) aiohappyeyeballs==2.4.3 @@ -40,10 +40,6 @@ async-lru==2.0.4 # via jupyterlab async-property==0.2.2 # via python-keycloak -async-timeout==4.0.3 - # via - # aiohttp - # redis atpublic==5.0 # via ibis-framework attrs==24.2.0 @@ -90,6 +86,7 @@ cassandra-driver==3.29.2 # via feast (setup.py) certifi==2024.8.30 # via + # clickhouse-connect # elastic-transport # httpcore # httpx @@ -116,6 +113,8 @@ click==8.1.7 # great-expectations # pip-tools # uvicorn +clickhouse-connect==0.8.5 + # via feast (setup.py) cloudpickle==3.1.0 # via dask colorama==0.4.6 @@ -178,11 +177,6 @@ elasticsearch==8.15.1 # via feast (setup.py) entrypoints==0.4 # via altair -exceptiongroup==1.2.2 - # via - # anyio - # ipython - # pytest execnet==2.1.1 # via pytest-xdist executing==2.1.0 @@ -335,9 +329,7 @@ idna==3.10 imagesize==1.4.1 # via sphinx importlib-metadata==8.5.0 - # via - # build - # dask + # via dask iniconfig==2.0.0 # via pytest ipykernel==6.29.5 @@ -431,6 +423,8 @@ kubernetes==20.13.0 # via feast (setup.py) locket==1.0.0 # via partd +lz4==4.3.3 + # via clickhouse-connect makefun==1.15.6 # via great-expectations markdown-it-py==3.0.0 @@ -741,6 +735,7 @@ python-keycloak==4.2.2 # via feast (setup.py) pytz==2024.2 # via + # clickhouse-connect # great-expectations # ibis-framework # pandas @@ -906,16 +901,6 @@ tinycss2==1.4.0 # via nbconvert toml==0.10.2 # via feast (setup.py) -tomli==2.0.2 - # via - # build - # coverage - # jupyterlab - # mypy - # pip-tools - # pytest - # pytest-env - # singlestoredb tomlkit==0.13.2 # via snowflake-connector-python toolz==0.12.1 @@ -987,8 +972,6 @@ types-urllib3==1.26.25.14 # via types-requests typing-extensions==4.12.2 # via - # anyio - # async-lru # azure-core # azure-identity # azure-storage-blob @@ -997,18 +980,15 @@ typing-extensions==4.12.2 # ibis-framework # ipython # jwcrypto - # multidict # mypy # psycopg # psycopg-pool # pydantic # pydantic-core - # rich # snowflake-connector-python # sqlalchemy # testcontainers # typeguard - # uvicorn tzdata==2024.2 # via pandas tzlocal==5.2 @@ -1021,6 +1001,7 @@ urllib3==2.2.3 # via # feast (setup.py) # botocore + # clickhouse-connect # docker # elastic-transport # great-expectations @@ -1076,3 +1057,5 @@ yarl==1.16.0 # via aiohttp zipp==3.20.2 # via importlib-metadata +zstandard==0.23.0 + # via clickhouse-connect diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt index 94c5d3945a..2b1dce3dde 100644 --- 
a/sdk/python/requirements/py3.10-requirements.txt +++ b/sdk/python/requirements/py3.10-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.10 --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.10-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.10-requirements.txt annotated-types==0.7.0 # via pydantic anyio==4.6.2.post1 @@ -33,8 +33,6 @@ dask-expr==1.1.16 # via dask dill==0.3.9 # via feast (setup.py) -exceptiongroup==1.2.2 - # via anyio fastapi==0.115.3 # via feast (setup.py) fsspec==2024.10.0 @@ -141,8 +139,6 @@ tenacity==8.5.0 # via feast (setup.py) toml==0.10.2 # via feast (setup.py) -tomli==2.0.2 - # via mypy toolz==1.0.0 # via # dask @@ -153,14 +149,12 @@ typeguard==4.3.0 # via feast (setup.py) typing-extensions==4.12.2 # via - # anyio # fastapi # mypy # pydantic # pydantic-core # sqlalchemy # typeguard - # uvicorn tzdata==2024.2 # via pandas urllib3==2.2.3 diff --git a/sdk/python/requirements/py3.11-ci-requirements.txt b/sdk/python/requirements/py3.11-ci-requirements.txt index 4d5d8a7188..8c36e72d22 100644 --- a/sdk/python/requirements/py3.11-ci-requirements.txt +++ b/sdk/python/requirements/py3.11-ci-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.11 --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.11-ci-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.11-ci-requirements.txt aiobotocore==2.15.2 # via feast (setup.py) aiohappyeyeballs==2.4.3 @@ -40,8 +40,6 @@ async-lru==2.0.4 # via jupyterlab async-property==0.2.2 # via python-keycloak -async-timeout==4.0.3 - # via redis atpublic==5.0 # via ibis-framework attrs==24.2.0 @@ -88,6 +86,7 @@ cassandra-driver==3.29.2 # via feast (setup.py) certifi==2024.8.30 # via + # clickhouse-connect # elastic-transport # httpcore # httpx @@ -114,6 +113,8 @@ click==8.1.7 # great-expectations # pip-tools # uvicorn +clickhouse-connect==0.8.5 + # via feast (setup.py) cloudpickle==3.1.0 # via dask colorama==0.4.6 @@ -327,8 +328,6 @@ idna==3.10 # yarl imagesize==1.4.1 # via sphinx -importlib-metadata==8.5.0 - # via dask iniconfig==2.0.0 # via pytest ipykernel==6.29.5 @@ -422,6 +421,8 @@ kubernetes==20.13.0 # via feast (setup.py) locket==1.0.0 # via partd +lz4==4.3.3 + # via clickhouse-connect makefun==1.15.6 # via great-expectations markdown-it-py==3.0.0 @@ -732,6 +733,7 @@ python-keycloak==4.2.2 # via feast (setup.py) pytz==2024.2 # via + # clickhouse-connect # great-expectations # ibis-framework # pandas @@ -974,7 +976,6 @@ typing-extensions==4.12.2 # fastapi # great-expectations # ibis-framework - # ipython # jwcrypto # mypy # psycopg @@ -997,6 +998,7 @@ urllib3==2.2.3 # via # feast (setup.py) # botocore + # clickhouse-connect # docker # elastic-transport # great-expectations @@ -1050,5 +1052,5 @@ xmltodict==0.14.2 # via moto yarl==1.16.0 # via aiohttp -zipp==3.20.2 - # via importlib-metadata +zstandard==0.23.0 + # via clickhouse-connect diff --git a/sdk/python/requirements/py3.11-requirements.txt b/sdk/python/requirements/py3.11-requirements.txt index e2a8589e77..a3c932cc20 100644 --- a/sdk/python/requirements/py3.11-requirements.txt +++ b/sdk/python/requirements/py3.11-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.11 --system --no-strip-extras 
setup.py --output-file sdk/python/requirements/py3.11-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.11-requirements.txt annotated-types==0.7.0 # via pydantic anyio==4.6.2.post1 @@ -49,8 +49,6 @@ idna==3.10 # via # anyio # requests -importlib-metadata==8.5.0 - # via dask jinja2==3.1.4 # via feast (setup.py) jsonschema==4.23.0 @@ -171,5 +169,3 @@ watchfiles==0.24.0 # via uvicorn websockets==13.1 # via uvicorn -zipp==3.20.2 - # via importlib-metadata diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 2ba384e205..5e349c7435 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.9 --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.9-ci-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.9-ci-requirements.txt aiobotocore==2.15.2 # via feast (setup.py) aiohappyeyeballs==2.4.3 @@ -40,10 +40,6 @@ async-lru==2.0.4 # via jupyterlab async-property==0.2.2 # via python-keycloak -async-timeout==4.0.3 - # via - # aiohttp - # redis atpublic==4.1.0 # via ibis-framework attrs==24.2.0 @@ -92,6 +88,7 @@ cassandra-driver==3.29.2 # via feast (setup.py) certifi==2024.8.30 # via + # clickhouse-connect # elastic-transport # httpcore # httpx @@ -118,6 +115,8 @@ click==8.1.7 # great-expectations # pip-tools # uvicorn +clickhouse-connect==0.8.5 + # via feast (setup.py) cloudpickle==3.1.0 # via dask colorama==0.4.6 @@ -180,11 +179,6 @@ elasticsearch==8.15.1 # via feast (setup.py) entrypoints==0.4 # via altair -exceptiongroup==1.2.2 - # via - # anyio - # ipython - # pytest execnet==2.1.1 # via pytest-xdist executing==2.1.0 @@ -337,16 +331,7 @@ idna==3.10 imagesize==1.4.1 # via sphinx importlib-metadata==8.5.0 - # via - # build - # dask - # jupyter-client - # jupyter-lsp - # jupyterlab - # jupyterlab-server - # nbconvert - # sphinx - # typeguard + # via dask iniconfig==2.0.0 # via pytest ipykernel==6.29.5 @@ -440,6 +425,8 @@ kubernetes==20.13.0 # via feast (setup.py) locket==1.0.0 # via partd +lz4==4.3.3 + # via clickhouse-connect makefun==1.15.6 # via great-expectations markdown-it-py==3.0.0 @@ -749,6 +736,7 @@ python-keycloak==4.2.2 # via feast (setup.py) pytz==2024.2 # via + # clickhouse-connect # great-expectations # ibis-framework # pandas @@ -914,16 +902,6 @@ tinycss2==1.4.0 # via nbconvert toml==0.10.2 # via feast (setup.py) -tomli==2.0.2 - # via - # build - # coverage - # jupyterlab - # mypy - # pip-tools - # pytest - # pytest-env - # singlestoredb tomlkit==0.13.2 # via snowflake-connector-python toolz==0.12.1 @@ -995,30 +973,22 @@ types-urllib3==1.26.25.14 # via types-requests typing-extensions==4.12.2 # via - # aioitertools - # anyio - # async-lru # azure-core # azure-identity # azure-storage-blob # fastapi # great-expectations # ibis-framework - # ipython # jwcrypto - # multidict # mypy # psycopg # psycopg-pool # pydantic # pydantic-core - # rich # snowflake-connector-python # sqlalchemy - # starlette # testcontainers # typeguard - # uvicorn tzdata==2024.2 # via pandas tzlocal==5.2 @@ -1031,6 +1001,7 @@ urllib3==1.26.20 # via # feast (setup.py) # botocore + # clickhouse-connect # docker # elastic-transport # great-expectations @@ -1039,7 +1010,6 @@ urllib3==1.26.20 # qdrant-client # requests # responses - 
# snowflake-connector-python # testcontainers uvicorn[standard]==0.32.0 # via @@ -1087,3 +1057,5 @@ yarl==1.16.0 # via aiohttp zipp==3.20.2 # via importlib-metadata +zstandard==0.23.0 + # via clickhouse-connect diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt index 7f8eecd6f8..7426d83443 100644 --- a/sdk/python/requirements/py3.9-requirements.txt +++ b/sdk/python/requirements/py3.9-requirements.txt @@ -1,5 +1,5 @@ # This file was autogenerated by uv via the following command: -# uv pip compile -p 3.9 --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.9-requirements.txt +# uv pip compile --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.9-requirements.txt annotated-types==0.7.0 # via pydantic anyio==4.6.2.post1 @@ -33,8 +33,6 @@ dask-expr==1.1.10 # via dask dill==0.3.9 # via feast (setup.py) -exceptiongroup==1.2.2 - # via anyio fastapi==0.115.3 # via feast (setup.py) fsspec==2024.10.0 @@ -52,9 +50,7 @@ idna==3.10 # anyio # requests importlib-metadata==8.5.0 - # via - # dask - # typeguard + # via dask jinja2==3.1.4 # via feast (setup.py) jsonschema==4.23.0 @@ -143,8 +139,6 @@ tenacity==8.5.0 # via feast (setup.py) toml==0.10.2 # via feast (setup.py) -tomli==2.0.2 - # via mypy toolz==1.0.0 # via # dask @@ -155,15 +149,12 @@ typeguard==4.3.0 # via feast (setup.py) typing-extensions==4.12.2 # via - # anyio # fastapi # mypy # pydantic # pydantic-core # sqlalchemy - # starlette # typeguard - # uvicorn tzdata==2024.2 # via pandas urllib3==2.2.3 diff --git a/setup.py b/setup.py index b335d39c2b..f410bf4ef1 100644 --- a/setup.py +++ b/setup.py @@ -148,6 +148,8 @@ QDRANT_REQUIRED = ["qdrant-client>=1.12.0"] +CLICKHOUSE_REQUIRED = ["clickhouse-connect>=0.7.19"] + CI_REQUIRED = ( [ "build", @@ -217,6 +219,7 @@ + OPENTELEMETRY + FAISS_REQUIRED + QDRANT_REQUIRED + + CLICKHOUSE_REQUIRED ) DOCS_REQUIRED = CI_REQUIRED @@ -287,7 +290,8 @@ "singlestore": SINGLESTORE_REQUIRED, "opentelemetry": OPENTELEMETRY, "faiss": FAISS_REQUIRED, - "qdrant": QDRANT_REQUIRED + "qdrant": QDRANT_REQUIRED, + "clickhouse": CLICKHOUSE_REQUIRED, }, include_package_data=True, license="Apache",