Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] materialized views #1101

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240715-142132.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Implement materialized views
time: 2024-07-15T14:21:32.859737-04:00
custom:
Author: jeremy-thomas-roc
Issue: "870"
8 changes: 6 additions & 2 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.relation_configs import (
from dbt.adapters.relation_configs.config_base import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.relation_configs.config_change import RelationConfigChangeAction
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError

Expand Down Expand Up @@ -53,6 +53,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_materialized_view(self) -> bool:
return self.type == SnowflakeRelationType.MaterializedView

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
Expand Down
3 changes: 2 additions & 1 deletion dbt/adapters/snowflake/relation_configs/policies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from dbt.adapters.base.relation import Policy
from dbt.adapters.contracts.relation import Policy
from dbt_common.dataclass_schema import StrEnum


Expand All @@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum):
CTE = "cte"
External = "external"
DynamicTable = "dynamic_table"
MaterializedView = "materialized_view"


class SnowflakeIncludePolicy(Policy):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% materialization materialized_view, adapter='snowflake' -%}

{% set original_query_tag = set_query_tag() %}
{% set to_return = snowflake__create_or_replace_materialized_view() %}

{% set target_relation = this.incorporate(type='materialized_view') %}

{% do persist_docs(target_relation, model, for_columns=false) %}

{% do unset_query_tag(original_query_tag) %}

{% do return(to_return) %}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{% macro snowflake__create_materialized_view_as(relation, sql) -%}
{%- set secure = config.get('secure', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}
create or replace {% if secure -%}
secure
{%- endif %} materialized view {{ relation }}
{% if config.persist_column_docs() -%}
{% set model_columns = model.columns %}
{% set query_columns = get_columns_in_query(sql) %}
{{ get_persist_docs_column_list(model_columns, query_columns) }}

{%- endif %}
{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
{% if copy_grants -%} copy grants {%- endif %} as (
{{ sql }}
);
{% endmacro %}

{% macro get_create_materialized_view_as_sql(relation, sql) -%}
{{ adapter.dispatch('get_create_materialized_view_as_sql', 'dbt')(relation, sql) }}
{%- endmacro %}

{% macro snowflake__get_create_materialized_view_as_sql(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}
create materialized view {{ relation }}
{% set contract_config = config.get('contract') %}
{% if contract_config.enforced %}
{{ get_assert_columns_equivalent(sql) }}
{%- endif %}
as (
{{ sql }}
);
{% endmacro %}

/* {#
Vendored from dbt-core for the purpose of overwriting small pieces to support dynamics tables. This should
eventually be retired in favor of a standardized approach. Changed line:

{%- if old_relation is not none and old_relation.is_table -%} ->
{%- if old_relation is not none and not old_relation.is_materialized_view -%}
#} */

{% macro snowflake__create_or_replace_materialized_view() %}
{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_materialized_view) -%}

{%- set target_relation = api.Relation.create(
identifier=identifier, schema=schema, database=database,
type='materialized_view') -%}
{% set grant_config = config.get('grants') %}

{{ run_hooks(pre_hooks) }}

-- If there's a table with the same name and we weren't told to full refresh,
-- that's an error. If we were told to full refresh, drop it. This behavior differs
-- for Snowflake and BigQuery, so multiple dispatch is used.
{%- if old_relation is not none and not old_relation.is_materialized_view -%}
{{ handle_existing_table(should_full_refresh(), old_relation) }}
{%- endif -%}

-- build model
{% call statement('main') -%}
{{ get_create_materialized_view_as_sql(target_relation, sql) }}
{%- endcall %}

{% set should_revoke = should_revoke(exists_as_view, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro snowflake__get_drop_materialized__view_sql(relation) %}
drop materialized view if exists {{ relation }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{%- macro snowflake__get_rename_materialized_view_sql(relation, new_name) -%}
alter materialized view {{ relation }} rename to {{ new_name }}
{%- endmacro -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro snowflake__get_replace_materialized_view_sql(relation, sql) %}
{{ snowflake__create_materialized_view_as(relation, sql) }}
{% endmacro %}
22 changes: 22 additions & 0 deletions tests/functional/adapter/materialized_view_tests/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
MY_SEED = """
id,value
1,100
2,200
3,300
""".strip()


MY_TABLE = """
{{ config(
materialized='table',
) }}
select * from {{ ref('my_seed') }}
"""


MY_MATERIALIZED_VIEW = """
{{ config(
materialized='materialized_view',
) }}
select * from {{ ref('my_table') }}
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from typing import Optional, Tuple
import pytest

from dbt.tests.util import (
get_model_file,
run_dbt,
set_model_file,
)
from dbt.tests.fixtures.project import TestProjInfo

from dbt.adapters.snowflake.relation import SnowflakeRelation
from dbt.adapters.snowflake.relation_configs.policies import SnowflakeRelationType
from tests.functional.adapter.materialized_view_tests.files import (
MY_MATERIALIZED_VIEW,
MY_SEED,
MY_TABLE,
)


class TestSnowflakeMaterializedViews:
@staticmethod
def insert_record(project: TestProjInfo, table: SnowflakeRelation, record: Tuple[int, int]):
my_id, value = record
project.run_sql(f"insert into {table} (id, value) values ({my_id}, {value})")

@staticmethod
def query_relation_type(project: TestProjInfo, relation: SnowflakeRelation) -> Optional[str]:
sql = f"""
select
case
when table_type = 'MATERIALIZED VIEW' then 'materialized_view'
end as relation_type
from information_schema.tables
where table_name like '{relation.identifier.upper()}'
and table_schema like '{relation.schema.upper()}'
and table_catalog like '{relation.database.upper()}'
"""
results = project.run_sql(sql, fetch="one")
if results is None or len(results) == 0:
return None
elif len(results) > 1:
raise ValueError(f"More than one instance of {relation.name} found!")
else:
return results[0].lower()

@staticmethod
def query_row_count(project: TestProjInfo, relation: SnowflakeRelation) -> int:
sql = f"select count(*) from {relation}"
return project.run_sql(sql, fetch="one")[0]

@pytest.fixture(scope="class", autouse=True)
def seeds(self):
return {"my_seed.csv": MY_SEED}

@pytest.fixture(scope="class", autouse=True)
def models(self):
yield {
"my_table.sql": MY_TABLE,
"my_materialized_view.sql": MY_MATERIALIZED_VIEW,
}

@pytest.fixture(scope="class")
def my_table(self, project: TestProjInfo) -> SnowflakeRelation:
return project.adapter.Relation.create(
identifier="my_table",
schema=project.test_schema,
database=project.database,
type=SnowflakeRelationType.Table,
)

@pytest.fixture(scope="class")
def my_seed(self, project: TestProjInfo) -> SnowflakeRelation:
return project.adapter.Relation.create(
identifier="my_seed",
schema=project.test_schema,
database=project.database,
type=SnowflakeRelationType.Table,
)

@pytest.fixture(scope="class")
def my_materialized_view(self, project: TestProjInfo) -> SnowflakeRelation:
return project.adapter.Relation.create(
identifier="my_materialized_view",
schema=project.test_schema,
database=project.database,
type=SnowflakeRelationType.MaterializedView,
)

@staticmethod
def load_model(
project: TestProjInfo,
current_model: SnowflakeRelation,
new_model: SnowflakeRelation,
):
model_to_load = get_model_file(project, new_model)
set_model_file(project, current_model, model_to_load)

@pytest.fixture(scope="function", autouse=True)
def setup(
self,
project: TestProjInfo,
my_materialized_view: SnowflakeRelation,
my_table: SnowflakeRelation,
):
run_dbt(["seed"])
run_dbt(["run", "--models", my_materialized_view.identifier, "--full-refresh"])

# the tests touch these files, store their contents in memory
my_materialized_view_config = get_model_file(project, my_materialized_view)
my_table_config = get_model_file(project, my_table)

yield

# and then reset them after the test runs
set_model_file(project, my_materialized_view, my_materialized_view_config)
set_model_file(project, my_table, my_table_config)
project.run_sql(f"drop schema if exists {project.test_schema} cascade")

def test_materialized_view_create(
self, project: TestProjInfo, my_materialized_view: SnowflakeRelation
):
# setup creates it; verify it's there
assert self.query_relation_type(project, my_materialized_view) == "materialized_view"

def test_materialized_view_update_on_insert(
self,
project: TestProjInfo,
my_table: SnowflakeRelation,
my_materialized_view: SnowflakeRelation,
):
table_count_start = self.query_row_count(project, my_table)
view_count_start = self.query_row_count(project, my_materialized_view)

assert table_count_start == view_count_start

self.insert_record(project, my_table, (4, 400))

table_count_end = self.query_row_count(project, my_table)
view_count_end = self.query_row_count(project, my_materialized_view)

assert table_count_end == view_count_end
assert view_count_start != view_count_end