Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.6.latest] ADAP-728: Fix "On Configuration Change" config #713

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230726-164729.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix `on_configuration_change` setting to properly recognize `continue` and `fail`
time: 2023-07-26T16:47:29.471437-04:00
custom:
Author: mikealfare
Issue: "708"
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ ignore =
E741,
E501,
exclude = test
per-file-ignores =
*/__init__.py: F401
60 changes: 43 additions & 17 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
from dataclasses import dataclass, field
from typing import Optional
from typing import Optional, Type

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.dataclass_schema import StrEnum
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.relation_configs import RelationConfigChangeAction, RelationResults
from dbt.context.providers import RuntimeConfigObject
from dbt.utils import classproperty


class SnowflakeRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"


@dataclass
class SnowflakeQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)


@dataclass(frozen=True, eq=False, repr=False)
Expand All @@ -33,3 +28,34 @@ def is_dynamic_table(self) -> bool:
@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)

@classproperty
def get_relation_type(cls) -> Type[SnowflakeRelationType]:
return SnowflakeRelationType

@classmethod
def dynamic_table_config_changeset(
cls, relation_results: RelationResults, runtime_config: RuntimeConfigObject
) -> Optional[SnowflakeDynamicTableConfigChangeset]:
existing_dynamic_table = SnowflakeDynamicTableConfig.from_relation_results(
relation_results
)
new_dynamic_table = SnowflakeDynamicTableConfig.from_model_node(runtime_config.model)

config_change_collection = SnowflakeDynamicTableConfigChangeset()

if new_dynamic_table.target_lag != existing_dynamic_table.target_lag:
config_change_collection.target_lag = SnowflakeDynamicTableTargetLagConfigChange(
action=RelationConfigChangeAction.alter,
context=new_dynamic_table.target_lag,
)

if new_dynamic_table.warehouse != existing_dynamic_table.warehouse:
config_change_collection.warehouse = SnowflakeDynamicTableWarehouseConfigChange(
action=RelationConfigChangeAction.alter,
context=new_dynamic_table.warehouse,
)

if config_change_collection.has_changes:
return config_change_collection
return None
12 changes: 10 additions & 2 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from dbt.adapters.snowflake.relation_configs.dynamic_table import ( # noqa: F401
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableWarehouseConfigChange,
)
from dbt.adapters.snowflake.relation_configs.target_lag import ( # noqa: F401
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableTargetLagPeriod,
)
70 changes: 70 additions & 0 deletions dbt/adapters/snowflake/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional

import agate
from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationResults,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName

from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeRelationConfigBase(RelationConfigBase):
"""
This base class implements a few boilerplate methods and provides some light structure for Snowflake relations.
"""

@classmethod
def include_policy(cls) -> Policy:
return SnowflakeIncludePolicy()

@classmethod
def quote_policy(cls) -> Policy:
return SnowflakeQuotePolicy()

@classmethod
def from_model_node(cls, model_node: ModelNode):
relation_config = cls.parse_model_node(model_node)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_model_node()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def from_relation_results(cls, relation_results: RelationResults):
relation_config = cls.parse_relation_results(relation_results)
relation = cls.from_dict(relation_config)
return relation

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
raise NotImplementedError(
"`parse_relation_results()` needs to be implemented on this RelationConfigBase instance"
)

@classmethod
def _render_part(cls, component: ComponentName, value: Optional[str]) -> Optional[str]:
if cls.include_policy().get_part(component) and value:
if cls.quote_policy().get_part(component):
return f'"{value}"'
return value.lower()
return None

@classmethod
def _get_first_row(cls, results: agate.Table) -> agate.Row:
try:
return results.rows[0]
except IndexError:
return agate.Row(values=set())
108 changes: 103 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,124 @@
from dataclasses import dataclass
from typing import Optional

from dbt.adapters.relation_configs import RelationConfigBase
import agate
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName

from dbt.adapters.snowflake.relation_configs.target_lag import SnowflakeDynamicTableTargetLagConfig
from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.target_lag import (
SnowflakeDynamicTableTargetLagConfig,
SnowflakeDynamicTableTargetLagConfigChange,
)


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableConfig(RelationConfigBase):
class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
"""
This config follow the specs found here:
TODO: add URL once it's GA
https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table

The following parameters are configurable by dbt:
- name: name of the dynamic table
- query: the query behind the table
- lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- target_lag: the maximum amount of time that the dynamic table’s content should lag behind updates to the base tables
- warehouse: the name of the warehouse that provides the compute resources for refreshing the dynamic table

There are currently no non-configurable parameters.
"""

name: str
schema_name: str
database_name: str
query: str
target_lag: SnowflakeDynamicTableTargetLagConfig
warehouse: str

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
"database_name": cls._render_part(
ComponentName.Database, config_dict.get("database_name")
),
"query": config_dict.get("query"),
"warehouse": config_dict.get("warehouse"),
}

if target_lag := config_dict.get("target_lag"):
kwargs_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.from_dict(target_lag)}
)

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict) # type: ignore
return dynamic_table

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
config_dict = {
"name": model_node.identifier,
"schema_name": model_node.schema,
"database_name": model_node.database,
"query": model_node.compiled_code,
"warehouse": model_node.config.extra.get("snowflake_warehouse"),
}

if model_node.config.extra.get("target_lag"):
config_dict.update(
{"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_model_node(model_node)}
)

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> dict:
dynamic_table: agate.Row = relation_results["dynamic_table"].rows[0]

config_dict = {
"name": dynamic_table.get("name"),
"schema_name": dynamic_table.get("schema_name"),
"database_name": dynamic_table.get("database_name"),
"query": dynamic_table.get("text"),
"warehouse": dynamic_table.get("warehouse"),
}

if dynamic_table.get("target_lag"):
config_dict.update(
{
"target_lag": SnowflakeDynamicTableTargetLagConfig.parse_relation_results(
dynamic_table
)
}
)

return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeDynamicTableWarehouseConfigChange(RelationConfigChange):
context: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
return False


@dataclass
class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
return any(
[
self.target_lag.requires_full_refresh if self.target_lag else False,
self.warehouse.requires_full_refresh if self.warehouse else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.warehouse])
25 changes: 25 additions & 0 deletions dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy
from dbt.dataclass_schema import StrEnum


class SnowflakeRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"


class SnowflakeIncludePolicy(Policy):
database: bool = True
schema: bool = True
identifier: bool = True


@dataclass
class SnowflakeQuotePolicy(Policy):
database: bool = False
schema: bool = False
identifier: bool = False
Loading