From 57f2aa60ef9f60f5bf2a065fd508c5c3335b6ea1 Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Thu, 11 Apr 2024 13:54:06 -0700 Subject: [PATCH] Fixed snapshot related bug which is not working with CTE's --- dbt/adapters/fabric/__version__.py | 2 +- dbt/adapters/fabric/fabric_adapter.py | 7 + .../fabric/macros/adapters/columns.sql | 1 + .../fabric/macros/adapters/relation.sql | 2 +- .../materializations/snapshots/helpers.sql | 199 ++++++++++++++++++ .../materializations/snapshots/snapshot.sql | 163 +++++++++----- .../macros/materializations/tests/helpers.sql | 2 +- 7 files changed, 315 insertions(+), 61 deletions(-) create mode 100644 dbt/include/fabric/macros/materializations/snapshots/helpers.sql diff --git a/dbt/adapters/fabric/__version__.py b/dbt/adapters/fabric/__version__.py index 6aaa73b..72126ce 100644 --- a/dbt/adapters/fabric/__version__.py +++ b/dbt/adapters/fabric/__version__.py @@ -1 +1 @@ -version = "1.8.0" +version = "1.8.1" diff --git a/dbt/adapters/fabric/fabric_adapter.py b/dbt/adapters/fabric/fabric_adapter.py index 50784f2..1e787e8 100644 --- a/dbt/adapters/fabric/fabric_adapter.py +++ b/dbt/adapters/fabric/fabric_adapter.py @@ -192,6 +192,13 @@ def render_limited(self) -> str: else: return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq" + # TODO: Standardizing quote characters + # def quoted(self, identifier): + # return "[{identifier}]".format( + # quote_char=self.quote_character, + # identifier=identifier, + # ) + @available @classmethod def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]: diff --git a/dbt/include/fabric/macros/adapters/columns.sql b/dbt/include/fabric/macros/adapters/columns.sql index 57297d4..db081ff 100644 --- a/dbt/include/fabric/macros/adapters/columns.sql +++ b/dbt/include/fabric/macros/adapters/columns.sql @@ -79,6 +79,7 @@ {% set tempTableName %} {{ relation.schema }}.{{ relation.identifier }}_{{ range(1300, 19000) | random }} {% endset %} + {{ log("Cannot Alter table type, as it is not supported. Using random table as a temp table. - " ~ tempTableName) }} {% set tempTable %} CREATE TABLE {{tempTableName}} diff --git a/dbt/include/fabric/macros/adapters/relation.sql b/dbt/include/fabric/macros/adapters/relation.sql index 380d131..4918eed 100644 --- a/dbt/include/fabric/macros/adapters/relation.sql +++ b/dbt/include/fabric/macros/adapters/relation.sql @@ -91,7 +91,7 @@ {% set tempTableName %} {{ relation.include(database=False).identifier.replace("#", "") }}_{{ range(21000, 109000) | random }} {% endset %} - + {{ log("Truncate Statement is not supported, Using random table as a temp table. - " ~ tempTableName) }} {% call statement('truncate_relation') -%} CREATE TABLE {{ tempTableName }} AS SELECT * FROM {{ relation }} WHERE 1=2 EXEC('DROP TABLE IF EXISTS {{ relation.include(database=False) }};'); diff --git a/dbt/include/fabric/macros/materializations/snapshots/helpers.sql b/dbt/include/fabric/macros/materializations/snapshots/helpers.sql new file mode 100644 index 0000000..58107d4 --- /dev/null +++ b/dbt/include/fabric/macros/materializations/snapshots/helpers.sql @@ -0,0 +1,199 @@ +{% macro fabric__post_snapshot(staging_relation) %} + -- Clean up the snapshot temp table + {% do drop_relation(staging_relation) %} +{% endmacro %} + +--Due to Alter not being supported, have to rely on this for temporarily +{% macro fabric__create_columns(relation, columns) %} + {# default__ macro uses "add column" + TSQL preferes just "add" + #} + + {% set columns %} + {% for column in columns %} + , CAST(NULL AS {{column.data_type}}) AS {{column_name}} + {% endfor %} + {% endset %} + + {% set tempTableName %} + [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}] + {% endset %} + {{ log("Creating new columns are not supported without dropping a table. Using random table as a temp table. - " ~ tempTableName) }} + + {% set tempTable %} + CREATE TABLE {{tempTableName}} + AS SELECT * {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }} + {% endset %} + + {% call statement('create_temp_table') -%} + {{ tempTable }} + {%- endcall %} + + {% set dropTable %} + DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] + {% endset %} + + {% call statement('drop_table') -%} + {{ dropTable }} + {%- endcall %} + + {% set createTable %} + CREATE TABLE {{ relation }} + AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }} + {% endset %} + + {% call statement('create_Table') -%} + {{ createTable }} + {%- endcall %} + + {% set dropTempTable %} + DROP TABLE {{tempTableName}} + {% endset %} + + {% call statement('drop_temp_table') -%} + {{ dropTempTable }} + {%- endcall %} +{% endmacro %} + +{% macro fabric__get_true_sql() %} + {{ return('1=1') }} +{% endmacro %} + + +{% macro fabric__build_snapshot_table(strategy, relation) %} + + select *, + {{ strategy.scd_id }} as dbt_scd_id, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + from ( + select * from {{ relation }} + ) sbq + +{% endmacro %} + +{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%} + + with snapshot_query as ( + + select * from {{ temp_snapshot_relation }} + + ), + + snapshotted_data as ( + + select *, + {{ strategy.unique_key }} as dbt_unique_key + + from {{ target_relation }} + where dbt_valid_to is null + + ), + + insertions_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, + {{ strategy.scd_id }} as dbt_scd_id + + from snapshot_query + ), + + updates_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key, + {{ strategy.updated_at }} as dbt_updated_at, + {{ strategy.updated_at }} as dbt_valid_from, + {{ strategy.updated_at }} as dbt_valid_to + + from snapshot_query + ), + + {%- if strategy.invalidate_hard_deletes %} + + deletes_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key + from snapshot_query + ), + {% endif %} + + insertions as ( + + select + 'insert' as dbt_change_type, + source_data.* + + from insertions_source_data as source_data + left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where snapshotted_data.dbt_unique_key is null + or ( + snapshotted_data.dbt_unique_key is not null + and ( + {{ strategy.row_changed }} + ) + ) + + ), + + updates as ( + + select + 'update' as dbt_change_type, + source_data.*, + snapshotted_data.dbt_scd_id + + from updates_source_data as source_data + join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where ( + {{ strategy.row_changed }} + ) + ) + + {%- if strategy.invalidate_hard_deletes -%} + , + + deletes as ( + + select + 'delete' as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as dbt_valid_from, + {{ snapshot_get_time() }} as dbt_updated_at, + {{ snapshot_get_time() }} as dbt_valid_to, + snapshotted_data.dbt_scd_id + + from snapshotted_data + left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where source_data.dbt_unique_key is null + ) + {%- endif %} + + select * from insertions + union all + select * from updates + {%- if strategy.invalidate_hard_deletes %} + union all + select * from deletes + {%- endif %} + +{%- endmacro %} + +{% macro build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} + {% set temp_relation = make_temp_relation(target_relation) %} + {% set select = fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} + {% call statement('build_snapshot_staging_relation') %} + {{ create_table_as(True, temp_relation, select) }} + {% endcall %} + + {% do return(temp_relation) %} +{% endmacro %} diff --git a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql index 3b5ddc7..02131f4 100644 --- a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql @@ -1,58 +1,105 @@ -{% macro fabric__post_snapshot(staging_relation) %} - -- Clean up the snapshot temp table - {% do drop_relation(staging_relation) %} -{% endmacro %} - -{% macro fabric__create_columns(relation, columns) %} - {# default__ macro uses "add column" - TSQL preferes just "add" - #} - - {% set columns %} - {% for column in columns %} - , CAST(NULL AS {{column.data_type}}) AS {{column_name}} - {% endfor %} - {% endset %} - - {% set tempTableName %} - [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}] - {% endset %} - - {% set tempTable %} - CREATE TABLE {{tempTableName}} - AS SELECT * {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }} - {% endset %} - - {% call statement('create_temp_table') -%} - {{ tempTable }} - {%- endcall %} - - {% set dropTable %} - DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] - {% endset %} - - {% call statement('drop_table') -%} - {{ dropTable }} - {%- endcall %} - - {% set createTable %} - CREATE TABLE {{ relation }} - AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }} - {% endset %} - - {% call statement('create_Table') -%} - {{ createTable }} - {%- endcall %} - - {% set dropTempTable %} - DROP TABLE {{tempTableName}} - {% endset %} - - {% call statement('drop_temp_table') -%} - {{ dropTempTable }} - {%- endcall %} -{% endmacro %} - -{% macro fabric__get_true_sql() %} - {{ return('1=1') }} -{% endmacro %} +{% materialization snapshot, adapter='fabric' %} + + {%- set config = model['config'] -%} + {%- set target_table = model.get('alias', model.get('name')) -%} + {%- set strategy_name = config.get('strategy') -%} + {%- set unique_key = config.get('unique_key') %} + -- grab current tables grants config for comparision later on + {%- set grant_config = config.get('grants') -%} + + {% set target_relation_exists, target_relation = get_or_create_relation( + database=model.database, + schema=model.schema, + identifier=target_table, + type='table') -%} + + {%- if not target_relation.is_table -%} + {% do exceptions.relation_wrong_type(target_relation, 'table') %} + {%- endif -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set strategy_macro = strategy_dispatch(strategy_name) %} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + + {% set temp_snapshot_relation_exists, temp_snapshot_relation = get_or_create_relation( + database=model.database, + schema=model.schema, + identifier=target_table+"_snapshot_staging_temp_view", + type='view') -%} + + -- Create a temporary view to manage if user SQl uses CTE + {% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %} + {% call statement('create temp_snapshot_relation') %} + EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};'); + EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};'); + {% endcall %} + + {% if not target_relation_exists %} + + {% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %} + {% set final_sql = create_table_as(False, target_relation, build_sql) %} + + {% else %} + + {{ adapter.valid_snapshot_target(target_relation) }} + {% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} + -- this may no-op if the database does not require column expansion + {% do adapter.expand_target_column_types(from_relation=staging_table, + to_relation=target_relation) %} + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} + {% do create_columns(target_relation, missing_columns) %} + {% set source_columns = adapter.get_columns_in_relation(staging_table) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} + {% set quoted_source_columns = [] %} + {% for column in source_columns %} + {% do quoted_source_columns.append(adapter.quote(column.name)) %} + {% endfor %} + + {% set final_sql = snapshot_merge_sql( + target = target_relation, + source = staging_table, + insert_cols = quoted_source_columns + ) + %} + + {% endif %} + + {% call statement('main') %} + {{ final_sql }} + {% endcall %} + + fabric__drop_relation_script(temp_snapshot_relation) + + {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if not target_relation_exists %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {{ adapter.commit() }} + + {% if staging_table is defined %} + {% do post_snapshot(staging_table) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/dbt/include/fabric/macros/materializations/tests/helpers.sql b/dbt/include/fabric/macros/materializations/tests/helpers.sql index 4003057..e902553 100644 --- a/dbt/include/fabric/macros/materializations/tests/helpers.sql +++ b/dbt/include/fabric/macros/materializations/tests/helpers.sql @@ -2,7 +2,7 @@ {% if main_sql.strip().lower().startswith('with') %} {% set testview %} - dbo.testview_{{ range(1300, 19000) | random }} + {{ generate_schema_name('tests_schema') }}.testview_{{ range(1300, 19000) | random }} {% endset %} {% set sql = main_sql.replace("'", "''")%}