Implementation of metadata-based freshness (#796)
* Draft implementation of metadata-based freshness

* Add test of metadata-based freshness mechanism

* Strengthen test case.

* Fix some overzealous escaping.

* Simplifications per review

* Update for core code review changes

* Repoint to main core branch before merge

* Temporarily skip test

* test using custom schema for test

* use specific schema env var

* use specific schema env var

* cleanup env var

* skip test_get_last_relation_modified

* remove extra whitespace

* remove Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full) for CI troubleshooting

* remove metadata.sql for CI troubleshooting

* remove test_get_last_relation_modified.py for CI troubleshooting

* add non-test files back for CI troubleshooting

* add test files back for CI troubleshooting

* remove run order dependency from test cases within their test class

---------

Co-authored-by: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
Co-authored-by: Colin <colin.rogers@dbtlabs.com>
Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com>
Co-authored-by: Mike Alfare <mike.alfare@dbtlabs.com>
5 people authored Oct 12, 2023
1 parent 36fe646 commit 8970aaa
Showing 5 changed files with 113 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231008-195410.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add support for checking table-last-modified by metadata
time: 2023-10-08T19:54:10.503476-04:00
custom:
  Author: peterallenwebb
  Issue: "785"
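In user-facing terms, this lets a Snowflake source be checked for freshness without a loaded_at_field: dbt reads the table's last-modified timestamp from warehouse metadata instead of scanning a timestamp column. A minimal sketch of such a source definition, mirroring the test schema added in this commit (source, schema, and table names here are illustrative), looks like:

version: 2
sources:
  - name: raw_source              # illustrative name
    schema: raw                   # illustrative schema
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    tables:
      - name: events              # no loaded_at_field; freshness comes from table metadata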
7 changes: 5 additions & 2 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
@@ -50,8 +50,11 @@ class SnowflakeAdapter(SQLAdapter):
         ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
     }
 
-    _capabilities = CapabilityDict(
-        {Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
+    _capabilities: CapabilityDict = CapabilityDict(
+        {
+            Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
+            Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
+        }
     )
 
     @classmethod
19 changes: 19 additions & 0 deletions dbt/include/snowflake/macros/metadata.sql
@@ -0,0 +1,19 @@
{% macro snowflake__get_relation_last_modified(information_schema, relations) -%}

    {%- call statement('last_modified', fetch_result=True) -%}
        select table_schema as schema,
               table_name as identifier,
               last_altered as last_modified,
               {{ current_timestamp() }} as snapshotted_at
        from {{ information_schema }}.tables
        where (
            {%- for relation in relations -%}
            (upper(table_schema) = upper('{{ relation.schema }}') and
             upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%}
            {%- endfor -%}
        )
    {%- endcall -%}

    {{ return(load_result('last_modified')) }}

{% endmacro %}
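As a rough illustration of what this macro produces, the rendered query for a single source relation would look something like the following; the database, schema, and table names are illustrative, and {{ current_timestamp() }} expands to whatever current-timestamp expression the adapter defines:

select table_schema as schema,
       table_name as identifier,
       last_altered as last_modified,
       current_timestamp() as snapshotted_at  -- stands in for the adapter's {{ current_timestamp() }} expansion
from analytics.information_schema.tables
where (
    (upper(table_schema) = upper('raw') and
     upper(table_name) = upper('events'))
)

dbt then compares last_modified against the source's warn_after / error_after thresholds to report freshness status.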
61 changes: 61 additions & 0 deletions tests/functional/adapter/test_get_last_relation_modified.py
@@ -0,0 +1,61 @@
import os
import pytest

from dbt.cli.main import dbtRunner


freshness_via_metadata_schema_yml = """version: 2
sources:
  - name: test_source
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
    tables:
      - name: test_table
"""


class TestGetLastRelationModified:
    @pytest.fixture(scope="class", autouse=True)
    def set_env_vars(self, project):
        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
        yield
        del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

    @pytest.fixture(scope="class")
    def models(self):
        return {"schema.yml": freshness_via_metadata_schema_yml}

    @pytest.fixture(scope="class")
    def custom_schema(self, project, set_env_vars):
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
            )
            project.adapter.drop_schema(relation)
            project.adapter.create_schema(relation)

        yield relation.schema

        with project.adapter.connection_named("__test"):
            project.adapter.drop_schema(relation)

    @pytest.mark.skip()
    def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
        project.run_sql(
            f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
        )

        warning_or_error = False

        def probe(e):
            nonlocal warning_or_error
            if e.info.level in ["warning", "error"]:
                warning_or_error = True

        runner = dbtRunner(callbacks=[probe])
        runner.invoke(["source", "freshness"])

        # The 'source freshness' command should succeed without warnings or errors.
        assert not warning_or_error
27 changes: 22 additions & 5 deletions tests/functional/adapter/test_list_relations_without_caching.py
@@ -75,7 +75,7 @@ def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
 )
 
 
-class TestListRelationsWithoutCaching:
+class TestListRelationsWithoutCachingSingle:
     @pytest.fixture(scope="class")
     def models(self):
         my_models = {"my_model_base.sql": TABLE_BASE_SQL}
@@ -88,16 +88,14 @@ def models(self):
     def macros(self):
         return {
             "validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
-            "validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
         }
 
     def test__snowflake__list_relations_without_caching_termination(self, project):
         """
         validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching
         macro when there are fewer than max_results_per_iter relations in the target schema
         """
-
-        _ = run_dbt(["run", "-s", "my_model_base"])
+        run_dbt(["run", "-s", "my_model_base"])
 
         database = project.database
         schemas = project.created_schemas
@@ -121,14 +119,31 @@ def test__snowflake__list_relations_without_caching_termination(self, project):
 
         assert n_relations == "n_relations: 1"
 
+
+class TestListRelationsWithoutCachingFull:
+    @pytest.fixture(scope="class")
+    def models(self):
+        my_models = {"my_model_base.sql": TABLE_BASE_SQL}
+        for view in range(0, NUM_VIEWS):
+            my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})
+
+        return my_models
+
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {
+            "validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
+            "validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
+        }
+
     def test__snowflake__list_relations_without_caching(self, project):
         """
         validates pagination logic in snowflake__list_relations_without_caching macro counts
         the correct number of objects in the target schema when having to make multiple looped
         calls of SHOW TERSE OBJECTS.
         """
         # purpose of the first run is to create the replicated views in the target schema
-        _ = run_dbt(["run", "--exclude", "my_model_base"])
+        run_dbt(["run"])
 
         database = project.database
         schemas = project.created_schemas
@@ -157,6 +172,8 @@ def test__snowflake__list_relations_without_caching_raise_error(self, project):
         validates pagination logic terminates and raises a compilation error
         when exceeding the limit of how many results to return.
         """
+        run_dbt(["run"])
+
         database = project.database
         schemas = project.created_schemas