Provide the user with more options for setting the to_bigquery config #1661

Merged · 5 commits · Jun 28, 2021
48 changes: 34 additions & 14 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -224,31 +224,51 @@ def to_df(self):
         df = self.client.query(self.query).to_dataframe(create_bqstorage_client=True)
         return df
 
-    def to_bigquery(self, dry_run=False) -> Optional[str]:
+    def to_sql(self) -> str:
+        """
+        Returns the SQL query that will be executed in BigQuery to build the historical feature table.
+        """
+        return self.query
+
+    def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
+        """
+        Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
+
+        Args:
+            job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
+
+        Returns:
+            Returns the destination table name or returns None if job_config.dry_run is True.
+        """
 
         @retry(wait=wait_fixed(10), stop=stop_after_delay(1800), reraise=True)
         def _block_until_done():
             return self.client.get_job(bq_job.job_id).state in ["PENDING", "RUNNING"]
 
-        today = date.today().strftime("%Y%m%d")
-        rand_id = str(uuid.uuid4())[:7]
-        dataset_project = self.config.offline_store.project_id or self.client.project
-        path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
-        job_config = bigquery.QueryJobConfig(destination=path, dry_run=dry_run)
-        bq_job = self.client.query(self.query, job_config=job_config)
+        if not job_config:
+            today = date.today().strftime("%Y%m%d")
+            rand_id = str(uuid.uuid4())[:7]
+            dataset_project = (
+                self.config.offline_store.project_id or self.client.project
+            )
+            path = f"{dataset_project}.{self.config.offline_store.dataset}.historical_{today}_{rand_id}"
+            job_config = bigquery.QueryJobConfig(destination=path)
 
-        if dry_run:
-            print(
-                "This query will process {} bytes.".format(bq_job.total_bytes_processed)
-            )
-            return None
+        bq_job = self.client.query(self.query, job_config=job_config)
 
         _block_until_done()
 
         if bq_job.exception():
             raise bq_job.exception()
 
-        print(f"Done writing to '{path}'.")
-        return path
+        if job_config.dry_run:
+            print(
+                "This query will process {} bytes.".format(bq_job.total_bytes_processed)
+            )
+            return None
+
+        print(f"Done writing to '{job_config.destination}'.")
+        return str(job_config.destination)
 
+
 @dataclass(frozen=True)
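With this change, a caller can hand `to_bigquery` its own `bigquery.QueryJobConfig` instead of relying on the auto-generated `historical_<date>_<id>` destination table, and `to_sql` exposes the query for inspection. A minimal usage sketch, assuming `job` is the retrieval job returned by `store.get_historical_features(...)` and the destination path is a placeholder:

```python
from google.cloud import bigquery

# Inspect the generated SQL without executing anything.
sql = job.to_sql()

# Export to a caller-chosen table instead of the default
# auto-generated "historical_<date>_<id>" destination.
config = bigquery.QueryJobConfig(
    destination="my-project.my_dataset.my_table",  # placeholder path
    write_disposition="WRITE_TRUNCATE",  # overwrite the table if it exists
)
table_path = job.to_bigquery(job_config=config)
```

Either way, `to_bigquery` still blocks on the query, polling the job state every 10 seconds for up to 30 minutes (`wait_fixed(10)`, `stop_after_delay(1800)`) before returning the destination table name.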
18 changes: 1 addition & 17 deletions sdk/python/tests/test_historical_retrieval.py
@@ -438,29 +438,13 @@ def test_historical_features_from_bigquery_sources(
             ],
         )
 
-        # Just a dry run, should not create table
-        bq_dry_run = job_from_sql.to_bigquery(dry_run=True)
-        assert bq_dry_run is None
-
-        bq_temp_table_path = job_from_sql.to_bigquery()
-        assert bq_temp_table_path.split(".")[0] == gcp_project
-
-        if provider_type == "gcp_custom_offline_config":
-            assert bq_temp_table_path.split(".")[1] == "foo"
-        else:
-            assert bq_temp_table_path.split(".")[1] == bigquery_dataset
-
-        # Check that this table actually exists
-        actual_bq_temp_table = bigquery.Client().get_table(bq_temp_table_path)
-        assert actual_bq_temp_table.table_id == bq_temp_table_path.split(".")[-1]
-
         start_time = datetime.utcnow()
         actual_df_from_sql_entities = job_from_sql.to_df()
         end_time = datetime.utcnow()
         with capsys.disabled():
             print(
                 str(
-                    f"\nTime to execute job_from_df.to_df() = '{(end_time - start_time)}'"
+                    f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'"
                 )
             )
 
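With the `dry_run` keyword removed, the dry-run check deleted above would now be expressed through the job config instead; a sketch of the equivalent assertion under the new API:

```python
from google.cloud import bigquery

# dry_run now travels inside the QueryJobConfig rather than as a
# dedicated keyword argument; to_bigquery still returns None for dry runs.
dry_run_config = bigquery.QueryJobConfig(dry_run=True)
assert job_from_sql.to_bigquery(job_config=dry_run_config) is None
```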