@property
def is_materialized_view(self) -> bool:
    """Report whether this relation's ``type`` is ``SnowflakeRelationType.MaterializedView``."""
    materialized_view_type = SnowflakeRelationType.MaterializedView
    return self.type == materialized_view_type
{#
  Snowflake implementation of the `materialized_view` materialization.

  Steps, in order:
    1. call set_query_tag() and keep what it returns;
    2. build the relation through snowflake__create_or_replace_materialized_view();
    3. persist relation-level docs (for_columns=false) on the target relation;
    4. hand the value saved in step 1 back to unset_query_tag();
    5. return whatever the build macro returned.
#}
{% materialization materialized_view, adapter='snowflake' -%}

  {% set original_query_tag = set_query_tag() %}
  {% set to_return = snowflake__create_or_replace_materialized_view() %}

  {# Re-type `this` so that persist_docs receives a relation typed as a materialized view. #}
  {% set target_relation = this.incorporate(type='materialized_view') %}

  {# NOTE(review): only relation-level docs are persisted here (for_columns=false). Confirm that
     the comment statement rendered for type 'materialized_view' is valid Snowflake SQL. #}
  {% do persist_docs(target_relation, model, for_columns=false) %}

  {% do unset_query_tag(original_query_tag) %}

  {% do return(to_return) %}

{%- endmaterialization %}
{% macro snowflake__get_create_materialized_view_as_sql(relation, sql) -%}
  {#
    Render the DDL that builds `relation` as a materialized view over `sql`.

    Args:
      relation: the target relation, rendered verbatim as the view name.
      sql: the compiled model query that becomes the view body.

    The optional `sql_header` config is emitted first, and the contract column
    check is rendered when the model's contract is enforced.

    The statement is `create or replace`, not a bare `create`: the
    create-or-replace build macro in this file only clears a pre-existing
    relation when it is NOT already a materialized view, so a bare `create`
    fails with "already exists" on every run after the first.
  #}
  {%- set sql_header = config.get('sql_header', none) -%}

  {{ sql_header if sql_header is not none }}
  create or replace materialized view {{ relation }}
  {% set contract_config = config.get('contract') %}
  {% if contract_config.enforced %}
    {{ get_assert_columns_equivalent(sql) }}
  {%- endif %}
  as (
    {{ sql }}
  );
{% endmacro %}
{#
  Render the statement that drops `relation` as a materialized view.
  `if exists` makes the drop a no-op when the view is already gone.

  The name uses a single underscore between `materialized` and `view`, matching the sibling
  snowflake__get_rename_materialized_view_sql and snowflake__get_replace_materialized_view_sql
  macros, so that name-based lookup of the drop macro can find it.
#}
{% macro snowflake__get_drop_materialized_view_sql(relation) %}
    drop materialized view if exists {{ relation }}
{% endmacro %}


{#
  Backward-compatible alias for the originally published, misspelled name
  (double underscore before `view`). Delegates to the correctly named macro.
#}
{% macro snowflake__get_drop_materialized__view_sql(relation) %}
    {{ snowflake__get_drop_materialized_view_sql(relation) }}
{% endmacro %}
class TestSnowflakeMaterializedViews:
    """Functional tests for the Snowflake ``materialized_view`` materialization.

    Every test starts from a seeded project in which ``my_materialized_view`` has
    just been built with ``--full-refresh`` (see ``setup``); the test schema is
    dropped again once the test finishes.
    """

    @staticmethod
    def insert_record(project: TestProjInfo, table: SnowflakeRelation, record: Tuple[int, int]):
        """Insert a single ``(id, value)`` row into ``table`` through ``project.run_sql``."""
        my_id, value = record
        project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})")

    @staticmethod
    def query_relation_type(project: TestProjInfo, relation: SnowflakeRelation) -> Optional[str]:
        """Look up ``relation`` in ``information_schema.tables`` and classify it.

        Returns:
            ``"materialized_view"`` when the catalog reports the relation as a
            materialized view; ``None`` when the relation is not found or is
            reported as any other table type.

        Raises:
            ValueError: if the catalog returns more than one matching row.
        """
        # Exact `=` matching is deliberate: with `like`, the underscores in the
        # identifiers act as single-character wildcards.
        sql = f"""
            select
                case
                    when table_type = 'MATERIALIZED VIEW' then 'materialized_view'
                end as relation_type
            from information_schema.tables
            where table_name = '{relation.identifier.upper()}'
            and table_schema = '{relation.schema.upper()}'
            and table_catalog = '{relation.database.upper()}'
        """
        # Fetch every matching row: with fetch="one" the result is a single row,
        # so its length is the column count and duplicates can never be detected.
        rows = project.run_sql(sql, fetch="all")
        if not rows:
            return None
        if len(rows) > 1:
            raise ValueError(f"More than one instance of {relation.name} found!")

        # The CASE expression has no ELSE branch, so any other table type yields NULL.
        relation_type = rows[0][0]
        return relation_type.lower() if relation_type is not None else None

    @staticmethod
    def query_row_count(project: TestProjInfo, relation: SnowflakeRelation) -> int:
        """Return ``count(*)`` for ``relation``."""
        sql = f"select count(*) from {relation}"
        return project.run_sql(sql, fetch="one")[0]

    @pytest.fixture(scope="class", autouse=True)
    def seeds(self):
        """Seed files for the test project: a single ``my_seed.csv``."""
        return {"my_seed.csv": MY_SEED}

    @pytest.fixture(scope="class", autouse=True)
    def models(self):
        """Model files for the test project: a table and a materialized view."""
        yield {
            "my_table.sql": MY_TABLE,
            "my_materialized_view.sql": MY_MATERIALIZED_VIEW,
        }

    @pytest.fixture(scope="class")
    def my_table(self, project: TestProjInfo) -> SnowflakeRelation:
        """Relation handle for the ``my_table`` model in the test schema."""
        return project.adapter.Relation.create(
            identifier="my_table",
            schema=project.test_schema,
            database=project.database,
            type=SnowflakeRelationType.Table,
        )

    @pytest.fixture(scope="class")
    def my_seed(self, project: TestProjInfo) -> SnowflakeRelation:
        """Relation handle for the ``my_seed`` seed, typed as a table."""
        return project.adapter.Relation.create(
            identifier="my_seed",
            schema=project.test_schema,
            database=project.database,
            type=SnowflakeRelationType.Table,
        )

    @pytest.fixture(scope="class")
    def my_materialized_view(self, project: TestProjInfo) -> SnowflakeRelation:
        """Relation handle for the ``my_materialized_view`` model in the test schema."""
        return project.adapter.Relation.create(
            identifier="my_materialized_view",
            schema=project.test_schema,
            database=project.database,
            type=SnowflakeRelationType.MaterializedView,
        )

    @staticmethod
    def load_model(
        project: TestProjInfo,
        current_model: SnowflakeRelation,
        new_model: SnowflakeRelation,
    ):
        """Overwrite ``current_model``'s model file with the contents of ``new_model``'s."""
        model_to_load = get_model_file(project, new_model)
        set_model_file(project, current_model, model_to_load)

    @pytest.fixture(scope="function", autouse=True)
    def setup(
        self,
        project: TestProjInfo,
        my_materialized_view: SnowflakeRelation,
        my_table: SnowflakeRelation,
    ):
        """Seed and build the materialized view before each test, then clean up.

        After the test, the model files are restored to the contents captured
        here and the test schema is dropped.
        """
        run_dbt(["seed"])
        run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"])

        # the tests touch these files, store their contents in memory
        my_materialized_view_config = get_model_file(project, my_materialized_view)
        my_table_config = get_model_file(project, my_table)

        yield

        # and then reset them after the test runs
        set_model_file(project, my_materialized_view, my_materialized_view_config)
        set_model_file(project, my_table, my_table_config)
        project.run_sql(f"drop schema if exists {project.test_schema} cascade")

    def test_materialized_view_create(
        self, project: TestProjInfo, my_materialized_view: SnowflakeRelation
    ):
        """The relation built by ``setup`` is reported as a materialized view."""
        # setup creates it; verify it's there
        assert self.query_relation_type(project, my_materialized_view) == "materialized_view"

    def test_materialized_view_update_on_insert(
        self,
        project: TestProjInfo,
        my_table: SnowflakeRelation,
        my_materialized_view: SnowflakeRelation,
    ):
        """The view's row count tracks the table's across an insert, with no dbt run in between."""
        table_count_start = self.query_row_count(project, my_table)
        view_count_start = self.query_row_count(project, my_materialized_view)

        assert table_count_start == view_count_start

        self.insert_record(project, my_table, (4, 400))

        table_count_end = self.query_row_count(project, my_table)
        view_count_end = self.query_row_count(project, my_materialized_view)

        assert table_count_end == view_count_end
        assert view_count_start != view_count_end