Add support for on_schema_change #229

Merged: 2 commits, Oct 18, 2021
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ test/integration/.user.yml
.DS_Store
.vscode
*.log
logs/
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Fixes
- Fix `--store-failures` for tests, by suppressing irrelevant error in `comment_clause()` macro ([#232](https://github.com/dbt-labs/dbt-spark/issues/232), [#233](https://github.com/dbt-labs/dbt-spark/pull/233))
- Add support for `on_schema_change` config in incremental models: `ignore`, `fail`, `append_new_columns`. For `sync_all_columns`, removing columns is not supported by Apache Spark or Delta Lake ([#198](https://github.com/dbt-labs/dbt-spark/issues/198), [#226](https://github.com/dbt-labs/dbt-spark/issues/226), [#229](https://github.com/dbt-labs/dbt-spark/pull/229))

## dbt-spark 0.21.0 (October 4, 2021)

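For illustration only (not part of this diff), a model opts into the new behaviour through its config block; the model and column names below are hypothetical:

{{
    config(
        materialized='incremental',
        on_schema_change='append_new_columns'
    )
}}

select order_id, amount, discount
from {{ ref('stg_orders') }}
{% if is_incremental() %}
where order_id not in (select order_id from {{ this }})
{% endif %}

If `discount` is new since the last run, `append_new_columns` adds it to the target table before inserting; `fail` would abort the run instead, and the default `ignore` leaves the target schema untouched.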
47 changes: 46 additions & 1 deletion dbt/include/spark/macros/adapters.sql
@@ -125,7 +125,7 @@

{% macro spark__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
describe extended {{ relation }}
describe extended {{ relation.include(schema=(schema is not none)) }}
{% endcall %}
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}
@@ -194,3 +194,48 @@
{% endfor %}
{% endif %}
{% endmacro %}


{% macro spark__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(path = {
"identifier": tmp_identifier,
"schema": None
}) -%}

{% do return(tmp_relation) %}
{% endmacro %}


{% macro spark__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }};
{% endcall %}
{% endmacro %}


{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
{% endif %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation }}

{% if add_columns %} add columns {% endif %}
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{% do run_query(sql) %}

{% endmacro %}
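
As a rough sketch of the DDL this macro builds (table and column names are made up), a call with two added columns and no removed columns renders roughly:

alter table analytics.my_incremental_model
add columns
    field3 int,
    field4 string

Passing anything in remove_columns raises the compiler error above instead, since neither Apache Spark nor Delta Lake supports dropping columns through this path.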
@@ -11,6 +11,8 @@
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
@@ -31,6 +33,7 @@
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

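Roughly, the flow is: validate the `on_schema_change` value, materialize the new select into a temp relation, reconcile the temp relation's columns against the existing table, then run the usual incremental insert/merge. A minimal sketch of the reconcile step for `append_new_columns` follows; it is not the dbt-core `process_schema_changes` implementation, and the macro name is made up:

{% macro sketch_append_new_columns(tmp_relation, existing_relation) %}
    {# columns produced by the new select but missing from the target table #}
    {% set tmp_columns = adapter.get_columns_in_relation(tmp_relation) %}
    {% set existing_names = adapter.get_columns_in_relation(existing_relation) | map(attribute='name') | list %}
    {% set new_columns = tmp_columns | rejectattr('name', 'in', existing_names) | list %}
    {% if new_columns %}
        {# reuse the alter macro added in adapters.sql above #}
        {% do spark__alter_relation_add_remove_columns(existing_relation, new_columns, none) %}
    {% endif %}
{% endmacro %}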
@@ -0,0 +1,28 @@
{{
config(
materialized='incremental',
on_schema_change='append_new_columns'
)
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}
@@ -0,0 +1,19 @@
{{
config(materialized='table')
}}

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,cast(field1 as {{string_type}}) as field1
,cast(field2 as {{string_type}}) as field2
,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3
,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4

from source_data
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='fail'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2 FROM source_data

{% else %}

SELECT id, field1, field3 FROM source_data

{% endif %}
@@ -0,0 +1,18 @@
{{
config(
materialized='incremental',
on_schema_change='ignore'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id, field1, field2 FROM source_data LIMIT 3

{% endif %}
@@ -0,0 +1,15 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

select id
,field1
,field2

from source_data
@@ -0,0 +1,30 @@
{{
config(
materialized='incremental',
on_schema_change='sync_all_columns'

)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3, -- to validate new fields
cast(field4 as {{string_type}}) AS field4 -- to validate new fields

FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

select id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2

from source_data where id <= 3

{% endif %}
@@ -0,0 +1,20 @@
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %}

select id
,cast(field1 as {{string_type}}) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4

from source_data
order by id
22 changes: 22 additions & 0 deletions test/custom/incremental_on_schema_change/models/model_a.sql
@@ -0,0 +1,22 @@
{{
config(materialized='table')
}}

with source_data as (

select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4
union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4
union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4
union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4
union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4
union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4

)

select id
,field1
,field2
,field3
,field4

from source_data