Commit 27d21f9
add tests
drewbanin committed Feb 15, 2020
1 parent 1ae17d5 commit 27d21f9
Showing 10 changed files with 247 additions and 21 deletions.
3 changes: 2 additions & 1 deletion core/dbt/deprecations.py
@@ -118,7 +118,7 @@ class BigQueryPartitionByStringDeprecation(DBTDeprecation):
     - Provided partition_by: {raw_partition_by}
     - dbt inferred: {inferred_partition_by}
@@ -127,6 +127,7 @@ class BigQueryPartitionByStringDeprecation(DBTDeprecation):
     https://docs.getdbt.com/docs/upgrading-to-0-16-0
     '''
 
+
 _adapter_renamed_description = """\
 The adapter function `adapter.{old_name}` is deprecated and will be removed in
 a future release of dbt. Please use `adapter.{new_name}` instead.
5 changes: 2 additions & 3 deletions plugins/bigquery/dbt/adapters/bigquery/connections.py
@@ -232,8 +232,8 @@ def execute(self, sql, auto_begin=False, fetch=None):
             status = 'CREATE TABLE ({})'.format(table.num_rows)
 
         elif query_job.statement_type == 'SCRIPT':
-            billed = query_job.total_bytes_billed
-            status = 'SCRIPT ({} billed)'.format(dbt.utils.format_bytes(billed))
+            processed = dbt.utils.format_bytes(query_job.total_bytes_processed)
+            status = f'SCRIPT ({processed} processed)'
 
         elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']:
             status = '{} ({})'.format(
@@ -242,7 +242,6 @@ def execute(self, sql, auto_begin=False, fetch=None):
             )
 
         else:
-            import ipdb; ipdb.set_trace()
             status = 'OK'
 
         return status, res
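
For context, the new `SCRIPT` status relies on `dbt.utils.format_bytes`. A minimal sketch of that helper, inferred from the unit-test expectations in `test/unit/test_utils.py` at the bottom of this commit (dbt's actual implementation may differ in detail):

def format_bytes(num_bytes):
    # walk up the binary units until the value fits below 1024
    for unit in ['Bytes', 'KB', 'MB', 'GB', 'TB']:
        if abs(num_bytes) < 1024.0:
            return f'{num_bytes:3.1f} {unit}'
        num_bytes /= 1024.0
    # anything past TB is reported as an overflow, per the unit tests
    return '> 1024 TB'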
34 changes: 17 additions & 17 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -403,7 +403,7 @@ def execute_model(self, model, materialization, sql_override=None,
         if flags.STRICT_MODE:
             connection = self.connections.get_thread_connection()
             if not isinstance(connection, Connection):
-                raise dbt.exceptions.CompilerException(
+                dbt.exceptions.raise_compiler_error(
                     f'Got {connection} - not a Connection!'
                 )
         model_uid = model.get('unique_id')
@@ -423,29 +423,29 @@ def execute_model(self, model, materialization, sql_override=None,
 
         return res
 
-    def _partitions_match(self, table, conf_partition):
+    def _partitions_match(self, table, conf_partition: Dict[str, Any]):
         """
         Check if the actual and configured partitions for a table are a match.
         BigQuery tables can be replaced if:
         - Both tables are not partitioned, OR
         - Both tables are partitioned using the exact same configs
         If there is a mismatch, then the table cannot be replaced directly.
         """
-        if table.partitioning_type is None and conf_partition is None:
-            return True
+        is_partitioned = (table.range_partitioning or table.time_partitioning)
 
-        elif conf_partition is None or table.partitioning_type is None:
-            return False
+        if not is_partitioned and not conf_partition:
+            return True
 
-        elif table.partitioning_type == 'DAY':
-            return table.time_partitioning.field == conf_partition.get('field')
+        if table.time_partitioning is not None:
+            table_field = table.time_partitioning.field
+            return table_field == conf_partition.get('field')
 
-        elif table.partitioning_type == 'RANGE': # TODO
-            dest_part = table.range_partition.range_
-            conf_part = conf_partition.get('range')
+        elif table.range_partitioning is not None:
+            # the client's Table has no `range_partition` attribute: the
+            # bounds live on range_partitioning.range_ and the column on
+            # range_partitioning.field
+            dest_part = table.range_partitioning.range_
+            conf_part = conf_partition.get('range', {})
 
-            return dest_part.field == conf_part.get('field') \
+            return table.range_partitioning.field == conf_partition.get('field') \
                 and dest_part.start == conf_part.get('start') \
                 and dest_part.end == conf_part.get('end') \
                 and dest_part.interval == conf_part.get('interval')
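
To make the new comparison concrete, here is a hypothetical table/config pair that `_partitions_match` would accept. The table name and values are illustrative, not taken from this commit; it assumes the `google-cloud-bigquery` client types `RangePartitioning` and `PartitionRange` that the new checks inspect:

from google.cloud import bigquery

table = bigquery.Table('my-project.my_dataset.my_model')
table.range_partitioning = bigquery.RangePartitioning(
    field='id',
    range_=bigquery.PartitionRange(start=1, end=10, interval=1),
)

# dict form of the model's partition_by config
conf_partition = {
    'field': 'id',
    'data_type': 'int64',
    'range': {'start': 1, 'end': 10, 'interval': 1},
}

# field, start, end and interval all line up, so the existing relation
# can be replaced in place instead of requiring a full refresh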
@@ -465,7 +465,7 @@ def _clusters_match(self, table, conf_cluster):
             return table.clustering_fields == conf_cluster
 
     @available.parse(lambda *a, **k: True)
-    def is_replaceable(self, relation, conf_partition, conf_cluster):
+    def is_replaceable(self, relation, conf_partition: dict, conf_cluster):
         """
         Check if a given partition and clustering column spec for a table
         can replace an existing relation in the database. BigQuery does not
@@ -491,13 +491,13 @@ def is_replaceable(self, relation, conf_partition, conf_cluster):
         ))
 
     @available
-    def parse_partition_by(self, raw_partition_by):
+    def parse_partition_by(self, raw_partition_by: Any):
         """
         dbt v0.16.0 expects `partition_by` to be a dictionary where previously
         it was a string. Check the type of `partition_by`, raise error
         or warning if string, and attempt to convert to dict.
         """
 
         if isinstance(raw_partition_by, dict):
             if raw_partition_by.get('field'):
                 if raw_partition_by.get('data_type'):
@@ -521,8 +521,8 @@ def parse_partition_by(self, raw_partition_by):
                               re.IGNORECASE)
             if not matches:
                 dbt.exceptions.raise_compiler_error(
-                f"Specified partition_by '{raw_partition_by}' "
-                "is not parseable")
+                    f"Specified partition_by '{raw_partition_by}' "
+                    "is not parseable")
 
             partition_by = matches.group(1)
             data_type = 'timestamp'
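
A condensed, hypothetical standalone sketch of the string branch above, reconstructed from the unit tests in `test/unit/test_bigquery_adapter.py` further down (the real method also fires the `BigQueryPartitionByStringDeprecation` warning from `core/dbt/deprecations.py`, and its regex may differ):

import re
import dbt.exceptions

def _parse_partition_by_string(raw_partition_by):
    matches = re.match(r'(date\()?([\w_]+)(\))?$', raw_partition_by,
                       re.IGNORECASE)
    if not matches:
        dbt.exceptions.raise_compiler_error(
            f"Specified partition_by '{raw_partition_by}' "
            "is not parseable")
    if matches.group(1):
        # 'date(ts)' -> a timestamp column, partitioned by its date
        return {'field': matches.group(2), 'data_type': 'timestamp'}
    # a bare column name is assumed to already be date-typed
    return {'field': matches.group(2), 'data_type': 'date'}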
11 changes: 11 additions & 0 deletions test/integration/022_bigquery_test/partition-models/my_model.sql
@@ -0,0 +1,11 @@
+
+
+{{
+    config(
+        materialized="table",
+        partition_by=var('partition_by'),
+        cluster_by=var('cluster_by')
+    )
+}}
+
+select 1 as id, 'dr. bigquery' as name, current_timestamp() as cur_time, current_date() as cur_date
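
Because the model reads both settings from `var()`, the partition and cluster spec come entirely from the command line. A sketch of how the `TestChangingPartitions.run_changes` helper below drives it (values illustrative):

import json

before = {'partition_by': None, 'cluster_by': None}
after = {'partition_by': 'date(cur_time)', 'cluster_by': None}

# the harness invokes dbt once per spec, equivalent to e.g.
#   dbt run --vars '{"partition_by": null, "cluster_by": null}'
#   dbt run --vars '{"partition_by": "date(cur_time)", "cluster_by": null}'
cli_args = [['run', '--vars', json.dumps(spec)] for spec in (before, after)]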
39 changes: 39 additions & 0 deletions (file path not captured in this view — evidently one of the two incremental models under test/integration/022_bigquery_test/scripting-models/)
@@ -0,0 +1,39 @@
+
+{{
+    config(
+        materialized="incremental",
+        unique_key="id",
+        cluster_by="id",
+        partition_by={
+            "field": "id",
+            "data_type": "int64",
+            "range": {
+                "start": 1,
+                "end": 10,
+                "interval": 1
+            }
+        }
+    )
+}}
+
+
+with data as (
+    select 1 as id, current_date() as ts union all
+    select 2 as id, current_date() as ts union all
+    select 3 as id, current_date() as ts union all
+    select 4 as id, current_date() as ts
+
+    {% if is_incremental() %}
+    union all
+    select 5 as id, current_date() as ts union all
+    select 6 as id, current_date() as ts union all
+    select 7 as id, current_date() as ts union all
+    select 8 as id, current_date() as ts
+    {% endif %}
+)
+
+select * from data
+
+{% if is_incremental() %}
+where id > _dbt_max_partition
+{% endif %}
34 changes: 34 additions & 0 deletions (file path not captured in this view — evidently the other incremental model under test/integration/022_bigquery_test/scripting-models/)
@@ -0,0 +1,34 @@
+
+{{
+    config(
+        materialized="incremental",
+        unique_key="id",
+        cluster_by="id",
+        partition_by={
+            "field": "ts",
+            "data_type": "timestamp"
+        }
+    )
+}}
+
+
+with data as (
+    select 1 as id, current_timestamp() as ts union all
+    select 2 as id, current_timestamp() as ts union all
+    select 3 as id, current_timestamp() as ts union all
+    select 4 as id, current_timestamp() as ts
+
+    {% if is_incremental() %}
+    union all
+    select 5 as id, current_timestamp() as ts union all
+    select 6 as id, current_timestamp() as ts union all
+    select 7 as id, current_timestamp() as ts union all
+    select 8 as id, current_timestamp() as ts
+    {% endif %}
+)
+
+select * from data
+
+{% if is_incremental() %}
+where ts > _dbt_max_partition
+{% endif %}
49 changes: 49 additions & 0 deletions (file path not captured in this view — the TestChangingPartitions integration test under test/integration/022_bigquery_test/)
@@ -0,0 +1,49 @@
+from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile
+import json
+
+
+class TestChangingPartitions(DBTIntegrationTest):
+
+    @property
+    def schema(self):
+        return "bigquery_test_022"
+
+    @property
+    def models(self):
+        return "partition-models"
+
+    # helper rather than a test: it takes arguments, so it must not be
+    # named test_* or pytest would try to collect it
+    def run_changes(self, before, after):
+        results = self.run_dbt(['run', '--vars', json.dumps(before)])
+        self.assertEqual(len(results), 1)
+
+        results = self.run_dbt(['run', '--vars', json.dumps(after)])
+        self.assertEqual(len(results), 1)
+
+    @use_profile('bigquery')
+    def test_add_partition(self):
+        before = {"partition_by": None, "cluster_by": None}
+        after = {"partition_by": "date(cur_time)", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_remove_partition(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": None, "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_change_partitions(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": "cur_date", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_add_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": None}
+        after = {"partition_by": "cur_date", "cluster_by": "id"}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_remove_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
+        after = {"partition_by": "cur_date", "cluster_by": None}
+        self.run_changes(before, after)
+
+    @use_profile('bigquery')
+    def test_change_clustering(self):
+        before = {"partition_by": "date(cur_time)", "cluster_by": "id"}
+        after = {"partition_by": "cur_date", "cluster_by": "name"}
+        self.run_changes(before, after)
22 changes: 22 additions & 0 deletions test/integration/022_bigquery_test/test_scripting.py
@@ -0,0 +1,22 @@
from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile

class TestBigQueryScripting(DBTIntegrationTest):

@property
def schema(self):
return "bigquery_test_022"

@property
def models(self):
return "scripting-models"

@property
def profile_config(self):
return self.bigquery_profile()

def assert_incrementals(self):
results = self.run_dbt()
self.assertEqual(len(results), 2)

self.run_dbt()
self.assertEqual(len(results), 2)
58 changes: 58 additions & 0 deletions test/unit/test_bigquery_adapter.py
@@ -373,3 +373,61 @@ def test_query_and_results(self, mock_bq):
         mock_bq.QueryJobConfig.assert_called_once()
         self.mock_client.query.assert_called_once_with(
             'sql', job_config=mock_bq.QueryJobConfig())
+
+
+class TestBigQueryTableOptions(BaseTestBigQueryAdapter):
+    def test_parse_partition_by(self):
+        adapter = self.get_adapter('oauth')
+
+        self.assertEqual(
+            adapter.parse_partition_by("date(ts)"), {
+                "field": "ts",
+                "data_type": "timestamp"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by("ts"), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+            }), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "ts",
+                "data_type": "date",
+            }), {
+                "field": "ts",
+                "data_type": "date"
+            }
+        )
+
+        # range partition configs are passed through unchanged
+        self.assertEqual(
+            adapter.parse_partition_by({
+                "field": "id",
+                "data_type": "int64",
+                "range": {
+                    "start": 1,
+                    "end": 100,
+                    "interval": 20
+                }
+            }), {
+                "field": "id",
+                "data_type": "int64",
+                "range": {
+                    "start": 1,
+                    "end": 100,
+                    "interval": 20
+                }
+            }
+        )
13 changes: 13 additions & 0 deletions test/unit/test_utils.py
@@ -139,3 +139,16 @@ def test_trivial(self):
         with self.assertRaises(dbt.exceptions.DbtConfigError):
             dbt.utils.deep_map(lambda x, _: x, {'foo': object()})
+
+
+class TestBytesFormatting(unittest.TestCase):
+
+    def test__simple_cases(self):
+        self.assertEqual(dbt.utils.format_bytes(-1), '-1.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(0), '0.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(20), '20.0 Bytes')
+        self.assertEqual(dbt.utils.format_bytes(1030), '1.0 KB')
+        self.assertEqual(dbt.utils.format_bytes(1024**2*1.5), '1.5 MB')
+        self.assertEqual(dbt.utils.format_bytes(1024**3*52.6), '52.6 GB')
+        self.assertEqual(dbt.utils.format_bytes(1024**4*128), '128.0 TB')
+        self.assertEqual(dbt.utils.format_bytes(1024**5+1), '> 1024 TB')
