diff --git a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py new file mode 100644 index 000000000..974edd261 --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -0,0 +1,84 @@ +import pytest + +from dbt.tests.util import run_dbt + +from dbt.tests.adapter.incremental.test_incremental_on_schema_change import BaseIncrementalOnSchemaChangeSetup + + +class IncrementalOnSchemaChangeIgnoreFail(BaseIncrementalOnSchemaChangeSetup): + def test_run_incremental_ignore(self, project): + select = "model_a incremental_ignore incremental_ignore_target" + compare_source = "incremental_ignore" + compare_target = "incremental_ignore_target" + self.run_twice_and_assert(select, compare_source, compare_target, project) + + def test_run_incremental_fail_on_schema_change(self, project): + select = "model_a incremental_fail" + run_dbt(["run", "--models", select, "--full-refresh"]) + results_two = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results_two[1].message + + +@pytest.mark.skip_profile("databricks_sql_endpoint") +class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+incremental_strategy": "append", + } + } + + +@pytest.mark.skip_profile("databricks_sql_endpoint", "spark_session") +class TestInsertOverwriteOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+file_format": "parquet", + "+partition_by": "id", + "+incremental_strategy": "insert_overwrite", + } + } + + +@pytest.mark.skip_profile("apache_spark", "spark_session") +class TestDeltaOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+file_format": "delta", + "+incremental_strategy": "merge", + "+unique_key": "id", + } + } + + def run_incremental_sync_all_columns(self, project): + select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" + compare_source = "incremental_sync_all_columns" + compare_target = "incremental_sync_all_columns_target" + run_dbt(["run", "--models", select, "--full-refresh"]) + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass = False) + assert "Compilation Error" in results[1].message + + def run_incremental_sync_remove_only(self, project): + select = "model_a incremental_sync_remove_only incremental_sync_remove_only_target" + compare_source = "incremental_sync_remove_only" + compare_target = "incremental_sync_remove_only_target" + run_dbt(["run", "--models", select, "--full-refresh"]) + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass = False) + assert "Compilation Error" in results[1].message + + def test_run_incremental_append_new_columns(self, project): + # only adding new columns in supported + self.run_incremental_append_new_columns(project) + # handling columns that have been removed doesn"t work on Delta Lake today + # self.run_incremental_append_new_columns_remove_one(project) + + def test_run_incremental_sync_all_columns(self, project): + self.run_incremental_sync_all_columns(project) + self.run_incremental_sync_remove_only(project) diff --git a/tests/functional/adapter/test_incremental_predicates.py b/tests/functional/adapter/incremental/test_incremental_predicates.py similarity index 100% rename from tests/functional/adapter/test_incremental_predicates.py rename to tests/functional/adapter/incremental/test_incremental_predicates.py diff --git a/tests/functional/adapter/test_incremental_unique_id.py b/tests/functional/adapter/incremental/test_incremental_unique_id.py similarity index 100% rename from tests/functional/adapter/test_incremental_unique_id.py rename to tests/functional/adapter/incremental/test_incremental_unique_id.py diff --git a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql deleted file mode 100644 index 86f6c7c42..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql +++ /dev/null @@ -1,28 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='append_new_columns' - ) -}} - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2, - cast(field3 as {{string_type}}) as field3, - cast(field4 as {{string_type}}) as field4 -FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2 -FROM source_data where id <= 3 - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql deleted file mode 100644 index 55ed7b2c5..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql +++ /dev/null @@ -1,19 +0,0 @@ -{{ - config(materialized='table') -}} - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -select id - ,cast(field1 as {{string_type}}) as field1 - ,cast(field2 as {{string_type}}) as field2 - ,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3 - ,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_fail.sql b/tests/integration/incremental_on_schema_change/models/incremental_fail.sql deleted file mode 100644 index 939fc20c2..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_fail.sql +++ /dev/null @@ -1,18 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='fail' - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, field1, field2 FROM source_data - -{% else %} - -SELECT id, field1, field3 FROm source_data - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql deleted file mode 100644 index 98f0a74a8..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql +++ /dev/null @@ -1,18 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='ignore' - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -SELECT id, field1, field2 FROM source_data LIMIT 3 - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql deleted file mode 100644 index 92d4564e0..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql +++ /dev/null @@ -1,15 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -select id - ,field1 - ,field2 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql deleted file mode 100644 index 2c5a461e5..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql +++ /dev/null @@ -1,30 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='sync_all_columns' - - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -{% if is_incremental() %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field3 as {{string_type}}) as field3, -- to validate new fields - cast(field4 as {{string_type}}) AS field4 -- to validate new fields - -FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -select id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2 - -from source_data where id <= 3 - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql deleted file mode 100644 index 56591eb22..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql +++ /dev/null @@ -1,20 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -select id - ,cast(field1 as {{string_type}}) as field1 - --,field2 - ,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3 - ,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4 - -from source_data -order by id \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/model_a.sql b/tests/integration/incremental_on_schema_change/models/model_a.sql deleted file mode 100644 index 2a0b2ddaf..000000000 --- a/tests/integration/incremental_on_schema_change/models/model_a.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4 - union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4 - union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4 - union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4 - union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4 - union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4 - -) - -select id - ,field1 - ,field2 - ,field3 - ,field4 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py b/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py deleted file mode 100644 index 4d1cd374e..000000000 --- a/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py +++ /dev/null @@ -1,156 +0,0 @@ -from cProfile import run -from tests.integration.base import DBTIntegrationTest, use_profile - - -class TestIncrementalOnSchemaChange(DBTIntegrationTest): - @property - def schema(self): - return "incremental_on_schema_change" - - @property - def models(self): - return "models" - - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"] - } - - def run_twice_and_assert( - self, include, compare_source, compare_target - ): - - # dbt run (twice) - run_args = ['run'] - if include: - run_args.extend(('--models', include)) - results_one = self.run_dbt(run_args) - results_two = self.run_dbt(run_args) - - self.assertEqual(len(results_one), 3) - self.assertEqual(len(results_two), 3) - - self.assertTablesEqual(compare_source, compare_target) - - def run_incremental_ignore(self): - select = 'model_a incremental_ignore incremental_ignore_target' - compare_source = 'incremental_ignore' - compare_target = 'incremental_ignore_target' - self.run_twice_and_assert(select, compare_source, compare_target) - - def run_incremental_append_new_columns(self): - select = 'model_a incremental_append_new_columns incremental_append_new_columns_target' - compare_source = 'incremental_append_new_columns' - compare_target = 'incremental_append_new_columns_target' - self.run_twice_and_assert(select, compare_source, compare_target) - - def run_incremental_fail_on_schema_change(self): - select = 'model_a incremental_fail' - results_one = self.run_dbt(['run', '--models', select, '--full-refresh']) - results_two = self.run_dbt(['run', '--models', select], expect_pass = False) - self.assertIn('Compilation Error', results_two[1].message) - - def run_incremental_sync_all_columns(self): - # this doesn't work on Delta today - select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target' - compare_source = 'incremental_sync_all_columns' - compare_target = 'incremental_sync_all_columns_target' - results_one = self.run_dbt(['run', '--models', select, '--full-refresh']) - results_two = self.run_dbt(['run', '--models', select], expect_pass = False) - self.assertIn('Compilation Error', results_two[1].message) - - -class TestApacheSparkAppend(TestIncrementalOnSchemaChange): - - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"], - "models": { - "+incremental_strategy": "append", - } - } - - # only 'ignore' and 'fail' are supported - - @use_profile('apache_spark') - def test__apache_spark__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile('apache_spark') - def test__apache_spark__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - -class TestApacheSparkInsertOverwrite(TestIncrementalOnSchemaChange): - - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"], - "models": { - "+file_format": "parquet", - "+partition_by": "id", - "+incremental_strategy": "insert_overwrite", - } - } - - # only 'ignore' and 'fail' are supported - - @use_profile('apache_spark') - def test__apache_spark__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile('apache_spark') - def test__apache_spark__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - -class TestDeltaOnSchemaChange(TestIncrementalOnSchemaChange): - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"], - "models": { - "+file_format": "delta", - "+incremental_strategy": "merge", - "+unique_key": "id", - } - } - - @use_profile('databricks_cluster') - def test__databricks_cluster__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile('databricks_cluster') - def test__databricks_cluster__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile('databricks_cluster') - def test__databricks_cluster__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile('databricks_cluster') - def test__databricks_cluster__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile('databricks_sql_endpoint') - def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns()