diff --git a/.changes/unreleased/Fixes-20230726-164729.yaml b/.changes/unreleased/Fixes-20230726-164729.yaml new file mode 100644 index 000000000..031e5dfde --- /dev/null +++ b/.changes/unreleased/Fixes-20230726-164729.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Fix `on_configuration_change` setting to properly recognize `continue` and `fail` +time: 2023-07-26T16:47:29.471437-04:00 +custom: + Author: mikealfare + Issue: "708" diff --git a/.flake8 b/.flake8 index bbc3202a0..b08ffcd53 100644 --- a/.flake8 +++ b/.flake8 @@ -12,3 +12,5 @@ ignore = E741, E501, exclude = test +per-file-ignores = + */__init__.py: F401 diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index 52a676820..9d09e96b4 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -1,24 +1,19 @@ from dataclasses import dataclass, field -from typing import Optional +from typing import Optional, Type -from dbt.adapters.base.relation import BaseRelation, Policy -from dbt.dataclass_schema import StrEnum +from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.relation_configs import RelationConfigChangeAction, RelationResults +from dbt.context.providers import RuntimeConfigObject from dbt.utils import classproperty - -class SnowflakeRelationType(StrEnum): - Table = "table" - View = "view" - CTE = "cte" - External = "external" - DynamicTable = "dynamic_table" - - -@dataclass -class SnowflakeQuotePolicy(Policy): - database: bool = False - schema: bool = False - identifier: bool = False +from dbt.adapters.snowflake.relation_configs import ( + SnowflakeDynamicTableConfig, + SnowflakeDynamicTableConfigChangeset, + SnowflakeDynamicTableTargetLagConfigChange, + SnowflakeDynamicTableWarehouseConfigChange, + SnowflakeQuotePolicy, + SnowflakeRelationType, +) @dataclass(frozen=True, eq=False, repr=False) @@ -33,3 +28,34 @@ def is_dynamic_table(self) -> bool: @classproperty def DynamicTable(cls) -> str: return str(SnowflakeRelationType.DynamicTable) + + @classproperty + def get_relation_type(cls) -> Type[SnowflakeRelationType]: + return SnowflakeRelationType + + @classmethod + def dynamic_table_config_changeset( + cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject + ) -> Optional[SnowflakeDynamicTableConfigChangeset]: + existing_dynamic_table = SnowflakeDynamicTableConfig.from_relation_results( + relation_results + ) + new_dynamic_table = SnowflakeDynamicTableConfig.from_model_node(runtime_config.model) + + config_change_collection = SnowflakeDynamicTableConfigChangeset() + + if new_dynamic_table.target_lag != existing_dynamic_table.target_lag: + config_change_collection.target_lag = SnowflakeDynamicTableTargetLagConfigChange( + action=RelationConfigChangeAction.alter, + context=new_dynamic_table.target_lag, + ) + + if new_dynamic_table.warehouse != existing_dynamic_table.warehouse: + config_change_collection.warehouse = SnowflakeDynamicTableWarehouseConfigChange( + action=RelationConfigChangeAction.alter, + context=new_dynamic_table.warehouse, + ) + + if config_change_collection.has_changes: + return config_change_collection + return None diff --git a/dbt/adapters/snowflake/relation_configs/__init__.py b/dbt/adapters/snowflake/relation_configs/__init__.py index 1ae91bd1f..e53604526 100644 --- a/dbt/adapters/snowflake/relation_configs/__init__.py +++ b/dbt/adapters/snowflake/relation_configs/__init__.py @@ -1,7 +1,15 @@ -from dbt.adapters.snowflake.relation_configs.dynamic_table import ( # noqa: F401 +from dbt.adapters.snowflake.relation_configs.dynamic_table import ( SnowflakeDynamicTableConfig, + SnowflakeDynamicTableConfigChangeset, + SnowflakeDynamicTableWarehouseConfigChange, ) -from dbt.adapters.snowflake.relation_configs.target_lag import ( # noqa: F401 +from dbt.adapters.snowflake.relation_configs.policies import ( + SnowflakeIncludePolicy, + SnowflakeQuotePolicy, + SnowflakeRelationType, +) +from dbt.adapters.snowflake.relation_configs.target_lag import ( SnowflakeDynamicTableTargetLagConfig, + SnowflakeDynamicTableTargetLagConfigChange, SnowflakeDynamicTableTargetLagPeriod, ) diff --git a/dbt/adapters/snowflake/relation_configs/base.py b/dbt/adapters/snowflake/relation_configs/base.py new file mode 100644 index 000000000..d7f9f121b --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/base.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional + +import agate +from dbt.adapters.base.relation import Policy +from dbt.adapters.relation_configs import ( + RelationConfigBase, + RelationResults, +) +from dbt.contracts.graph.nodes import ModelNode +from dbt.contracts.relation import ComponentName + +from dbt.adapters.snowflake.relation_configs.policies import ( + SnowflakeIncludePolicy, + SnowflakeQuotePolicy, +) + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeRelationConfigBase(RelationConfigBase): + """ + This base class implements a few boilerplate methods and provides some light structure for Snowflake relations. + """ + + @classmethod + def include_policy(cls) -> Policy: + return SnowflakeIncludePolicy() + + @classmethod + def quote_policy(cls) -> Policy: + return SnowflakeQuotePolicy() + + @classmethod + def from_model_node(cls, model_node: ModelNode): + relation_config = cls.parse_model_node(model_node) + relation = cls.from_dict(relation_config) + return relation + + @classmethod + def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: + raise NotImplementedError( + "`parse_model_node()` needs to be implemented on this RelationConfigBase instance" + ) + + @classmethod + def from_relation_results(cls, relation_results: RelationResults): + relation_config = cls.parse_relation_results(relation_results) + relation = cls.from_dict(relation_config) + return relation + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]: + raise NotImplementedError( + "`parse_relation_results()` needs to be implemented on this RelationConfigBase instance" + ) + + @classmethod + def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]: + if cls.include_policy().get_part(component) and value: + if cls.quote_policy().get_part(component): + return f'"{value}"' + return value.lower() + return None + + @classmethod + def _get_first_row(cls, results: agate.Table) -> agate.Row: + try: + return results.rows[0] + except IndexError: + return agate.Row(values=set()) diff --git a/dbt/adapters/snowflake/relation_configs/dynamic_table.py b/dbt/adapters/snowflake/relation_configs/dynamic_table.py index f0a395dbb..2b385155f 100644 --- a/dbt/adapters/snowflake/relation_configs/dynamic_table.py +++ b/dbt/adapters/snowflake/relation_configs/dynamic_table.py @@ -1,26 +1,124 @@ from dataclasses import dataclass +from typing import Optional -from dbt.adapters.relation_configs import RelationConfigBase +import agate +from dbt.adapters.relation_configs import RelationConfigChange, RelationResults +from dbt.contracts.graph.nodes import ModelNode +from dbt.contracts.relation import ComponentName -from dbt.adapters.snowflake.relation_configs.target_lag import SnowflakeDynamicTableTargetLagConfig +from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase +from dbt.adapters.snowflake.relation_configs.target_lag import ( + SnowflakeDynamicTableTargetLagConfig, + SnowflakeDynamicTableTargetLagConfigChange, +) @dataclass(frozen=True, eq=True, unsafe_hash=True) -class SnowflakeDynamicTableConfig(RelationConfigBase): +class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase): """ This config follow the specs found here: - TODO: add URL once it's GA + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table The following parameters are configurable by dbt: - name: name of the dynamic table - query: the query behind the table - - lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables + - target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables - warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table There are currently no non-configurable parameters. """ name: str + schema_name: str + database_name: str query: str target_lag: SnowflakeDynamicTableTargetLagConfig warehouse: str + + @classmethod + def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig": + kwargs_dict = { + "name": cls._render_part(ComponentName.Identifier, config_dict.get("name")), + "schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")), + "database_name": cls._render_part( + ComponentName.Database, config_dict.get("database_name") + ), + "query": config_dict.get("query"), + "warehouse": config_dict.get("warehouse"), + } + + if target_lag := config_dict.get("target_lag"): + kwargs_dict.update( + {"target_lag": SnowflakeDynamicTableTargetLagConfig.from_dict(target_lag)} + ) + + dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) # type: ignore + return dynamic_table + + @classmethod + def parse_model_node(cls, model_node: ModelNode) -> dict: + config_dict = { + "name": model_node.identifier, + "schema_name": model_node.schema, + "database_name": model_node.database, + "query": model_node.compiled_code, + "warehouse": model_node.config.extra.get("snowflake_warehouse"), + } + + if model_node.config.extra.get("target_lag"): + config_dict.update( + {"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_model_node(model_node)} + ) + + return config_dict + + @classmethod + def parse_relation_results(cls, relation_results: RelationResults) -> dict: + dynamic_table: agate.Row = relation_results["dynamic_table"].rows[0] + + config_dict = { + "name": dynamic_table.get("name"), + "schema_name": dynamic_table.get("schema_name"), + "database_name": dynamic_table.get("database_name"), + "query": dynamic_table.get("text"), + "warehouse": dynamic_table.get("warehouse"), + } + + if dynamic_table.get("target_lag"): + config_dict.update( + { + "target_lag": SnowflakeDynamicTableTargetLagConfig.parse_relation_results( + dynamic_table + ) + } + ) + + return config_dict + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableWarehouseConfigChange(RelationConfigChange): + context: Optional[str] = None + + @property + def requires_full_refresh(self) -> bool: + return False + + +@dataclass +class SnowflakeDynamicTableConfigChangeset: + target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None + warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None + + @property + def requires_full_refresh(self) -> bool: + return any( + [ + self.target_lag.requires_full_refresh if self.target_lag else False, + self.warehouse.requires_full_refresh if self.warehouse else False, + ] + ) + + @property + def has_changes(self) -> bool: + return any([self.target_lag, self.warehouse]) diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py new file mode 100644 index 000000000..31f8e0bc8 --- /dev/null +++ b/dbt/adapters/snowflake/relation_configs/policies.py @@ -0,0 +1,25 @@ +from dataclasses import dataclass + +from dbt.adapters.base.relation import Policy +from dbt.dataclass_schema import StrEnum + + +class SnowflakeRelationType(StrEnum): + Table = "table" + View = "view" + CTE = "cte" + External = "external" + DynamicTable = "dynamic_table" + + +class SnowflakeIncludePolicy(Policy): + database: bool = True + schema: bool = True + identifier: bool = True + + +@dataclass +class SnowflakeQuotePolicy(Policy): + database: bool = False + schema: bool = False + identifier: bool = False diff --git a/dbt/adapters/snowflake/relation_configs/target_lag.py b/dbt/adapters/snowflake/relation_configs/target_lag.py index e2a7b429d..9744e3b00 100644 --- a/dbt/adapters/snowflake/relation_configs/target_lag.py +++ b/dbt/adapters/snowflake/relation_configs/target_lag.py @@ -1,21 +1,30 @@ from dataclasses import dataclass +from typing import Any, Dict, Optional, Union -from dbt.adapters.relation_configs import RelationConfigBase +import agate +from dbt.adapters.relation_configs import RelationConfigChange +from dbt.contracts.graph.nodes import ModelNode from dbt.dataclass_schema import StrEnum +from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase + class SnowflakeDynamicTableTargetLagPeriod(StrEnum): seconds = "seconds" minutes = "minutes" hours = "hours" days = "days" + second = "second" + minute = "minute" + hour = "hour" + day = "day" @dataclass(frozen=True, eq=True, unsafe_hash=True) -class SnowflakeDynamicTableTargetLagConfig(RelationConfigBase): +class SnowflakeDynamicTableTargetLagConfig(SnowflakeRelationConfigBase): """ This config follow the specs found here: - TODO: add URL once it's GA + https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table The following parameters are configurable by dbt: - duration: the numeric part of the lag @@ -26,3 +35,77 @@ class SnowflakeDynamicTableTargetLagConfig(RelationConfigBase): duration: int period: SnowflakeDynamicTableTargetLagPeriod + + def __str__(self) -> str: + return f"{self.duration} {self.period}" + + @classmethod + def from_dict(cls, config_dict) -> "SnowflakeDynamicTableTargetLagConfig": + kwargs_dict: Dict[str, Union[int, SnowflakeDynamicTableTargetLagPeriod]] = {} + + if duration := config_dict.get("duration"): + kwargs_dict.update({"duration": int(duration)}) + + if period := config_dict.get("period"): + kwargs_dict.update({"period": SnowflakeDynamicTableTargetLagPeriod(period)}) + + target_lag: "SnowflakeDynamicTableTargetLagConfig" = super().from_dict(kwargs_dict) # type: ignore + return target_lag + + @classmethod + def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]: + """ + Translate ModelNode objects from the user-provided config into a standard dictionary. + + Args: + model_node: the description of the target lag from the user in this format: + + { + "target_lag": "int any("seconds", "minutes", "hours", "days")" + } + + Returns: a standard dictionary describing this `SnowflakeDynamicTableTargetLagConfig` instance + """ + target_lag: str = model_node.config.extra["target_lag"] + return cls._parse_target_lag_string(target_lag) + + @classmethod + def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]: + """ + Translate agate objects from the database into a standard dictionary. + + Args: + relation_results_entry: the description of the target lag from the database in this format: + + agate.Row({ + "target_lag": "int any("seconds", "minutes", "hours", "days")" + }) + + Returns: a standard dictionary describing this `SnowflakeDynamicTableTargetLagConfig` instance + """ + target_lag: str = relation_results_entry["target_lag"] + return cls._parse_target_lag_string(target_lag) + + @staticmethod + def _parse_target_lag_string(target_lag: str) -> Dict[str, Union[Optional[Union[int, str]]]]: + try: + # Snowflake supports strings like `1 \n minutes` despite the docs not suggesting that + duration_str, *_, period = target_lag.split(" ") + duration = int(duration_str) + except (AttributeError, IndexError): + duration, period = None, None + + config_dict = { + "duration": duration, + "period": period, + } + return config_dict + + +@dataclass(frozen=True, eq=True, unsafe_hash=True) +class SnowflakeDynamicTableTargetLagConfigChange(RelationConfigChange): + context: Optional[SnowflakeDynamicTableTargetLagConfig] = None + + @property + def requires_full_refresh(self) -> bool: + return False diff --git a/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql b/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql index 5ca1d6ada..15553757b 100644 --- a/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql +++ b/dbt/include/snowflake/macros/materializations/dynamic_table/ddl.sql @@ -1,13 +1,29 @@ {% macro snowflake__get_alter_dynamic_table_as_sql( - relation, + target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation ) -%} - {{- log('Applying ALTER to: ' ~ relation) -}} - {{- snowflake__get_replace_dynamic_table_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) -}} + {{- log('Applying ALTER to: ' ~ target_relation) -}} + + {% if configuration_changes.requires_full_refresh %} + {{- snowflake__get_replace_dynamic_table_as_sql(target_relation, sql, existing_relation, backup_relation, intermediate_relation) -}} + + {% else %} + + {%- set target_lag = configuration_changes.target_lag -%} + {%- if target_lag -%}{{- log('Applying UPDATE TARGET_LAG to: ' ~ existing_relation) -}}{%- endif -%} + {%- set warehouse = configuration_changes.warehouse -%} + {%- if warehouse -%}{{- log('Applying UPDATE WAREHOUSE to: ' ~ existing_relation) -}}{%- endif -%} + + alter dynamic table {{ existing_relation }} set + {% if target_lag %}target_lag = '{{ target_lag.context }}'{% endif %} + {% if warehouse %}warehouse = {{ warehouse.context }}{% endif %} + + {%- endif -%} + {%- endmacro %} @@ -15,8 +31,8 @@ {{- log('Applying CREATE to: ' ~ relation) -}} create or replace dynamic table {{ relation }} - lag = '{{ config.get("target_lag") }}' - warehouse = {{ config.get("warehouse") }} + target_lag = '{{ config.get("target_lag") }}' + warehouse = {{ config.get("snowflake_warehouse") }} as ( {{ sql }} ) @@ -26,10 +42,31 @@ {%- endmacro %} -{% macro snowflake__get_replace_dynamic_table_as_sql(relation, sql, existing_relation, backup_relation, intermediate_relation) -%} - {{- log('Applying REPLACE to: ' ~ relation) -}} +{% macro snowflake__describe_dynamic_table(relation) %} + {%- set _dynamic_table_sql -%} + show dynamic tables + like '{{ relation.identifier }}' + in schema {{ relation.database }}.{{ relation.schema }} + ; + select + "name", + "schema_name", + "database_name", + "text", + "target_lag", + "warehouse" + from table(result_scan(last_query_id())) + {%- endset %} + {% set _dynamic_table = run_query(_dynamic_table_sql) %} + + {% do return({'dynamic_table': _dynamic_table}) %} +{% endmacro %} + + +{% macro snowflake__get_replace_dynamic_table_as_sql(target_relation, sql, existing_relation, backup_relation, intermediate_relation) -%} + {{- log('Applying REPLACE to: ' ~ target_relation) -}} {{ snowflake__get_drop_dynamic_table_sql(existing_relation) }}; - {{ snowflake__get_create_dynamic_table_as_sql(relation, sql) }} + {{ snowflake__get_create_dynamic_table_as_sql(target_relation, sql) }} {%- endmacro %} @@ -40,9 +77,10 @@ {%- endmacro %} -{% macro snowflake__get_dynamic_table_configuration_changes(relation, new_config) -%} - {{- log('Determining configuration changes on: ' ~ relation) -}} - {%- do return(None) -%} +{% macro snowflake__get_dynamic_table_configuration_changes(existing_relation, new_config) -%} + {% set _existing_dynamic_table = snowflake__describe_dynamic_table(existing_relation) %} + {% set _configuration_changes = existing_relation.dynamic_table_config_changeset(_existing_dynamic_table, new_config) %} + {% do return(_configuration_changes) %} {%- endmacro %} diff --git a/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql b/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql index 712f6c987..dc1ed8599 100644 --- a/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql +++ b/dbt/include/snowflake/macros/materializations/dynamic_table/materialization.sql @@ -70,7 +70,8 @@ {% set configuration_changes = snowflake__get_dynamic_table_configuration_changes(existing_relation, config) %} {% if configuration_changes is none %} - {% set build_sql = snowflake__refresh_dynamic_table(target_relation) %} + {% set build_sql = '' %} + {{ exceptions.warn("No configuration changes were identified on: `" ~ target_relation ~ "`. Continuing.") }} {% elif on_configuration_change == 'apply' %} {% set build_sql = snowflake__get_alter_dynamic_table_as_sql(target_relation, configuration_changes, sql, existing_relation, backup_relation, intermediate_relation) %} diff --git a/setup.py b/setup.py index c923b651a..c4b97b353 100644 --- a/setup.py +++ b/setup.py @@ -69,6 +69,8 @@ def _get_dbt_core_version(): install_requires=[ "dbt-core~={}".format(dbt_core_version), "snowflake-connector-python[secure-local-storage]~=3.0", + # installed via dbt-core but referenced directly; don't pin to avoid version conflicts with dbt-core + "agate", ], zip_safe=False, classifiers=[ diff --git a/tests/functional/adapter/dynamic_table_tests/files.py b/tests/functional/adapter/dynamic_table_tests/files.py index ec0b76f15..6b449d476 100644 --- a/tests/functional/adapter/dynamic_table_tests/files.py +++ b/tests/functional/adapter/dynamic_table_tests/files.py @@ -25,8 +25,8 @@ MY_DYNAMIC_TABLE = """ {{ config( materialized='dynamic_table', - warehouse='DBT_TESTING', - target_lag='60 seconds', + snowflake_warehouse='DBT_TESTING', + target_lag='120 seconds', ) }} select * from {{ ref('my_seed') }} """ diff --git a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py index 5f51afcc6..5d206dc9c 100644 --- a/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py +++ b/tests/functional/adapter/dynamic_table_tests/test_dynamic_tables_changes.py @@ -25,27 +25,29 @@ class SnowflakeDynamicTableChanges: @staticmethod - def check_start_state(project, dynamic_table): + def check_start_state(adapter, dynamic_table): """ This needs to be done manually for now until we fix the test suite's runner. The test suite's runner cannot run queries with multiple statements. Snowflake's metadata is all behind `show` and `describe` calls that require a second call to fetch the results; hence, the results cannot be fetched. """ - assert query_target_lag(project, dynamic_table) is None == "60 seconds" - assert query_warehouse(project, dynamic_table) is None == "DBT_TESTING" + assert query_target_lag(adapter, dynamic_table) is None == "120 seconds" + assert query_warehouse(adapter, dynamic_table) is None == "DBT_TESTING" @staticmethod def change_config_via_alter(project, dynamic_table): initial_model = get_model_file(project, dynamic_table) - new_model = initial_model.replace("target_lag='60 seconds'", "target_lag='5 minutes'") + new_model = initial_model.replace( + "target_lag='120 seconds'", "target_lag='5 minutes'" + ) set_model_file(project, dynamic_table, new_model) @staticmethod - def check_state_alter_change_is_applied(project, dynamic_table): + def check_state_alter_change_is_applied(adapter, dynamic_table): # see above - assert query_target_lag(project, dynamic_table) == "5 minutes" - assert query_warehouse(project, dynamic_table) == "DBT_TESTING" + assert query_target_lag(adapter, dynamic_table) == "5 minutes" + assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING" @staticmethod def change_config_via_replace(project, dynamic_table): @@ -111,37 +113,32 @@ class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges): def project_config_update(self): return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}} - @pytest.mark.skip( - "all changes are currently resulting in a full refresh, regardless of on_configuration_change" - ) - def test_change_is_applied_via_alter(self, project, my_dynamic_table): + def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table): """ See above about the two commented assertions. In the meantime, these have been validated manually. """ - # self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - # self.check_state_alter_change_is_applied(project, my_dynamic_table) + # self.check_state_alter_change_is_applied(adapter, my_dynamic_table) - assert_message_in_logs(f"Applying ALTER to: {str(my_dynamic_table).upper()}", logs) - assert_message_in_logs( - f"Applying REPLACE to: {str(my_dynamic_table).upper()}", logs, False - ) + assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs) + assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False) @pytest.mark.skip( "dbt-snowflake does not currently monitor any changes the trigger a full refresh" ) - def test_change_is_applied_via_replace(self, project, my_dynamic_table): - self.check_start_state(project, my_dynamic_table) + def test_change_is_applied_via_replace(self, project, adapter, my_dynamic_table): + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) self.change_config_via_replace(project, my_dynamic_table) _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - self.check_state_alter_change_is_applied(project, my_dynamic_table) - self.check_state_replace_change_is_applied(project, my_dynamic_table) + # self.check_state_alter_change_is_applied(adapter, my_dynamic_table) + # self.check_state_replace_change_is_applied(adapter, my_dynamic_table) assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs) @@ -151,19 +148,16 @@ class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges): def project_config_update(self): return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}} - @pytest.mark.skip( - "all changes are currently resulting in a full refresh, regardless of on_configuration_change" - ) - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): + def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table): """ See above about the two commented assertions. In the meantime, these have been validated manually. """ - # self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - # self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) assert_message_in_logs( f"Configuration changes were identified and `on_configuration_change` was set" @@ -173,17 +167,14 @@ def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs, False) assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False) - @pytest.mark.skip( - "dbt-snowflake does not currently monitor any changes the trigger a full refresh" - ) - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - self.check_start_state(project, my_dynamic_table) + def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table): + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) self.change_config_via_replace(project, my_dynamic_table) _, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name]) - self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) assert_message_in_logs( f"Configuration changes were identified and `on_configuration_change` was set" @@ -199,21 +190,18 @@ class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges): def project_config_update(self): return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}} - @pytest.mark.skip( - "all changes are currently resulting in a full refresh, regardless of on_configuration_change" - ) - def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): + def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table): """ See above about the two commented assertions. In the meantime, these have been validated manually. """ - # self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) _, logs = run_dbt_and_capture( ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False ) - # self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) assert_message_in_logs( f"Configuration changes were identified and `on_configuration_change` was set" @@ -223,11 +211,8 @@ def test_change_is_not_applied_via_alter(self, project, my_dynamic_table): assert_message_in_logs(f"Applying ALTER to: {my_dynamic_table}", logs, False) assert_message_in_logs(f"Applying REPLACE to: {my_dynamic_table}", logs, False) - @pytest.mark.skip( - "dbt-snowflake does not currently monitor any changes the trigger a full refresh" - ) - def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): - self.check_start_state(project, my_dynamic_table) + def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table): + # self.check_start_state(adapter, my_dynamic_table) self.change_config_via_alter(project, my_dynamic_table) self.change_config_via_replace(project, my_dynamic_table) @@ -235,7 +220,7 @@ def test_change_is_not_applied_via_replace(self, project, my_dynamic_table): ["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False ) - self.check_start_state(project, my_dynamic_table) + # self.check_start_state(adapter, my_dynamic_table) assert_message_in_logs( f"Configuration changes were identified and `on_configuration_change` was set" diff --git a/tests/functional/adapter/dynamic_table_tests/utils.py b/tests/functional/adapter/dynamic_table_tests/utils.py index d0a5d0131..1f145ec04 100644 --- a/tests/functional/adapter/dynamic_table_tests/utils.py +++ b/tests/functional/adapter/dynamic_table_tests/utils.py @@ -1,5 +1,9 @@ from typing import Optional +import agate +from dbt.adapters.base import BaseAdapter +from dbt.tests.util import get_connection + from dbt.adapters.snowflake.relation import SnowflakeRelation @@ -26,25 +30,20 @@ def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]: return results[0].lower() -def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - sql = f""" - show dynamic tables - like '{ dynamic_table.identifier }' - in schema { dynamic_table.schema } - ; - select "target_lag" - from table(result_scan(last_query_id())) - """ - return project.run_sql(sql, fetch="one") +def query_target_lag(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]: + config = describe_dynamic_table(adapter, dynamic_table) + return config.get("target_lag") -def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]: - sql = f""" - show dynamic tables - like '{ dynamic_table.identifier }' - in schema { dynamic_table.schema } - ; - select "warehouse" - from table(result_scan(last_query_id())) - """ - return project.run_sql(sql, fetch="one") +def query_warehouse(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]: + config = describe_dynamic_table(adapter, dynamic_table) + return config.get("warehouse") + + +def describe_dynamic_table(adapter: BaseAdapter, dynamic_table: SnowflakeRelation) -> agate.Row: + with get_connection(adapter): + macro_results = adapter.execute_macro( + "snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table} + ) + config = macro_results["dynamic_table"] + return config.rows[0]