From fdf57222b6f9a36f4bd8bc0b8fdf95e1ded8ec09 Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 10:36:01 +0100
Subject: [PATCH 1/7] Enable test for list features in BQ

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 .../tests/integration/registration/test_universal_types.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py
index dc175ab7d9..4411421b85 100644
--- a/sdk/python/tests/integration/registration/test_universal_types.py
+++ b/sdk/python/tests/integration/registration/test_universal_types.py
@@ -36,13 +36,6 @@ def populate_test_configs(offline: bool):
                 # For offline tests, don't need to vary for online store
                 if offline and test_repo_config.online_store == REDIS_CONFIG:
                     continue
-                # TODO(https://github.com/feast-dev/feast/issues/1839): Fix BQ materialization of list features
-                if (
-                    not offline
-                    and test_repo_config.provider == "gcp"
-                    and feature_is_list is True
-                ):
-                    continue
                 configs.append(
                     TypeTestConfig(
                         entity_type=entity_type,

From 268075e0bbe8fb76f870500ca91f4eb51ba697ae Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 18:17:51 +0100
Subject: [PATCH 2/7] Remove specific handling of BQ list types

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 .../registration/test_universal_types.py      | 30 +++++--------------
 1 file changed, 8 insertions(+), 22 deletions(-)

diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py
index 4411421b85..e747ba23f9 100644
--- a/sdk/python/tests/integration/registration/test_universal_types.py
+++ b/sdk/python/tests/integration/registration/test_universal_types.py
@@ -248,16 +248,10 @@ def assert_feature_list_types(
         "bool": "bool",
     }
     assert str(historical_features_df.dtypes["value"]) == "object"
-    if provider == "gcp":
-        assert (
-            feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
-            in type(historical_features_df.value[0]["list"][0]["item"]).__name__
-        )
-    else:
-        assert (
-            feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
-            in type(historical_features_df.value[0][0]).__name__
-        )
+    assert (
+        feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
+        in type(historical_features_df.value[0][0]).__name__
+    )


 def assert_expected_arrow_types(
@@ -280,18 +274,10 @@
         feature_dtype
     ]
     if feature_is_list:
-        if provider == "gcp":
-            assert str(
-                historical_features_arrow.schema.field_by_name("value").type
-            ) in [
-                f"struct<list: list<item: struct<item: {arrow_type}>> not null>",
-                f"struct<list: list<item: struct<item: {arrow_type}>>>",
-            ]
-        else:
-            assert (
-                str(historical_features_arrow.schema.field_by_name("value").type)
-                == f"list<item: {arrow_type}>"
-            )
+        assert (
+            str(historical_features_arrow.schema.field_by_name("value").type)
+            == f"list<item: {arrow_type}>"
+        )
     else:
         assert (
             str(historical_features_arrow.schema.field_by_name("value").type)
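The GCP-only branches deleted above encoded the bug this series fixes: loaded without list inference, a Parquet list column comes back from BigQuery as a STRUCT wrapping the Parquet LIST encoding, which is why the test had to reach into `value[0]["list"][0]["item"]` on GCP instead of `value[0][0]`. A minimal sketch of the two shapes, using only pyarrow (the `plain` and `bq_shape` names are illustrative):

```python
import pyarrow as pa

# Native Arrow/Parquet shape: a plain list column.
plain = pa.table({"value": pa.array([[1, 2], [3]], type=pa.list_(pa.int64()))})
print(plain.schema.field("value").type)  # list<item: int64>

# Shape BigQuery hands back when the LIST encoding is not inferred:
# each row is {"list": [{"item": 1}, {"item": 2}]} rather than [1, 2],
# which is what the removed GCP-only assertions were written against.
bq_shape = pa.array(
    [{"list": [{"item": 1}, {"item": 2}]}, {"list": [{"item": 3}]}],
    type=pa.struct({"list": pa.list_(pa.struct({"item": pa.int64()}))}),
)
print(bq_shape.type)  # struct<list: list<item: struct<item: int64>>>
```

Deleting the branches is therefore a statement of intent: after the following patches, BigQuery should produce the plain shape everywhere.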
From 6279b9d1f7035e24653ac538397eee680910a6d3 Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 18:19:15 +0100
Subject: [PATCH 3/7] Enable Parquet list inference for BigQuery

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index c3c5096d96..f788df2588 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -223,10 +223,14 @@ def to_bigquery(

         if not job_config.dry_run and self.on_demand_feature_views is not None:
             transformed_df = self.to_df()
+            # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+            # https://github.com/googleapis/python-bigquery/issues/19
+            parquet_options = bigquery.format_options.ParquetOptions()
+            parquet_options.enable_list_inference = True
+            job_config = bigquery.LoadJobConfig()
+            job_config.parquet_options = parquet_options
             job = self.client.load_table_from_dataframe(
-                transformed_df,
-                job_config.destination,
-                job_config=bigquery.LoadJobConfig(),
+                transformed_df, job_config.destination, job_config=job_config,
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -333,7 +337,12 @@ def _upload_entity_df_and_get_entity_schema(
         entity_df.reset_index(drop=True, inplace=True)

         # Upload the dataframe into BigQuery, creating a temporary table
+        # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+        # https://github.com/googleapis/python-bigquery/issues/19
+        parquet_options = bigquery.format_options.ParquetOptions()
+        parquet_options.enable_list_inference = True
         job_config = bigquery.LoadJobConfig()
+        job_config.parquet_options = parquet_options
         job = client.load_table_from_dataframe(
             entity_df, table_name, job_config=job_config
         )

From 5d35ec5942da4f9d469faf09434b3dbe328d601b Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 19:54:09 +0100
Subject: [PATCH 4/7] Enable `use_compliant_nested_type`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 .../feast/infra/offline_stores/bigquery.py    | 26 ++++++++++++++------
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index f788df2588..8d7319eb31 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -222,15 +222,23 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)

         if not job_config.dry_run and self.on_demand_feature_views is not None:
-            transformed_df = self.to_df()
+            # It is complicated to get BQ to understand that we want an ARRAY
             # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
             # https://github.com/googleapis/python-bigquery/issues/19
+            writer = pyarrow.BufferOutputStream()
+            pyarrow.parquet.write_table(
+                self.to_arrow(), writer, use_compliant_nested_type=True
+            )
+            reader = pyarrow.BufferReader(writer.getvalue())
+
             parquet_options = bigquery.format_options.ParquetOptions()
             parquet_options.enable_list_inference = True
             job_config = bigquery.LoadJobConfig()
+            job_config.source_format = bigquery.SourceFormat.PARQUET
             job_config.parquet_options = parquet_options
-            job = self.client.load_table_from_dataframe(
-                transformed_df, job_config.destination, job_config=job_config,
+
+            job = self.client.load_table_from_file(
+                reader, job_config.destination, job_config=job_config,
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -337,15 +345,21 @@ def _upload_entity_df_and_get_entity_schema(
         entity_df.reset_index(drop=True, inplace=True)

         # Upload the dataframe into BigQuery, creating a temporary table
+        # It is complicated to get BQ to understand that we want an ARRAY
         # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
         # https://github.com/googleapis/python-bigquery/issues/19
+        writer = pyarrow.BufferOutputStream()
+        pyarrow.parquet.write_table(
+            pyarrow.Table.from_pandas(entity_df), writer, use_compliant_nested_type=True
+        )
+        reader = pyarrow.BufferReader(writer.getvalue())
+
         parquet_options = bigquery.format_options.ParquetOptions()
         parquet_options.enable_list_inference = True
         job_config = bigquery.LoadJobConfig()
+        job_config.source_format = bigquery.SourceFormat.PARQUET
         job_config.parquet_options = parquet_options
-        job = client.load_table_from_dataframe(
-            entity_df, table_name, job_config=job_config
-        )
+        job = client.load_table_from_file(reader, table_name, job_config=job_config)
         block_until_done(client, job)

         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))

From 81c8abc79875baa977841b83884cbc96be06cd0a Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 20:43:30 +0100
Subject: [PATCH 5/7] Add potentially missing import

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 8d7319eb31..8aef92abe6 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -5,6 +5,7 @@
 import numpy as np
 import pandas as pd
 import pyarrow
+import pyarrow.parquet
 from pydantic import StrictStr
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
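Patches 3-5 combine into a single loading recipe: serialize to an in-memory Parquet buffer with `use_compliant_nested_type=True`, so list columns use the Parquet spec's three-level LIST layout, then load that buffer with `enable_list_inference=True`, so BigQuery maps the layout to a plain ARRAY column rather than a wrapping STRUCT. A standalone sketch of the recipe, assuming credentials for a reachable BigQuery project (the function name and destination table are illustrative):

```python
import pandas as pd
import pyarrow
import pyarrow.parquet
from google.cloud import bigquery


def load_df_as_arrays(
    client: bigquery.Client, df: pd.DataFrame, table_name: str
) -> bigquery.LoadJob:
    # Compliant nested types give list columns the Parquet spec's
    # three-level layout (repeated group "list" containing the element),
    # which is what BigQuery's list inference expects to see.
    writer = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(
        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
    )

    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True  # ARRAY columns, not STRUCTs
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.parquet_options = parquet_options

    return client.load_table_from_file(
        pyarrow.BufferReader(writer.getvalue()), table_name, job_config=job_config
    )


# e.g. load_df_as_arrays(bigquery.Client(), df, "my-project.my_dataset.my_table").result()
```

The next patch consolidates exactly this sequence into shared helpers so that every upload path in the SDK and the test suite goes through it.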
From db6f607c4c0ae4a34d3e58ba37d531d61a2ef799 Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 21:39:56 +0100
Subject: [PATCH 6/7] Upload all data to BQ in ARRAY-safe manner

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 .../feast/infra/offline_stores/bigquery.py    | 75 ++++++++++---------
 .../universal/data_sources/bigquery.py        | 10 +--
 .../test_historical_retrieval.py              | 14 ++--
 sdk/python/tests/utils/data_source_utils.py   |  5 +-
 4 files changed, 55 insertions(+), 49 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 8aef92abe6..3e1317626a 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -223,23 +223,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)

         if not job_config.dry_run and self.on_demand_feature_views is not None:
-            # It is complicated to get BQ to understand that we want an ARRAY
-            # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-            # https://github.com/googleapis/python-bigquery/issues/19
-            writer = pyarrow.BufferOutputStream()
-            pyarrow.parquet.write_table(
-                self.to_arrow(), writer, use_compliant_nested_type=True
-            )
-            reader = pyarrow.BufferReader(writer.getvalue())
-
-            parquet_options = bigquery.format_options.ParquetOptions()
-            parquet_options.enable_list_inference = True
-            job_config = bigquery.LoadJobConfig()
-            job_config.source_format = bigquery.SourceFormat.PARQUET
-            job_config.parquet_options = parquet_options
-
-            job = self.client.load_table_from_file(
-                reader, job_config.destination, job_config=job_config,
+            job = _write_pyarrow_table_to_bq(
+                self.client, self.to_arrow(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -344,23 +329,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-
-        # Upload the dataframe into BigQuery, creating a temporary table
-        # It is complicated to get BQ to understand that we want an ARRAY
-        # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-        # https://github.com/googleapis/python-bigquery/issues/19
-        writer = pyarrow.BufferOutputStream()
-        pyarrow.parquet.write_table(
-            pyarrow.Table.from_pandas(entity_df), writer, use_compliant_nested_type=True
-        )
-        reader = pyarrow.BufferReader(writer.getvalue())
-
-        parquet_options = bigquery.format_options.ParquetOptions()
-        parquet_options.enable_list_inference = True
-        job_config = bigquery.LoadJobConfig()
-        job_config.source_format = bigquery.SourceFormat.PARQUET
-        job_config.parquet_options = parquet_options
-        job = client.load_table_from_file(reader, table_name, job_config=job_config)
+        job = _write_df_to_bq(client, entity_df, table_name)
         block_until_done(client, job)

     entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
@@ -395,6 +364,44 @@ def _get_bigquery_client(project: Optional[str] = None):
     return client


+def _write_df_to_bq(
+    client: bigquery.Client, df: pd.DataFrame, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(
+        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
+    )
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
+
+
+def _write_pyarrow_table_to_bq(
+    client: bigquery.Client, table: pyarrow.Table, table_name: str
+) -> bigquery.LoadJob:
+    # It is complicated to get BQ to understand that we want an ARRAY
+    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
+    # https://github.com/googleapis/python-bigquery/issues/19
+    writer = pyarrow.BufferOutputStream()
+    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
+    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
+
+
+def _write_pyarrow_buffer_to_bq(
+    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
+) -> bigquery.LoadJob:
+    reader = pyarrow.BufferReader(buf)
+
+    parquet_options = bigquery.format_options.ParquetOptions()
+    parquet_options.enable_list_inference = True
+    job_config = bigquery.LoadJobConfig()
+    job_config.source_format = bigquery.SourceFormat.PARQUET
+    job_config.parquet_options = parquet_options
+
+    return client.load_table_from_file(reader, table_name, job_config=job_config,)
+
+
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe

diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
index 228e9959d5..46e0535d73 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -6,7 +6,10 @@

 from feast import BigQuerySource
 from feast.data_source import DataSource
-from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    _write_df_to_bq,
+)
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )
@@ -61,15 +64,12 @@ def create_data_source(

         self.create_dataset()

-        job_config = bigquery.LoadJobConfig()
         if self.gcp_project not in destination_name:
             destination_name = (
                 f"{self.gcp_project}.{self.project_name}.{destination_name}"
             )

-        job = self.client.load_table_from_dataframe(
-            df, destination_name, job_config=job_config
-        )
+        job = _write_df_to_bq(self.client, df, destination_name)
         job.result()

         self.tables.append(destination_name)

diff --git a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
index 44f9e595e3..2f1377dd7d 100644
--- a/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
+++ b/sdk/python/tests/integration/offline_store/test_historical_retrieval.py
@@ -19,7 +19,10 @@
 from feast.feature import Feature
 from feast.feature_store import FeatureStore, _validate_feature_refs
 from feast.feature_view import FeatureView
-from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    _write_df_to_bq,
+)
 from feast.infra.offline_stores.offline_utils import (
     DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
 )
@@ -62,9 +65,8 @@ def stage_driver_hourly_stats_parquet_source(directory, df):

 def stage_driver_hourly_stats_bigquery_source(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()


@@ -99,9 +101,8 @@ def feature_service(name: str, views) -> FeatureService:

 def stage_customer_daily_profile_bigquery_source(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()


@@ -231,9 +232,8 @@ def get_expected_training_df(

 def stage_orders_bigquery(df, table_id):
     client = bigquery.Client()
-    job_config = bigquery.LoadJobConfig()
     df.reset_index(drop=True, inplace=True)
-    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
+    job = _write_df_to_bq(client, df, table_id)
     job.result()

diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py
index 17ab06365e..af9663203a 100644
--- a/sdk/python/tests/utils/data_source_utils.py
+++ b/sdk/python/tests/utils/data_source_utils.py
@@ -7,6 +7,7 @@

 from feast import BigQuerySource, FileSource
 from feast.data_format import ParquetFormat
+from feast.infra.offline_stores.bigquery import _write_df_to_bq


 @contextlib.contextmanager
@@ -38,9 +39,7 @@ def simple_bq_source_using_table_ref_arg(

     client.update_dataset(dataset, ["default_table_expiration_ms"])
     table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"
-    job = client.load_table_from_dataframe(
-        df, table_ref, job_config=bigquery.LoadJobConfig()
-    )
+    job = _write_df_to_bq(client, df, table_ref)
     job.result()

     return BigQuerySource(

From f90aa173ef73d6555badacd608b26a7099dbfc04 Mon Sep 17 00:00:00 2001
From: Judah Rand <17158624+judahrand@users.noreply.github.com>
Date: Mon, 20 Sep 2021 23:52:32 +0100
Subject: [PATCH 7/7] Handle empty list in `python_value_to_proto_value`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
---
 sdk/python/feast/type_map.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py
index 7646ad0c90..69d2b912cf 100644
--- a/sdk/python/feast/type_map.py
+++ b/sdk/python/feast/type_map.py
@@ -277,11 +277,16 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
 def python_value_to_proto_value(
     value: Any, feature_type: ValueType = None
 ) -> ProtoValue:
-    value_type = (
-        python_type_to_feast_value_type("", value)
-        if value is not None
-        else feature_type
-    )
+    value_type = feature_type
+    if value is not None:
+        if isinstance(value, (list, np.ndarray)):
+            value_type = (
+                feature_type
+                if len(value) == 0
+                else python_type_to_feast_value_type("", value)
+            )
+        else:
+            value_type = python_type_to_feast_value_type("", value)
     return _python_value_to_proto_value(value_type, value)
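The guard in the final patch exists because `python_type_to_feast_value_type` infers a `ValueType` by inspecting a list's elements, and an empty list leaves nothing to inspect, so the declared `feature_type` is the only usable source of truth. A rough sketch of the dispatch, with `infer_element_type` standing in for Feast's inference helper:

```python
import numpy as np


def infer_element_type(value):
    # Stand-in for feast.type_map.python_type_to_feast_value_type, which
    # looks at a sample element to decide on a ValueType.
    return type(value[0]).__name__  # would raise IndexError for []


def choose_value_type(value, declared_type):
    # Mirrors the patched logic in python_value_to_proto_value.
    if value is None:
        return declared_type
    if isinstance(value, (list, np.ndarray)) and len(value) == 0:
        return declared_type  # nothing to inspect; trust the declaration
    return infer_element_type(value)


print(choose_value_type([1, 2], "INT64_LIST"))  # int (inferred)
print(choose_value_type([], "INT64_LIST"))      # INT64_LIST (declared fallback)
```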