Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[incremental models] add tests, various bugfixes and support for incremental predicates #376

Merged
merged 8 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion dbt/adapters/sqlserver/sql_server_column.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import ClassVar, Dict
from typing import Any, ClassVar, Dict

from dbt.adapters.base import Column

Expand All @@ -11,3 +11,10 @@ class SQLServerColumn(Column):
"INTEGER": "INT",
"BOOLEAN": "BIT",
}

@classmethod
def string_type(cls, size: int) -> str:
return f"varchar({size if size > 0 else 'MAX'})"

def literal(self, value: Any) -> str:
return "cast('{}' as {})".format(value, self.data_type)
29 changes: 24 additions & 5 deletions dbt/include/sqlserver/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,31 @@
{%- set tmp_column = column_name + "__dbt_alter" -%}

{% call statement('alter_column_type') -%}

alter {{ relation.type }} {{ relation }} add {{ tmp_column }} {{ new_column_type }};
update {{ relation }} set {{ tmp_column }} = {{ column_name }};
alter {{ relation.type }} {{ relation }} drop column {{ column_name }};
alter {{ relation.type }} {{ relation }} add "{{ tmp_column }}" {{ new_column_type }};
{%- endcall -%}
{% call statement('alter_column_type') -%}
update {{ relation }} set "{{ tmp_column }}" = "{{ column_name }}";
{%- endcall -%}
{% call statement('alter_column_type') -%}
alter {{ relation.type }} {{ relation }} drop column "{{ column_name }}";
{%- endcall -%}
{% call statement('alter_column_type') -%}
exec sp_rename '{{ relation | replace('"', '') }}.{{ tmp_column }}', '{{ column_name }}', 'column'

{%- endcall -%}

{% endmacro %}


{% macro sqlserver__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% call statement('add_drop_columns') -%}
{% if add_columns %}
alter {{ relation.type }} {{ relation }}
add {% for column in add_columns %}"{{ column.name }}" {{ column.data_type }}{{ ', ' if not loop.last }}{% endfor %};
{% endif %}

{% if remove_columns %}
alter {{ relation.type }} {{ relation }}
drop column {% for column in remove_columns %}"{{ column.name }}"{{ ',' if not loop.last }}{% endfor %};
{% endif %}
{%- endcall -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,53 @@
https://getdbt.slack.com/archives/C50NEBJGG/p1636045535056600
#}

{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }};
{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{{ default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) }};
{% endmacro %}

{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{% if incremental_predicates %}
{{ exceptions.raise_not_implemented('incremental_predicates are not implemented in dbt-sqlserver') }}
{% endif %}
{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }};
{% endmacro %}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
where exists (
SELECT NULL
FROM
{{ source }}
WHERE
{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
where exists (
select null
from {{ source }}
where
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last }}
{% endfor %}
);
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);

{% endif %}
{% endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)

{% endmacro %}

{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }};
)
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %};
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};
{% endif %}
{% endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
115 changes: 115 additions & 0 deletions tests/functional/adapter/test_incremental.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,120 @@
import pytest
from dbt.tests.adapter.incremental.fixtures import (
_MODELS__A,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET,
_MODELS__INCREMENTAL_FAIL,
_MODELS__INCREMENTAL_IGNORE_TARGET,
_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS,
_MODELS__INCREMENTAL_SYNC_REMOVE_ONLY,
)
from dbt.tests.adapter.incremental.test_incremental_on_schema_change import (
BaseIncrementalOnSchemaChange,
)
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates
from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey

_MODELS__INCREMENTAL_IGNORE = """
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='ignore'
)
}}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT
id,
field1,
field2,
field3,
field4
FROM source_data
WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT TOP 3 id, field1, field2 FROM source_data

{% endif %}
"""

_MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET = """
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

{% set string_type = dbt.type_string() %}

select id
,cast(field1 as {{string_type}}) as field1

from source_data
"""

_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET = """
{{
config(materialized='table')
}}

with source_data as (

select * from {{ ref('model_a') }}

)

{% set string_type = dbt.type_string() %}

select id
,cast(field1 as {{string_type}}) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4

from source_data
"""


class TestBaseIncrementalUniqueKeySQLServer(BaseIncrementalUniqueKey):
pass


class TestIncrementalOnSchemaChangeSQLServer(BaseIncrementalOnSchemaChange):
@pytest.fixture(scope="class")
def models(self):
return {
"incremental_sync_remove_only.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY,
"incremental_ignore.sql": _MODELS__INCREMENTAL_IGNORE,
"incremental_sync_remove_only_target.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET, # noqa: E501
"incremental_ignore_target.sql": _MODELS__INCREMENTAL_IGNORE_TARGET,
"incremental_fail.sql": _MODELS__INCREMENTAL_FAIL,
"incremental_sync_all_columns.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS,
"incremental_append_new_columns_remove_one.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE, # noqa: E501
"model_a.sql": _MODELS__A,
"incremental_append_new_columns_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET, # noqa: E501
"incremental_append_new_columns.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS,
"incremental_sync_all_columns_target.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET, # noqa: E501
"incremental_append_new_columns_remove_one_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET, # noqa: E501
}


class TestIncrementalPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates):
pass


class TestPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"+predicates": ["id != 2"], "+incremental_strategy": "delete+insert"}}
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
{% endfor %}

{% set col_type = col_types.get(column_name) %}
{% set col_type = 'text' if col_type and 'character varying' in col_type else col_type %}
{% set col_type = 'text' if col_type and 'varchar' in col_type else col_type %}

{% set validation_message = 'Got a column type of ' ~ col_type ~ ', expected ' ~ type %}

Expand Down