From eed64ed1476746113fd3d6f2f2f7fb7363339b3b Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Wed, 23 Jun 2021 12:24:56 -0400 Subject: [PATCH] Add merge_update_columns --- CHANGELOG.md | 4 ++++ dbt/adapters/spark/impl.py | 1 + .../incremental/strategies.sql | 13 +++++++++-- .../data/expected_partial_upsert.csv | 4 ++++ .../models_delta/merge_update_columns.sql | 22 +++++++++++++++++++ .../test_incremental_strategies.py | 1 + 6 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 test/custom/incremental_strategies/data/expected_partial_upsert.csv create mode 100644 test/custom/incremental_strategies/models_delta/merge_update_columns.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 339f7f5c4..d0b8344f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## dbt-spark 0.20.0 (Release TBD) +### Features + +- Add support for `merge_update_columns` config in `merge`-strategy incremental models ([#183](https://github.com/fishtown-analytics/dbt-spark/pull/183), ([#184](https://github.com/fishtown-analytics/dbt-spark/pull/184)) + ### Fixes - Fix column-level `persist_docs` on Delta tables, add tests ([#180](https://github.com/fishtown-analytics/dbt-spark/pull/180)) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index b2060f8c9..9f4ae514c 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -37,6 +37,7 @@ class SparkConfig(AdapterConfig): clustered_by: Optional[Union[List[str], str]] = None buckets: Optional[int] = None options: Optional[Dict[str, str]] = None + merge_update_columns: Optional[str] = None class SparkAdapter(SQLAdapter): diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql index d3ffafc10..ec5dad67a 100644 --- a/dbt/include/spark/macros/materializations/incremental/strategies.sql +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -20,7 +20,8 @@ {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} - {# ignore dest_columns - we will just use `*` #} + {# skip dest_columns, use merge_update_columns config if provided, otherwise use "*" #} + {%- set update_columns = config.get("merge_update_columns") -%} {% set merge_condition %} {% if unique_key %} @@ -32,8 +33,16 @@ merge into {{ target }} as DBT_INTERNAL_DEST using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE + {{ merge_condition }} - when matched then update set * + + when matched then update set + {% if update_columns -%}{%- for column_name in update_columns %} + {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} + {%- if not loop.last %}, {%- endif %} + {%- endfor %} + {%- else %} * {% endif %} + when not matched then insert * {% endmacro %} diff --git a/test/custom/incremental_strategies/data/expected_partial_upsert.csv b/test/custom/incremental_strategies/data/expected_partial_upsert.csv new file mode 100644 index 000000000..bc922cdec --- /dev/null +++ b/test/custom/incremental_strategies/data/expected_partial_upsert.csv @@ -0,0 +1,4 @@ +id,msg,color +1,hello,blue +2,yo,red +3,anyway,purple \ No newline at end of file diff --git a/test/custom/incremental_strategies/models_delta/merge_update_columns.sql b/test/custom/incremental_strategies/models_delta/merge_update_columns.sql new file mode 100644 index 000000000..d934b2997 --- /dev/null +++ b/test/custom/incremental_strategies/models_delta/merge_update_columns.sql @@ -0,0 +1,22 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + file_format = 'delta', + unique_key = 'id', + merge_update_columns = ['msg'], +) }} + +{% if not is_incremental() %} + +select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color +union all +select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color + +{% else %} + +-- msg will be updated, color will be ignored +select cast(2 as bigint) as id, 'yo' as msg, 'green' as color +union all +select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color + +{% endif %} diff --git a/test/custom/incremental_strategies/test_incremental_strategies.py b/test/custom/incremental_strategies/test_incremental_strategies.py index 4d13a7708..64966ece5 100644 --- a/test/custom/incremental_strategies/test_incremental_strategies.py +++ b/test/custom/incremental_strategies/test_incremental_strategies.py @@ -71,6 +71,7 @@ def run_and_test(self): self.assertTablesEqual("append_delta", "expected_append") self.assertTablesEqual("merge_no_key", "expected_append") self.assertTablesEqual("merge_unique_key", "expected_upsert") + self.assertTablesEqual("merge_update_columns", "expected_partial_upsert") @use_profile("databricks_cluster") def test_delta_strategies_databricks_cluster(self):