diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index bb750a8c97..6753223f2e 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -256,8 +256,8 @@ def to_bigquery(
             job_config = bigquery.QueryJobConfig(destination=path)
 
         if not job_config.dry_run and self.on_demand_feature_views:
-            job = _write_pyarrow_table_to_bq(
-                self.client, self.to_arrow(), job_config.destination
+            job = self.client.load_table_from_dataframe(
+                self.to_df(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -366,7 +366,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-        job = _write_df_to_bq(client, entity_df, table_name)
+        job = client.load_table_from_dataframe(entity_df, table_name)
         block_until_done(client, job)
         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
     else:
@@ -400,44 +400,6 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     return client
 
 
-def _write_df_to_bq(
-    client: bigquery.Client, df: pd.DataFrame, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(
-        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
-    )
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_table_to_bq(
-    client: bigquery.Client, table: pyarrow.Table, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_buffer_to_bq(
-    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
-) -> bigquery.LoadJob:
-    reader = pyarrow.BufferReader(buf)
-
-    parquet_options = bigquery.format_options.ParquetOptions()
-    parquet_options.enable_list_inference = True
-    job_config = bigquery.LoadJobConfig()
-    job_config.source_format = bigquery.SourceFormat.PARQUET
-    job_config.parquet_options = parquet_options
-
-    return client.load_table_from_file(reader, table_name, job_config=job_config,)
-
-
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index ea4bb670b8..244728f103 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -53,7 +53,7 @@
     "pandas>=1.0.0",
     "pandavro==1.5.*",
     "protobuf>=3.10",
-    "pyarrow>=2.0.0",
+    "pyarrow>=4.0.0",
     "pydantic>=1.0.0",
     "PyYAML>=5.4.*",
     "tabulate==0.8.*",
@@ -65,7 +65,7 @@
 ]
 
 GCP_REQUIRED = [
-    "google-cloud-bigquery>=2.14.0",
+    "google-cloud-bigquery>=2.28.1",
     "google-cloud-bigquery-storage >= 2.0.0",
     "google-cloud-datastore>=2.1.*",
     "google-cloud-storage>=1.34.*",
@@ -111,7 +111,7 @@
"firebase-admin==4.5.2", "pre-commit", "assertpy==1.1", - "google-cloud-bigquery>=2.14.0", + "google-cloud-bigquery>=2.28.1", "google-cloud-bigquery-storage >= 2.0.0", "google-cloud-datastore>=2.1.*", "google-cloud-storage>=1.20.*", diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 46e0535d73..766c31150e 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -6,10 +6,7 @@ from feast import BigQuerySource from feast.data_source import DataSource -from feast.infra.offline_stores.bigquery import ( - BigQueryOfflineStoreConfig, - _write_df_to_bq, -) +from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) @@ -69,7 +66,7 @@ def create_data_source( f"{self.gcp_project}.{self.project_name}.{destination_name}" ) - job = _write_df_to_bq(self.client, df, destination_name) + job = self.client.load_table_from_dataframe(df, destination_name) job.result() self.tables.append(destination_name) diff --git a/sdk/python/tests/utils/data_source_utils.py b/sdk/python/tests/utils/data_source_utils.py index af9663203a..6e3d77ead0 100644 --- a/sdk/python/tests/utils/data_source_utils.py +++ b/sdk/python/tests/utils/data_source_utils.py @@ -7,7 +7,6 @@ from feast import BigQuerySource, FileSource from feast.data_format import ParquetFormat -from feast.infra.offline_stores.bigquery import _write_df_to_bq @contextlib.contextmanager @@ -39,7 +38,7 @@ def simple_bq_source_using_table_ref_arg( client.update_dataset(dataset, ["default_table_expiration_ms"]) table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}" - job = _write_df_to_bq(client, df, table_ref) + job = client.load_table_from_dataframe(df, table_ref) job.result() return BigQuerySource(