From ccc59f8b231d0466944089e5a39f146dd67ff232 Mon Sep 17 00:00:00 2001
From: Cody Lin
Date: Tue, 22 Jun 2021 14:41:21 -0700
Subject: [PATCH 1/5] Provide more options for to_bigquery config

Signed-off-by: Cody Lin
---
 .../feast/infra/offline_stores/bigquery.py    | 19 ++++--------
 sdk/python/tests/test_historical_retrieval.py | 26 ++++++++++++++++---
 2 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index f331d1b768..1fc209b0eb 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -1,7 +1,6 @@
 import time
-import uuid
 from dataclasses import asdict, dataclass
-from datetime import date, datetime, timedelta
+from datetime import datetime, timedelta
 from typing import List, Optional, Set, Union
 
 import pandas
@@ -224,19 +223,17 @@ def to_df(self):
         df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
         return df
 
-    def to_bigquery(self, dry_run=False) -> Optional[str]:
+    def to_sql(self):
+        return self.query
+
+    def to_bigquery(self, job_config=None) -> Optional[str]:
         @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
         def _block_until_done():
             return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]
 
-        today = date.today().strftime("%Y%m%d")
-        rand_id = str(uuid.uuid4())[:7]
-        dataset_project = self.config.offline_store.project_id or self.client.project
-        path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
-        job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
         bq_job = self.client.query(self.query, job_config=job_config)
 
-        if dry_run:
+        if job_config.dry_run:
             print(
                 "This query will process {} bytes.".format(bq_job.total_bytes_processed)
             )
@@ -247,8 +244,8 @@ def _block_until_done():
         if bq_job.exception():
             raise bq_job.exception()
 
-        print(f"Done writing to '{path}'.")
-        return path
+        print(f"Done writing to '{job_config.destination}'.")
+        return str(job_config.destination)
 
 
 @dataclass(frozen=True)
diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py
index 656dd589e9..4a48305aff 100644
--- a/sdk/python/tests/test_historical_retrieval.py
+++ b/sdk/python/tests/test_historical_retrieval.py
@@ -9,6 +9,7 @@
 import numpy as np
 import pandas as pd
 import pytest
+from google.api_core.exceptions import Conflict
 from google.cloud import bigquery
 from pandas.testing import assert_frame_equal
 from pytz import utc
@@ -439,12 +440,16 @@ def test_historical_features_from_bigquery_sources(
     )
 
     # Just a dry run, should not create table
-    bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
+    job_config = bigquery.QueryJobConfig(dry_run=True)
+    bq_dry_run = job_from_sql.to_bigquery(job_config=job_config)
     assert bq_dry_run is None
 
-    bq_temp_table_path = job_from_sql.to_bigquery()
-    assert bq_temp_table_path.split(".")[0] == gcp_project
+    path = f"{gcp_project}.{bigquery_dataset}.historical_dest_table"
+    job_config = bigquery.QueryJobConfig(destination=path)
+    bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
 
+    # Check return output of to_bigquery()
+    assert bq_temp_table_path.split(".")[0] == gcp_project
     if provider_type == "gcp_custom_offline_config":
         assert bq_temp_table_path.split(".")[1] == "foo"
     else:
@@ -454,13 +459,26 @@ def test_historical_features_from_bigquery_sources(
     actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
     assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]
 
+    # Config to overwrite an existing table should succeed
+    job_config = bigquery.QueryJobConfig(
+        destination=path, write_disposition="WRITE_TRUNCATE"
+    )
+    bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
+
+    # Config to fail on the existing table we created should fail
+    with pytest.raises(Conflict):
+        job_config = bigquery.QueryJobConfig(
+            destination=path, write_disposition="WRITE_EMPTY"
+        )
+        bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
+
     start_time = datetime.utcnow()
     actual_df_from_sql_entities = job_from_sql.to_df()
     end_time = datetime.utcnow()
     with capsys.disabled():
         print(
             str(
-                f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
+                f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
             )
         )
 

From 30b7b32d9dd88ecd5aa5632d267f2ecead892540 Mon Sep 17 00:00:00 2001
From: Cody Lin
Date: Wed, 23 Jun 2021 14:41:04 -0700
Subject: [PATCH 2/5] Fix default job_config when none; remove excessive testing

Signed-off-by: Cody Lin
---
 .../feast/infra/offline_stores/bigquery.py    | 22 ++++++++----
 sdk/python/tests/test_historical_retrieval.py | 34 -------------------
 2 files changed, 16 insertions(+), 40 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 1fc209b0eb..a0bfdbd224 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -1,6 +1,7 @@
 import time
+import uuid
 from dataclasses import asdict, dataclass
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from typing import List, Optional, Set, Union
 
 import pandas
@@ -231,19 +232,28 @@ def to_bigquery(self, job_config=None) -> Optional[str]:
         def _block_until_done():
             return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]
 
+        if not job_config:
+            today = date.today().strftime("%Y%m%d")
+            rand_id = str(uuid.uuid4())[:7]
+            dataset_project = (
+                self.config.offline_store.project_id or self.client.project
+            )
+            path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
+            job_config = bigquery.QueryJobConfig(destination=path)
+
         bq_job = self.client.query(self.query, job_config=job_config)
 
+        _block_until_done()
+
+        if bq_job.exception():
+            raise bq_job.exception()
+
         if job_config.dry_run:
             print(
                 "This query will process {} bytes.".format(bq_job.total_bytes_processed)
             )
             return None
 
-        _block_until_done()
-
-        if bq_job.exception():
-            raise bq_job.exception()
-
         print(f"Done writing to '{job_config.destination}'.")
         return str(job_config.destination)
 
diff --git a/sdk/python/tests/test_historical_retrieval.py b/sdk/python/tests/test_historical_retrieval.py
index 4a48305aff..08fb1d206d 100644
--- a/sdk/python/tests/test_historical_retrieval.py
+++ b/sdk/python/tests/test_historical_retrieval.py
@@ -9,7 +9,6 @@
 import numpy as np
 import pandas as pd
 import pytest
-from google.api_core.exceptions import Conflict
 from google.cloud import bigquery
 from pandas.testing import assert_frame_equal
 from pytz import utc
@@ -439,39 +438,6 @@ def test_historical_features_from_bigquery_sources(
         ],
     )
 
-    # Just a dry run, should not create table
-    job_config = bigquery.QueryJobConfig(dry_run=True)
-    bq_dry_run = job_from_sql.to_bigquery(job_config=job_config)
-    assert bq_dry_run is None
-
-    path = f"{gcp_project}.{bigquery_dataset}.historical_dest_table"
-    job_config = bigquery.QueryJobConfig(destination=path)
-    bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
-
-    # Check return output of to_bigquery()
-    assert bq_temp_table_path.split(".")[0] == gcp_project
-    if provider_type == "gcp_custom_offline_config":
-        assert bq_temp_table_path.split(".")[1] == "foo"
-    else:
-        assert bq_temp_table_path.split(".")[1] == bigquery_dataset
-
-    # Check that this table actually exists
-    actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
-    assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]
-
-    # Config to overwrite an existing table should succeed
-    job_config = bigquery.QueryJobConfig(
-        destination=path, write_disposition="WRITE_TRUNCATE"
-    )
-    bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
-
-    # Config to fail on the existing table we created should fail
-    with pytest.raises(Conflict):
-        job_config = bigquery.QueryJobConfig(
-            destination=path, write_disposition="WRITE_EMPTY"
-        )
-        bq_temp_table_path = job_from_sql.to_bigquery(job_config=job_config)
-
     start_time = datetime.utcnow()
     actual_df_from_sql_entities = job_from_sql.to_df()
     end_time = datetime.utcnow()

From 0b5f6ed9887a33383dbb563ec059627cb5438c25 Mon Sep 17 00:00:00 2001
From: Cody Lin
Date: Thu, 24 Jun 2021 09:39:40 -0700
Subject: [PATCH 3/5] Add param type and docstring

Signed-off-by: Cody Lin
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index a0bfdbd224..3af471f1ff 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -227,7 +227,17 @@ def to_df(self):
     def to_sql(self):
         return self.query
 
-    def to_bigquery(self, job_config=None) -> Optional[str]:
+    def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
+        """
+        Uploads the results of the BigQueryRetrievalJob directly to BigQuery
+
+        Args:
+            job_config: A bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
+
+        Returns:
+            Returns either the destination table name, none if job_config.dry_run, or raises an exception if job fails
+        """
+
         @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
         def _block_until_done():
             return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]

From e87dad093fefff98eb33110659d942531d2b2676 Mon Sep 17 00:00:00 2001
From: Cody Lin
Date: Thu, 24 Jun 2021 15:55:48 -0700
Subject: [PATCH 4/5] add docstrings and typing

Signed-off-by: Cody Lin
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 3af471f1ff..ae80a0bc30 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -224,12 +224,15 @@ def to_df(self):
         df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
         return df
 
-    def to_sql(self):
+    def to_sql(self) -> str:
+        """
+        Returns the SQL query that can be modified and run by the user to create the BQ table.
+        """
         return self.query
 
     def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
         """
-        Uploads the results of the BigQueryRetrievalJob directly to BigQuery
+        Uploads the results of the BigQueryRetrievalJob directly to BigQuery.
 
         Args:
             job_config: A bigquery.QueryJobConfig to specify options like destination table, dry run, etc.

From e6faf279935996c869e4d24b9a6bd337b265279a Mon Sep 17 00:00:00 2001
From: codyjlin <31944154+codyjlin@users.noreply.github.com>
Date: Fri, 25 Jun 2021 19:27:22 -0400
Subject: [PATCH 5/5] Apply docstring suggestions from code review

Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
Signed-off-by: Cody Lin
---
 sdk/python/feast/infra/offline_stores/bigquery.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index ae80a0bc30..ff98578849 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -226,19 +226,19 @@ def to_df(self):
 
     def to_sql(self) -> str:
         """
-        Returns the SQL query that can be modified and run by the user to create the BQ table.
+        Returns the SQL query that will be executed in BigQuery to build the historical feature table.
         """
         return self.query
 
     def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
         """
-        Uploads the results of the BigQueryRetrievalJob directly to BigQuery.
+        Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
 
         Args:
-            job_config: A bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
+            job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
 
         Returns:
-            Returns either the destination table name, none if job_config.dry_run, or raises an exception if job fails
+            Returns the destination table name or returns None if job_config.dry_run is True.
         """
 
         @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
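
Usage note: a minimal, hypothetical sketch of how the to_sql() and to_bigquery(job_config=...) API added by this series might be driven from user code, assuming a BigQuery offline store. The project, dataset, destination table, entity dataframe, and feature reference names below are placeholders, and the get_historical_features keyword (feature_refs vs. features) varies by Feast release.

# Hypothetical driver for the job_config-based to_bigquery() API from this series.
# All identifiers (project, dataset, table, feature refs) are placeholders.
import pandas as pd
from google.cloud import bigquery

from feast import FeatureStore

store = FeatureStore(repo_path=".")

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2021-06-01", "2021-06-02"]),
    }
)

# With a BigQuery offline store this returns a BigQueryRetrievalJob.
job = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["driver_stats:conv_rate"],  # `features=` in newer Feast releases
)

# Inspect the generated SQL without executing anything.
print(job.to_sql())

# Dry run: per the docstring, prints the bytes the query would process and returns None.
dry_cfg = bigquery.QueryJobConfig(dry_run=True)
job.to_bigquery(job_config=dry_cfg)

# Export to a caller-chosen table, overwriting any previous contents.
dest_cfg = bigquery.QueryJobConfig(
    destination="my-project.my_dataset.driver_training_data",
    write_disposition="WRITE_TRUNCATE",
)
print(job.to_bigquery(job_config=dest_cfg))

# With no job_config, the job falls back to a generated
# "<project>.<dataset>.historical_<date>_<random>" destination table.
default_table = job.to_bigquery()

The design choice reflected here is to expose bigquery.QueryJobConfig directly instead of adding bespoke keyword arguments: destination tables, dry runs, and write dispositions are all options the BigQuery client library already understands, so further capabilities of that config object become available without touching the Feast API.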