Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.9.0 release #252

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge", "insert_overwrite"]
return ["append", "delete+insert", "microbatch"]

# This is for use in the test suite
def run_sql_for_tests(self, sql, fetch, conn):
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro fabric__get_empty_subquery_sql(select_sql, select_sql_header=none) %}
{% if sql.strip().lower().startswith('with') %}
{% if select_sql.strip().lower().startswith('with') %}
{{ select_sql }}
{% else -%}
select * from (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% macro fabric__get_incremental_default_sql(arg_dict) %}

{% if arg_dict["unique_key"] %}
-- Delete + Insert Strategy, calls get_delete_insert_merge_sql
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
-- Incremental Append will insert data into target table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,35 @@
from {{ source }}
){{ query_label }}
{% endmacro %}

{% macro fabric__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{{ log("incremenal append event start time > DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") }}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{{ log("incremenal append event end time < DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") }}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
189 changes: 101 additions & 88 deletions dbt/include/fabric/macros/materializations/snapshots/helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@


{% macro fabric__build_snapshot_table(strategy, relation) %}

{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
select *,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%}
, 'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
select * from {{ relation }}
) sbq
Expand All @@ -31,115 +34,125 @@

{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%}

with snapshot_query as (
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

with snapshot_query as (
select * from {{ temp_snapshot_relation }}

),

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

{{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where dbt_valid_to is null

where
{% if config.get('dbt_valid_to_current') %}
{# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #}
( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null)
{% else %}
{{ columns.dbt_valid_to }} is null
{% endif %}
),

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id

select *,
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
from snapshot_query
),

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
select *,
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
from snapshot_query
),
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
deletes_source_data as (
select *, {{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}

insertions as (

select
'insert' as dbt_change_type,
source_data.*

select 'insert' as dbt_change_type, source_data.*
{%- if strategy.hard_deletes == 'new_record' -%}
,'False' as {{ columns.dbt_is_deleted }}
{%- endif %}
from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
)

left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }}))
),

updates as (

select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id

select 'update' as dbt_change_type, source_data.*,
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where (
{{ strategy.row_changed }}
)
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where ({{ strategy.row_changed }})
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
,
deletes as (
select 'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
)
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
{%set source_query = "select * from "~temp_snapshot_relation%}
{% set source_sql_cols = get_column_schema_from_query(source_query) %}
,
deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.dbt_unique_key as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}
select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}

{%- endmacro %}
Expand Down
26 changes: 15 additions & 11 deletions dbt/include/fabric/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %}
{% set build_or_select_sql = build_sql %}

-- naming a temp relation
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
Expand All @@ -51,40 +52,43 @@

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_meta_column_names") or get_snapshot_table_column_names() %}
{{ adapter.valid_snapshot_target(target_relation, columns) }}
{% set build_or_select_sql = snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}
{% if missing_columns|length > 0 %}
{{log("Missing columns length is: "~ missing_columns|length)}}
{% do create_columns(target_relation, missing_columns) %}
{% endif %}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}
{% endif %}

{{ check_time_data_types(build_or_select_sql) }}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% macro fabric__snapshot_merge_sql(target, source, insert_cols) %}

{%- set insert_cols_csv = insert_cols | join(', ') -%}
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
{%- set target_table = target.include(database=False) -%}
{%- set source_table = source.include(database=False) -%}
{% set target_columns_list = [] %}
Expand All @@ -9,17 +10,21 @@
{% endfor %}
{%- set target_columns = target_columns_list | join(', ') -%}

UPDATE DBT_INTERNAL_DEST
SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
FROM {{ target_table }} as DBT_INTERNAL_DEST
INNER JOIN {{ source_table }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
WHERE DBT_INTERNAL_DEST.dbt_valid_to is null
AND DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
update DBT_INTERNAL_DEST
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
from {{ target_table }} as DBT_INTERNAL_DEST
inner join {{ source_table }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}
where DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
{% if config.get("dbt_valid_to_current") %}
and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
{% else %}
and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
{% endif %}
{{ apply_label() }}

INSERT INTO {{ target_table }} ({{ insert_cols_csv }})
SELECT {{target_columns}} FROM {{ source_table }} as DBT_INTERNAL_SOURCE
WHERE DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
insert into {{ target_table }} ({{ insert_cols_csv }})
select {{target_columns}} from {{ source_table }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
{{ apply_label() }}
{% endmacro %}
Loading
Loading