diff --git a/README.md b/README.md
index 9297c9833..e9cee6ade 100644
--- a/README.md
+++ b/README.md
@@ -192,7 +192,8 @@ where date_day::date >= '2019-01-01'
 group by 1
 ```
 
-The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique key` to match existing records.
+The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique_key` is specified, the statement will match existing records and overwrite them with new values. If a `unique_key` config is not specified, dbt will simply
+append new data to the model without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.)
 
 ```
 {{ config(
diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 8b67188f4..26cbe543e 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -61,23 +61,21 @@
   {% do return(strategy) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_merge(file_format) %}
-  {% set invalid_file_format_msg -%}
-    You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
-  {%- endset %}
-
-  {% if file_format != 'delta' %}
-    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
-  {% endif %}
-
-{% endmacro %}
-
 
 {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
   {# ignore dest_columns - we will just use `*` #}
+
+  {% set merge_condition %}
+    {% if unique_key %}
+      on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+    {% else %}
+      on false
+    {% endif %}
+  {% endset %}
+
   merge into {{ target }} as DBT_INTERNAL_DEST
   using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
-  on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+  {{ merge_condition }}
   when matched then update set *
   when not matched then insert *
 {% endmacro %}
@@ -97,9 +95,10 @@
 
 {% materialization incremental, adapter='spark' -%}
   {#-- Validate early so we don't run SQL if the file_format is invalid --#}
-  {% set file_format = dbt_spark_validate_get_file_format() -%}
+  {%- set file_format = dbt_spark_validate_get_file_format() -%}
   {#-- Validate early so we don't run SQL if the strategy is invalid --#}
-  {% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
+  {%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
+  {%- set unique_key = config.get('unique_key', none) -%}
 
   {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
 
@@ -107,11 +106,6 @@
   {% set existing_relation = load_relation(this) %}
   {% set tmp_relation = make_temp_relation(this) %}
 
-  {% if strategy == 'merge' %}
-    {%- set unique_key = config.require('unique_key') -%}
-    {% do dbt_spark_validate_merge(file_format) %}
-  {% endif %}
-
   {% if strategy == 'insert_overwrite' and config.get('partition_by') %}
     set spark.sql.sources.partitionOverwriteMode = DYNAMIC
   {% endif %}
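
A note on the behavior this diff introduces (illustration only, not part of the patch): `spark__get_merge_sql` now renders one of two merge conditions depending on whether `unique_key` is configured. Below is a rough sketch of the compiled SQL in both cases, assuming a hypothetical target table `analytics.events`, a temp relation `events__dbt_tmp`, and `unique_key='event_id'`:

```sql
-- With unique_key = 'event_id': rows whose key matches are overwritten in
-- place, and rows with no match are inserted.
merge into analytics.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
when matched then update set *
when not matched then insert *;

-- Without unique_key: `on false` guarantees no row ever matches, so the
-- `when matched` branch is dead code and every source row is appended.
merge into analytics.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on false
when matched then update set *
when not matched then insert *;
```

Because `on false` never matches, the append path needs no key at all, which is why the `config.require('unique_key')` check (and the now-redundant `dbt_spark_validate_merge` macro) could be dropped from the materialization.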