Make partition metadata available to BigQuery users
ran-eh committed Oct 11, 2020
1 parent 7203825 commit 82ddedd
Showing 4 changed files with 67 additions and 2 deletions.
3 changes: 1 addition & 2 deletions .gitignore
@@ -91,5 +91,4 @@ target/

 .DS_Store
 
-# vscode
-.vscode/
+# vscode.vscode/
14 changes: 14 additions & 0 deletions core/dbt/adapters/base/impl.py
@@ -230,6 +230,20 @@ def execute(
            fetch=fetch
        )

    @available.parse(lambda *a, **k: ('', empty_table()))
    def get_partitions_metadata(
        self, table_id: str
    ) -> agate.Table:
        """Obtain partition metadata for a BigQuery partitioned table.

        :param str table_id: a partitioned table id, in standard SQL format.
        :return: a table of partition metadata, one row per partition, as
            described in https://cloud.google.com/bigquery/docs/creating-partitioned-tables#getting_partition_metadata_using_meta_tables
        :rtype: agate.Table
        """
        return self.connections.get_partitions_metadata(
            table_id=table_id
        )

    ###
    # Methods that should never be overridden
    ###
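Not part of the commit: a minimal sketch of how a caller might consume the agate.Table this method returns. It assumes the columns exposed by BigQuery's legacy __PARTITIONS_SUMMARY__ meta-table (partition_id, creation_time, last_modified_time), and the helper name is hypothetical.

import agate

def newest_partition(adapter, table_id: str) -> str:
    # 'adapter' is assumed to be a configured BigQuery adapter instance.
    table: agate.Table = adapter.get_partitions_metadata(table_id)
    # Each row describes one partition; pick the most recently modified one.
    newest = max(table.rows, key=lambda row: row['last_modified_time'])
    return newest['partition_id']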
44 changes: 44 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -341,6 +341,50 @@ def execute(self, sql, auto_begin=False, fetch=None):

        return status, res

    # The following method intentionally violates DRY: it is mostly
    # copy-pasted from raw_execute(). This is done to discourage the use
    # of legacy SQL queries in dbt, except to obtain partition metadata.
    # The method should be removed once partition metadata becomes
    # available from standard SQL.
    def _raw_execute_legacy_sql(self, sql, fetch=False):
        conn = self.get_thread_connection()
        client = conn.handle

        logger.debug('On {}: {}', conn.name, sql)

        job_params = {'use_legacy_sql': True}

        priority = conn.credentials.priority
        if priority == Priority.Batch:
            job_params['priority'] = google.cloud.bigquery.QueryPriority.BATCH
        else:
            job_params['priority'] = (
                google.cloud.bigquery.QueryPriority.INTERACTIVE)

        maximum_bytes_billed = conn.credentials.maximum_bytes_billed
        if maximum_bytes_billed is not None and maximum_bytes_billed != 0:
            job_params['maximum_bytes_billed'] = maximum_bytes_billed

        def fn():
            return self._query_and_results(client, sql, conn, job_params)

        query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)

        return query_job, iterator

    def get_partitions_metadata(self, table_id):
        def standard_to_legacy(table_id):
            table_ref = google.cloud.bigquery.table.TableReference.from_string(
                table_id)
            return (table_ref.project + ':' + table_ref.dataset_id + '.'
                    + table_ref.table_id).replace('`', '')

        legacy_sql = ('SELECT * FROM ['
                      + standard_to_legacy(table_id)
                      + '$__PARTITIONS_SUMMARY__]')

        sql = self._add_query_comment(legacy_sql)
        # fetch is not used by _raw_execute_legacy_sql; rows are read from
        # the returned iterator below
        _, iterator = self._raw_execute_legacy_sql(sql, fetch='fetch_result')

        return self.get_table_from_response(iterator)

    def create_bigquery_table(self, database, schema, table_name, callback,
                              sql):
        """Create a bigquery table. The caller must supply a callback
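An aside, not part of the commit: standard SQL table ids take the form `project.dataset.table`, while legacy SQL expects project:dataset.table wrapped in brackets. A standalone sketch of the conversion performed by standard_to_legacy() above, assuming google-cloud-bigquery is installed:

from google.cloud.bigquery.table import TableReference

ref = TableReference.from_string('my-project.my_dataset.my_table')
legacy_id = ref.project + ':' + ref.dataset_id + '.' + ref.table_id
print('SELECT * FROM [' + legacy_id + '$__PARTITIONS_SUMMARY__]')
# prints: SELECT * FROM [my-project:my_dataset.my_table$__PARTITIONS_SUMMARY__]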
8 changes: 8 additions & 0 deletions plugins/bigquery/dbt/include/bigquery/macros/etc.sql
@@ -5,3 +5,11 @@
{% macro grant_access_to(entity, entity_type, role, grant_target_dict) -%}
  {% do adapter.grant_access_to(entity, entity_type, role, grant_target_dict) %}
{% endmacro %}

{%- macro get_partitions_metadata(table) -%}
  {%- if execute -%}
    {%- set res = adapter.get_partitions_metadata(table) -%}
    {{- return(res) -}}
  {%- endif -%}
  {{- return(None) -}}
{%- endmacro -%}
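For illustration, not part of the commit: a sketch of how a project might call the new macro from a model or run-operation. The table id is hypothetical, and partition_id is a column of BigQuery's partition summary meta-table. The macro returns None at parse time, so the call is guarded with execute:

{% if execute %}
    {% set partitions = get_partitions_metadata('`my-project.my_dataset.my_table`') %}
    {% for row in partitions.rows %}
        {{ log('partition: ' ~ row['partition_id'], info=True) }}
    {% endfor %}
{% endif %}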
