Rationalize incremental materialization #141

Merged · 16 commits · Feb 19, 2021
Changes from 6 commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

### Breaking changes
- Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126))
- Incremental models have `incremental_strategy: append` by default. This strategy inserts new records
without updating or overwriting existing ones. To update or overwrite existing records, use `merge` or `insert_overwrite` instead, depending
on the file format, connection method, and attributes of your underlying data (a sketch of restoring the previous default follows below). dbt will try to raise a helpful error
if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))
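To keep the previous behavior, set the strategy explicitly in the model config. A minimal sketch, using a hypothetical model:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

-- hypothetical model body; partitions returned here replace existing ones
select * from {{ ref('events') }}
```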

### Under the hood
- Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, dbt keeps the existing table and adds a new version, as supported by Delta. The table stays available while the pipeline runs, and its history remains trackable (see the sketch below).
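A rough illustration of the idea, with hypothetical table names (Delta Lake's `CREATE OR REPLACE TABLE` writes a new table version rather than dropping the old one):

```sql
-- Readers continue to see the previous version until this statement completes,
-- and earlier versions remain queryable through Delta's history / time travel.
create or replace table analytics.my_model
using delta
as select * from analytics.stg_events;
```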
47 changes: 33 additions & 14 deletions README.md
@@ -161,48 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark plugin:
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` |
| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` |


**Incremental Models**

dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
- `append` (default): Insert new records without updating or overwriting any existing data.
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.]
- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]

Examples:

```sql
{{ config(
materialized='incremental',
incremental_strategy='append'
) }}


-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

-- Every partition returned by this query will overwrite existing partitions

select
    date_day,
    count(*) as users

from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
group by 1
```

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    partition_by=['date_day'],
    unique_key='event_id',
    file_format='delta'
) }}

-- Existing events, matched on `event_id`, will be updated
-- New events will be appended

select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```
89 changes: 55 additions & 34 deletions dbt/include/spark/macros/materializations/incremental.sql
@@ -1,5 +1,5 @@
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
@@ -8,6 +8,17 @@

{% endmacro %}


{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}
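For reference, and not part of the diff, the two insert macros render to statements roughly like the following. The relation names are hypothetical, and the partition clause on the overwrite form is an assumption, since that part of the macro is collapsed in this view:

```sql
-- get_insert_into_sql: plain append of the staged rows
insert into table analytics.my_model
select `event_id`, `event_ts` from my_model__dbt_tmp;

-- get_insert_overwrite_sql: replaces only the partitions produced by the query
-- (with spark.sql.sources.partitionOverwriteMode = DYNAMIC)
insert overwrite table analytics.my_model
partition (date_day)
select `event_id`, `event_ts`, `date_day` from my_model__dbt_tmp;
```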


{% macro dbt_spark_validate_get_file_format() %}
{#-- Find and validate the file format #}
{%- set file_format = config.get("file_format", default="parquet") -%}
Expand All @@ -24,59 +35,79 @@
{% do return(file_format) %}
{% endmacro %}


{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%}
{%- set strategy = config.get("incremental_strategy", default="append") -%}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
{%- endset %}

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You can only choose this strategy when file_format is set to 'delta'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You cannot use this strategy when file_format is set to 'delta'
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if strategy == 'merge' and file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if strategy == 'insert_overwrite' and file_format == 'delta' %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %}
{% endif %}
{% if strategy == 'insert_overwrite' and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}

{% do return(strategy) %}
{% endmacro %}

{% macro dbt_spark_validate_merge(file_format) %}
{% set invalid_file_format_msg -%}
You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
{%- endset %}

{% if file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
{% endif %}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
on false
Contributor: This feels awkward, why not just use an INSERT INTO statement?

Contributor Author: This is in line with default dbt behavior here.

As I see it, the only reason we implement spark__get_merge_sql is to benefit from the improved wildcard `update *` / `insert *`.

I agree it feels a bit silly; this is identical to the append strategy / INSERT INTO statement. The alternative is to keep requiring a unique_key with merge. In any case, I'd rather err closer to the side of default behavior.

{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}
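For illustration of the thread above, and not part of the diff, the rendered statement in the two cases looks roughly like this (relation names are hypothetical):

```sql
-- With unique_key='event_id': matched rows are updated, new rows inserted
merge into analytics.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
when matched then update set *
when not matched then insert *;

-- Without a unique_key: `on false` means nothing ever matches, so every
-- source row falls through to the insert clause (same effect as append)
merge into analytics.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on false
when matched then update set *
when not matched then insert *;
```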


{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- endif -%}
Contributor: Maybe raise an error if it doesn't match any of the cases?

Contributor Author: Good call. It's already raised earlier on:
https://github.com/fishtown-analytics/dbt-spark/blob/c8e3770e077e8c54026156b14e61133ef59fa7ff/dbt/include/spark/macros/materializations/incremental.sql#L65-L66

Just the same, I'll add another explicit exception here, just in case users override some macros but not others.
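That explicit exception is not in this commit yet; a minimal sketch of what such a defensive fallback could look like, assuming the dispatch macro keeps the structure above:

```sql
{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
  {%- if strategy == 'append' -%}
    {{ get_insert_into_sql(source, target) }}
  {%- elif strategy == 'insert_overwrite' -%}
    {{ get_insert_overwrite_sql(source, target) }}
  {%- elif strategy == 'merge' -%}
    {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
  {%- else -%}
    {#-- guard against overridden macros passing an unvalidated strategy #}
    {% do exceptions.raise_compiler_error('Invalid incremental strategy: ' ~ strategy) %}
  {%- endif -%}
{% endmacro %}
```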


@@ -85,31 +116,21 @@

{% materialization incremental, adapter='spark' -%}
{#-- Validate early so we don't run SQL if the file_format is invalid --#}
{%- set file_format = dbt_spark_validate_get_file_format() -%}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set unique_key = config.get('unique_key', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and config.get('partition_by') %}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}

{% call statement() %}
set spark.sql.hive.convertMetastoreParquet = false
Contributor: Good to see this good :)

{% endcall %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
11 changes: 0 additions & 11 deletions test/integration/spark-databricks-http.dbtspec
@@ -9,16 +9,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
@@ -40,4 +30,3 @@
test_dbt_data_test: data_test
test_dbt_ephemeral_data_tests: data_test_ephemeral_models
test_dbt_schema_test: schema_test

10 changes: 0 additions & 10 deletions test/integration/spark-databricks-odbc-cluster.dbtspec
@@ -10,16 +10,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
10 changes: 0 additions & 10 deletions test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
@@ -10,16 +10,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
11 changes: 0 additions & 11 deletions test/integration/spark-thrift.dbtspec
@@ -7,17 +7,6 @@ target:
connect_retries: 5
connect_timeout: 60
schema: "analytics_{{ var('_dbt_random_suffix') }}"
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
sequences:
test_dbt_empty: empty
test_dbt_base: base