4. Append-only merge if no unique_key
jtcohen6 committed Jan 12, 2021
1 parent ecafb59 commit 50092a0
Showing 2 changed files with 15 additions and 20 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -192,7 +192,8 @@ where date_day::date >= '2019-01-01'
group by 1
```

The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique key` to match existing records.
The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique_key` is specified, the statement will match existing records on that key and overwrite them with new values. If a `unique_key` config is not specified, dbt will simply append new data to the model without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.)
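
To make this concrete, here is a rough sketch of the merge statement the adapter generates in each case, assuming a hypothetical `analytics.user_stats` model with `unique_key='user_id'` (relation and column names are illustrative only):

```
-- Sketch only: relation and column names are hypothetical.

-- With unique_key='user_id': matching rows are overwritten, new rows are inserted
merge into analytics.user_stats as DBT_INTERNAL_DEST
using user_stats__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *

-- Without unique_key: the join condition is `on false`, so no rows ever match
-- and every source row is appended via the insert branch
merge into analytics.user_stats as DBT_INTERNAL_DEST
using user_stats__dbt_tmp as DBT_INTERNAL_SOURCE
on false
when matched then update set *
when not matched then insert *
```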

```
{{ config(
32 changes: 13 additions & 19 deletions dbt/include/spark/macros/materializations/incremental.sql
@@ -61,23 +61,21 @@
{% do return(strategy) %}
{% endmacro %}

{% macro dbt_spark_validate_merge(file_format) %}
{% set invalid_file_format_msg -%}
You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
{%- endset %}

{% if file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
{% endif %}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
on false
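        {#-- "on false" never matches any existing rows, so every source row takes the "when not matched then insert" branch (append-only) --#}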
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}
@@ -97,21 +95,17 @@

{% materialization incremental, adapter='spark' -%}
{#-- Validate early so we don't run SQL if the file_format is invalid --#}
{% set file_format = dbt_spark_validate_get_file_format() -%}
{%- set file_format = dbt_spark_validate_get_file_format() -%}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set unique_key = config.get('unique_key', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'merge' %}
{%- set unique_key = config.require('unique_key') -%}
{% do dbt_spark_validate_merge(file_format) %}
{% endif %}

{% if strategy == 'insert_overwrite' and config.get('partition_by') %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endif %}
