Simplify BigQuery load jobs (#1935)
* Simplify BigQuery load jobs

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Ensure `pyarrow` supports `use_compliant_nested_type`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
judahrand authored and felixwang9817 committed Oct 28, 2021
1 parent a6d4fa9 commit 17193d2
Showing 4 changed files with 9 additions and 51 deletions.
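
In short, the commit replaces Feast's hand-rolled Parquet upload helpers with the BigQuery client's own DataFrame loader. The sketch below is assembled from the hunks in this commit rather than copied verbatim from the repository; `client`, `df`, and `table_ref` are illustrative names, and the dependency bumps further down (pyarrow >= 4.0.0, google-cloud-bigquery >= 2.28.1) presumably exist so that the client's built-in DataFrame-to-Parquet path handles compliant nested types itself.

```python
# Before: serialize the DataFrame to Parquet by hand so BigQuery infers
# ARRAY<value_type> columns correctly, then upload the buffer.
import pyarrow
import pyarrow.parquet
from google.cloud import bigquery


def load_dataframe_old(client: bigquery.Client, df, table_ref: str) -> bigquery.LoadJob:
    writer = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(
        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
    )
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET, parquet_options=parquet_options
    )
    return client.load_table_from_file(
        pyarrow.BufferReader(writer.getvalue()), table_ref, job_config=job_config
    )


# After: let the client handle serialization and schema inference.
def load_dataframe_new(client: bigquery.Client, df, table_ref: str) -> bigquery.LoadJob:
    return client.load_table_from_dataframe(df, table_ref)
```

Callers block on the returned job with `job.result()` (or Feast's `block_until_done`) exactly as before.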
44 changes: 3 additions & 41 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -256,8 +256,8 @@ def to_bigquery(
         job_config = bigquery.QueryJobConfig(destination=path)
 
         if not job_config.dry_run and self.on_demand_feature_views:
-            job = _write_pyarrow_table_to_bq(
-                self.client, self.to_arrow(), job_config.destination
+            job = self.client.load_table_from_dataframe(
+                self.to_df(), job_config.destination
             )
             job.result()
             print(f"Done writing to '{job_config.destination}'.")
@@ -366,7 +366,7 @@ def _upload_entity_df_and_get_entity_schema(
     elif isinstance(entity_df, pd.DataFrame):
         # Drop the index so that we dont have unnecessary columns
         entity_df.reset_index(drop=True, inplace=True)
-        job = _write_df_to_bq(client, entity_df, table_name)
+        job = client.load_table_from_dataframe(entity_df, table_name)
         block_until_done(client, job)
         entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
     else:
@@ -400,44 +400,6 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
     return client
 
 
-def _write_df_to_bq(
-    client: bigquery.Client, df: pd.DataFrame, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(
-        pyarrow.Table.from_pandas(df), writer, use_compliant_nested_type=True
-    )
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_table_to_bq(
-    client: bigquery.Client, table: pyarrow.Table, table_name: str
-) -> bigquery.LoadJob:
-    # It is complicated to get BQ to understand that we want an ARRAY<value_type>
-    # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
-    # https://github.com/googleapis/python-bigquery/issues/19
-    writer = pyarrow.BufferOutputStream()
-    pyarrow.parquet.write_table(table, writer, use_compliant_nested_type=True)
-    return _write_pyarrow_buffer_to_bq(client, writer.getvalue(), table_name,)
-
-
-def _write_pyarrow_buffer_to_bq(
-    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
-) -> bigquery.LoadJob:
-    reader = pyarrow.BufferReader(buf)
-
-    parquet_options = bigquery.format_options.ParquetOptions()
-    parquet_options.enable_list_inference = True
-    job_config = bigquery.LoadJobConfig()
-    job_config.source_format = bigquery.SourceFormat.PARQUET
-    job_config.parquet_options = parquet_options
-
-    return client.load_table_from_file(reader, table_name, job_config=job_config,)
-
-
 # TODO: Optimizations
 # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
 # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe
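
The three deleted helpers existed only to force BigQuery's list inference for array columns. If that behaviour ever needs to be spelled out explicitly again, `load_table_from_dataframe` accepts a job config; the sketch below is an assumption-laden illustration (it presumes the installed google-cloud-bigquery, >= 2.28.1 per setup.py, honours `ParquetOptions` on the DataFrame load path), not code from this commit.

```python
from google.cloud import bigquery


def load_dataframe_with_explicit_parquet_options(
    client: bigquery.Client, df, table_ref: str
) -> bigquery.LoadJob:
    # Same flags the removed _write_pyarrow_buffer_to_bq helper set by hand.
    parquet_options = bigquery.format_options.ParquetOptions()
    parquet_options.enable_list_inference = True
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,  # default for DataFrame loads
        parquet_options=parquet_options,
    )
    return client.load_table_from_dataframe(df, table_ref, job_config=job_config)
```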
6 changes: 3 additions & 3 deletions sdk/python/setup.py
@@ -53,7 +53,7 @@
     "pandas>=1.0.0",
     "pandavro==1.5.*",
     "protobuf>=3.10",
-    "pyarrow>=2.0.0",
+    "pyarrow>=4.0.0",
     "pydantic>=1.0.0",
     "PyYAML>=5.4.*",
     "tabulate==0.8.*",
@@ -65,7 +65,7 @@
 ]
 
 GCP_REQUIRED = [
-    "google-cloud-bigquery>=2.14.0",
+    "google-cloud-bigquery>=2.28.1",
     "google-cloud-bigquery-storage >= 2.0.0",
     "google-cloud-datastore>=2.1.*",
     "google-cloud-storage>=1.34.*",
@@ -111,7 +111,7 @@
     "firebase-admin==4.5.2",
     "pre-commit",
     "assertpy==1.1",
-    "google-cloud-bigquery>=2.14.0",
+    "google-cloud-bigquery>=2.28.1",
     "google-cloud-bigquery-storage >= 2.0.0",
     "google-cloud-datastore>=2.1.*",
     "google-cloud-storage>=1.20.*",
7 changes: 2 additions & 5 deletions sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -6,10 +6,7 @@
 
 from feast import BigQuerySource
 from feast.data_source import DataSource
-from feast.infra.offline_stores.bigquery import (
-    BigQueryOfflineStoreConfig,
-    _write_df_to_bq,
-)
+from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )
@@ -69,7 +66,7 @@ def create_data_source(
             f"{self.gcp_project}.{self.project_name}.{destination_name}"
         )
 
-        job = _write_df_to_bq(self.client, df, destination_name)
+        job = self.client.load_table_from_dataframe(df, destination_name)
        job.result()
 
         self.tables.append(destination_name)
3 changes: 1 addition & 2 deletions sdk/python/tests/utils/data_source_utils.py
@@ -7,7 +7,6 @@
 
 from feast import BigQuerySource, FileSource
 from feast.data_format import ParquetFormat
-from feast.infra.offline_stores.bigquery import _write_df_to_bq
 
 
 @contextlib.contextmanager
@@ -39,7 +38,7 @@ def simple_bq_source_using_table_ref_arg(
     client.update_dataset(dataset, ["default_table_expiration_ms"])
     table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"
 
-    job = _write_df_to_bq(client, df, table_ref)
+    job = client.load_table_from_dataframe(df, table_ref)
     job.result()
 
     return BigQuerySource(
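
After the change, the test helpers follow the same minimal pattern as the production code. Below is a hedged sketch of that pattern, with hypothetical literals standing in for the project, dataset, and table names the real helpers generate.

```python
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()
df = pd.DataFrame({"driver_id": [1001, 1002], "conv_rate": [0.5, 0.7]})
table_ref = "my_project.my_dataset.table_123"  # hypothetical destination

# Upload the DataFrame and block until the load job finishes.
job = client.load_table_from_dataframe(df, table_ref)
job.result()
```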
