diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py
index 1d1d31ff94c..3cfeea14491 100644
--- a/core/dbt/deprecations.py
+++ b/core/dbt/deprecations.py
@@ -118,7 +118,7 @@ class BigQueryPartitionByStringDeprecation(DBTDeprecation):
     - Provided partition_by: {raw_partition_by}
-
+
     - dbt inferred: {inferred_partition_by}
@@ -127,6 +127,7 @@ class BigQueryPartitionByStringDeprecation(DBTDeprecation):
     https://docs.getdbt.com/docs/upgrading-to-0-16-0
     '''

+
 _adapter_renamed_description = """\
 The adapter function `adapter.{old_name}` is deprecated and will be removed in
 a future release of dbt. Please use `adapter.{new_name}` instead.
diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py
index 261b050acc0..b8061512fd0 100644
--- a/plugins/bigquery/dbt/adapters/bigquery/connections.py
+++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -232,8 +232,8 @@ def execute(self, sql, auto_begin=False, fetch=None):
             status = 'CREATE TABLE ({})'.format(table.num_rows)

         elif query_job.statement_type == 'SCRIPT':
-            billed = query_job.total_bytes_billed
-            status = 'SCRIPT ({} billed)'.format(dbt.utils.format_bytes(billed))
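+            # report the total bytes processed by the script job,
+            # e.g. "SCRIPT (1.5 MB processed)"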
""" - if table.partitioning_type is None and conf_partition is None: - return True + is_partitioned = (table.range_partitioning or table.time_partitioning) - elif conf_partition is None or table.partitioning_type is None: - return False + if not is_partitioned and not conf_partition: + return True - elif table.partitioning_type == 'DAY': - return table.time_partitioning.field == conf_partition.get('field') + if table.time_partitioning is not None: + table_field = table.time_partitioning.field + return table_field == conf_partition.get('field') - elif table.partitioning_type == 'RANGE': # TODO + elif table.range_partitioning is not None: dest_part = table.range_partition.range_ - conf_part = conf_partition.get('range') + conf_part = conf_partition.get('range', {}) - return dest_part.field == conf_part.get('field') \ + return dest_part.field == conf_partition.get('field') \ and dest_part.start == conf_part.get('start') \ and dest_part.end == conf_part.get('end') \ and dest_part.interval == conf_part.get('interval') @@ -465,7 +465,7 @@ def _clusters_match(self, table, conf_cluster): return table.clustering_fields == conf_cluster @available.parse(lambda *a, **k: True) - def is_replaceable(self, relation, conf_partition, conf_cluster): + def is_replaceable(self, relation, conf_partition: dict, conf_cluster): """ Check if a given partition and clustering column spec for a table can replace an existing relation in the database. BigQuery does not @@ -491,13 +491,13 @@ def is_replaceable(self, relation, conf_partition, conf_cluster): )) @available - def parse_partition_by(self, raw_partition_by): + def parse_partition_by(self, raw_partition_by: Any): """ dbt v0.16.0 expects `partition_by` to be a dictionary where previously it was a string. Check the type of `partition_by`, raise error or warning if string, and attempt to convert to dict. """ - + if isinstance(raw_partition_by, dict): if raw_partition_by.get('field'): if raw_partition_by.get('data_type'): @@ -521,8 +521,8 @@ def parse_partition_by(self, raw_partition_by): re.IGNORECASE) if not matches: dbt.exceptions.raise_compiler_error( - f"Specified partition_by '{raw_partition_by}' " - "is not parseable") + f"Specified partition_by '{raw_partition_by}' " + "is not parseable") partition_by = matches.group(1) data_type = 'timestamp' diff --git a/test/integration/022_bigquery_test/partition-models/my_model.sql b/test/integration/022_bigquery_test/partition-models/my_model.sql new file mode 100644 index 00000000000..610daf0b196 --- /dev/null +++ b/test/integration/022_bigquery_test/partition-models/my_model.sql @@ -0,0 +1,11 @@ + + +{{ + config( + materialized="table", + partition_by=var('partition_by'), + cluster_by=var('cluster_by') + ) +}} + +select 1 as id, 'dr. 

-            return dest_part.field == conf_part.get('field') \
+            return table.range_partitioning.field == conf_partition.get('field') \
                 and dest_part.start == conf_part.get('start') \
                 and dest_part.end == conf_part.get('end') \
                 and dest_part.interval == conf_part.get('interval')
@@ -465,7 +465,7 @@ def _clusters_match(self, table, conf_cluster):
         return table.clustering_fields == conf_cluster

     @available.parse(lambda *a, **k: True)
-    def is_replaceable(self, relation, conf_partition, conf_cluster):
+    def is_replaceable(self, relation, conf_partition: dict, conf_cluster):
         """
         Check if a given partition and clustering column spec for a table
         can replace an existing relation in the database. BigQuery does not
@@ -491,13 +491,13 @@ def is_replaceable(self, relation, conf_partition, conf_cluster):
         ))

     @available
-    def parse_partition_by(self, raw_partition_by):
+    def parse_partition_by(self, raw_partition_by: Any):
         """
         dbt v0.16.0 expects `partition_by` to be a dictionary where previously
         it was a string. Check the type of `partition_by`, raise error
         or warning if string, and attempt to convert to dict.
         """
-
+
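+        # accepted inputs (exercised by the unit tests below):
+        #   {"field": "ts", "data_type": "date"}  -> passed through as-is
+        #   {"field": "ts"}                       -> data_type defaults to "date"
+        #   "date(ts)"  (legacy string)  -> {"field": "ts", "data_type": "timestamp"}
+        #   "ts"        (legacy string)  -> {"field": "ts", "data_type": "date"}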
{"partition_by": "cur_date", "cluster_by": None} + self.test_change(before, after) + + def test_add_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": None} + after = {"partition_by": "cur_date", "cluster_by": "id"} + self.test_change(before, after) + + def test_remove_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": "id"} + after = {"partition_by": "cur_date", "cluster_by": None} + self.test_change(before, after) + + def test_change_clustering(self): + before = {"partition_by": "date(cur_time)", "cluster_by": "id"} + after = {"partition_by": "cur_date", "cluster_by": "name"} + self.test_change(before, after) diff --git a/test/integration/022_bigquery_test/test_scripting.py b/test/integration/022_bigquery_test/test_scripting.py new file mode 100644 index 00000000000..7199f634485 --- /dev/null +++ b/test/integration/022_bigquery_test/test_scripting.py @@ -0,0 +1,22 @@ +from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile + +class TestBigQueryScripting(DBTIntegrationTest): + + @property + def schema(self): + return "bigquery_test_022" + + @property + def models(self): + return "scripting-models" + + @property + def profile_config(self): + return self.bigquery_profile() + + def assert_incrementals(self): + results = self.run_dbt() + self.assertEqual(len(results), 2) + + self.run_dbt() + self.assertEqual(len(results), 2) diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 05e738c9a09..09a6a190706 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -373,3 +373,61 @@ def test_query_and_results(self, mock_bq): mock_bq.QueryJobConfig.assert_called_once() self.mock_client.query.assert_called_once_with( 'sql', job_config=mock_bq.QueryJobConfig()) + +class TestBigQueryTableOptions(BaseTestBigQueryAdapter): + def test_parse_partition_by(self): + adapter = self.get_adapter('oauth') + + self.assertEqual( + adapter.parse_partition_by("date(ts)"), { + "field": "ts", + "data_type": "timestamp" + } + ) + + self.assertEqual( + adapter.parse_partition_by("ts"), { + "field": "ts", + "data_type": "date" + } + ) + + self.assertEqual( + adapter.parse_partition_by({ + "field": "ts", + }), { + "field": "ts", + "data_type": "date" + } + ) + + self.assertEqual( + adapter.parse_partition_by({ + "field": "ts", + "data_type": "date", + }), { + "field": "ts", + "data_type": "date" + } + ) + + # passthrough + self.assertEqual( + adapter.parse_partition_by({ + "field": "id" + "data_type": "int64", + "range": { + "start": 1, + "end": 100, + "interval": 20 + } + }), { + "field": "id" + "data_type": "int64", + "range": { + "start": 1, + "end": 100, + "interval": 20 + } + } + ) diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py index a66b2f5a72d..0f294fa69ca 100644 --- a/test/unit/test_utils.py +++ b/test/unit/test_utils.py @@ -139,3 +139,16 @@ def test_trivial(self): with self.assertRaises(dbt.exceptions.DbtConfigError): dbt.utils.deep_map(lambda x, _: x, {'foo': object()}) + + +class TestBytesFormatting(unittest.TestCase): + + def test__simple_cases(self): + self.assertEqual(dbt.utils.format_bytes(-1), '-1.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(0), '0.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(20), '20.0 Bytes') + self.assertEqual(dbt.utils.format_bytes(1030), '1.0 KB') + self.assertEqual(dbt.utils.format_bytes(1024**2*1.5), '1.5 MB') + self.assertEqual(dbt.utils.format_bytes(1024**3*52.6), '52.6 GB') + 
+{% if is_incremental() %}
+where id > _dbt_max_partition
+{% endif %}
diff --git a/test/integration/022_bigquery_test/scripting-models/incremental_time.sql b/test/integration/022_bigquery_test/scripting-models/incremental_time.sql
new file mode 100644
index 00000000000..fe6f7d313ed
--- /dev/null
+++ b/test/integration/022_bigquery_test/scripting-models/incremental_time.sql
@@ -0,0 +1,34 @@
+
+{{
+    config(
+        materialized="incremental",
+        unique_key="id",
+        cluster_by="id",
+        partition_by={
+            "field": "ts",
+            "data_type": "timestamp"
+        }
+    )
+}}
+
+
+with data as (
+    select 1 as id, current_timestamp() as ts union all
+    select 2 as id, current_timestamp() as ts union all
+    select 3 as id, current_timestamp() as ts union all
+    select 4 as id, current_timestamp() as ts
+
+    {% if is_incremental() %}
+    union all
+    select 5 as id, current_timestamp() as ts union all
+    select 6 as id, current_timestamp() as ts union all
+    select 7 as id, current_timestamp() as ts union all
+    select 8 as id, current_timestamp() as ts
+    {% endif %}
+)
+
+select * from data
+
+{% if is_incremental() %}
+where ts > _dbt_max_partition
+{% endif %}
diff --git a/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py
new file mode 100644
index 00000000000..bdbf637f1ff
--- /dev/null
+++ b/test/integration/022_bigquery_test/test_bigquery_changing_partitions.py
@@ -0,0 +1,49 @@
+from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile
+import json
+
+
+class TestChangingPartitions(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "bigquery_test_022"
+
+    @property
+    def models(self):
+        return "partition-models"
+
+    def run_changes(self, before, after):
+        # helper, not a test case: the name must not start with 'test_',
+        # or the test runner would collect it and call it without args
+        results = self.run_dbt(['run', '--vars', json.dumps(before)])
+        self.assertEqual(len(results), 1)
+
+        results = self.run_dbt(['run', '--vars', json.dumps(after)])
+        self.assertEqual(len(results), 1)
+
+    @use_profile('bigquery')
+    def test_add_partition(self):
+        before = {"partition_by": None, "cluster_by": None}
+        after = {"partition_by": "date(cur_time)", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_remove_partition(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": None, "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_change_partitions(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": "cur_date", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_add_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": "cur_date", "cluster_by": "id"}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_remove_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
+        after = {"partition_by": "cur_date", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_change_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
+        after = {"partition_by": "cur_date", "cluster_by": "name"}
+        self.run_changes(before, after)
diff --git a/test/integration/022_bigquery_test/test_scripting.py b/test/integration/022_bigquery_test/test_scripting.py
new file mode 100644
index 00000000000..7199f634485
--- /dev/null
+++ b/test/integration/022_bigquery_test/test_scripting.py
@@ -0,0 +1,22 @@
+from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile
+
+
+class TestBigQueryScripting(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "bigquery_test_022"
+
+    @property
+    def models(self):
+        return "scripting-models"
+
+    @property
+    def profile_config(self):
+        return self.bigquery_profile()
+
+    def assert_incrementals(self):
+        # both scripting models should build on the first run and again
+        # on the incremental run
+        results = self.run_dbt()
+        self.assertEqual(len(results), 2)
+
+        results = self.run_dbt()
+        self.assertEqual(len(results), 2)
+
+    @use_profile('bigquery')
+    def test_bigquery_scripting(self):
+        self.assert_incrementals()
diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py
index 05e738c9a09..09a6a190706 100644
--- a/test/unit/test_bigquery_adapter.py
+++ b/test/unit/test_bigquery_adapter.py
@@ -373,3 +373,61 @@ def test_query_and_results(self, mock_bq):
         mock_bq.QueryJobConfig.assert_called_once()
         self.mock_client.query.assert_called_once_with(
             'sql', job_config=mock_bq.QueryJobConfig())
+
+
+class TestBigQueryTableOptions(BaseTestBigQueryAdapter):
+    def test_parse_partition_by(self):
+        adapter = self.get_adapter('oauth')
+
+        self.assertEqual(
+            adapter.parse_partition_by("date(ts)"), {
+                "field": "ts",
+                "data_type": "timestamp"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by("ts"), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+            }), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+                "data_type": "date",
+            }), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        # range partition configs are passed through unmodified
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "id",
+                "data_type": "int64",
+                "range": {
+                    "start": 1,
+                    "end": 100,
+                    "interval": 20
+                }
+            }), {
+                "field": "id",
+                "data_type": "int64",
+                "range": {
+                    "start": 1,
+                    "end": 100,
+                    "interval": 20
+                }
+            }
+        )
diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py
index a66b2f5a72d..0f294fa69ca 100644
--- a/test/unit/test_utils.py
+++ b/test/unit/test_utils.py
@@ -139,3 +139,16 @@ def test_trivial(self):
         with self.assertRaises(dbt.exceptions.DbtConfigError):
             dbt.utils.deep_map(lambda x, _: x, {'foo': object()})
+
+
+class TestBytesFormatting(unittest.TestCase):
+
+    def test__simple_cases(self):
+        self.assertEqual(dbt.utils.format_bytes(-1), '-1.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(0), '0.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(20), '20.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(1030), '1.0 KB')
+        self.assertEqual(dbt.utils.format_bytes(1024**2*1.5), '1.5 MB')
+        self.assertEqual(dbt.utils.format_bytes(1024**3*52.6), '52.6 GB')
+        self.assertEqual(dbt.utils.format_bytes(1024**4*128), '128.0 TB')
+        self.assertEqual(dbt.utils.format_bytes(1024**5+1), '> 1024 TB')