Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of metadata-based freshness #796

Merged
merged 27 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e39a6e7
Draft implementation of metadata-based freshness
peterallenwebb Oct 9, 2023
010c781
Add test of metadata-based freshness mechanism
peterallenwebb Oct 9, 2023
181d984
Merge remote-tracking branch 'origin/main' into paw/metadata-freshness
peterallenwebb Oct 10, 2023
32e7044
Strengthen test case.
peterallenwebb Oct 10, 2023
0a01187
Fix some overzealous escaping.
peterallenwebb Oct 11, 2023
8e3fd10
Simplifications per review
peterallenwebb Oct 11, 2023
d909e56
Merge branch 'main' into paw/metadata-freshness
colin-rogers-dbt Oct 11, 2023
fbd0858
Merge remote-tracking branch 'origin/main' into paw/metadata-freshness
peterallenwebb Oct 11, 2023
d04431a
Update for core code review changes
peterallenwebb Oct 11, 2023
3f1ee07
Merge branch 'main' into paw/metadata-freshness
peterallenwebb Oct 11, 2023
231fe49
Repoint to main core branch before merge
peterallenwebb Oct 11, 2023
ffedcc3
Merge remote-tracking branch 'origin/main' into paw/metadata-freshness
peterallenwebb Oct 11, 2023
cd7e3c8
Temporarily skip test
peterallenwebb Oct 11, 2023
4d8647e
test using custom schema for test
colin-rogers-dbt Oct 11, 2023
cb2570f
Merge remote-tracking branch 'origin/paw/metadata-freshness' into paw…
colin-rogers-dbt Oct 11, 2023
f131751
Merge branch 'main' into paw/metadata-freshness
mikealfare Oct 11, 2023
f12d00e
use specific schema env var
colin-rogers-dbt Oct 11, 2023
61929da
use specific schema env var
colin-rogers-dbt Oct 12, 2023
bff9f6c
cleanup env var
colin-rogers-dbt Oct 12, 2023
fc47ba3
skip test_get_last_relation_modified
colin-rogers-dbt Oct 12, 2023
01baf08
remove extra whitespace
mikealfare Oct 12, 2023
ab373e9
remove Capability.TableLastModifiedMetadata: CapabilitySupport(suppor…
mikealfare Oct 12, 2023
452da8f
remove metadata.sql for CI troubleshooting
mikealfare Oct 12, 2023
aa7c2cb
remove test_get_last_relation_modified.py for CI troubleshooting
mikealfare Oct 12, 2023
c7a2d55
add non-test files back for CI troubleshooting
mikealfare Oct 12, 2023
8622ed6
add test files back for CI troubleshooting
mikealfare Oct 12, 2023
11769bd
remove run order dependency from test cases within their test class
mikealfare Oct 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20231008-195410.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for checking table-last-modified by metadata
time: 2023-10-08T19:54:10.503476-04:00
# Author and Issue are fields of the `custom` mapping, so they must be nested under it.
custom:
  Author: peterallenwebb
  Issue: "785"
7 changes: 5 additions & 2 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ class SnowflakeAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.NOT_ENFORCED,
}

_capabilities = CapabilityDict(
{Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
_capabilities: CapabilityDict = CapabilityDict(
{
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
}
)

@classmethod
Expand Down
19 changes: 19 additions & 0 deletions dbt/include/snowflake/macros/metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% macro snowflake__get_relation_last_modified(information_schema, relations) -%}
  {#-
    Query `<information_schema>.tables` for the `last_altered` timestamp of every
    relation in `relations`, matching schema and table name case-insensitively.

    Returns the result of the 'last_modified' statement with the columns
    schema, identifier, last_modified and snapshotted_at (the current timestamp
    at query time).
  -#}

  {%- call statement('last_modified', fetch_result=True) -%}
    select table_schema as schema,
           table_name as identifier,
           last_altered as last_modified,
           {{ current_timestamp() }} as snapshotted_at
    from {{ information_schema }}.tables
    where (
      {%- for relation in relations -%}
        (upper(table_schema) = upper('{{ relation.schema }}') and
         upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%}
      {%- else -%}
        {#- No relations requested: emit an always-false predicate so the
            statement stays valid SQL instead of rendering `where ()`. -#}
        1 = 0
      {%- endfor -%}
    )
  {%- endcall -%}

  {{ return(load_result('last_modified')) }}

{% endmacro %}
61 changes: 61 additions & 0 deletions tests/functional/adapter/test_get_last_relation_modified.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import pytest

from dbt.cli.main import dbtRunner


# Source definition used by the freshness test below. It declares freshness
# thresholds but no `loaded_at_field`. The schema is injected through the
# DBT_GET_LAST_RELATION_TEST_SCHEMA environment variable at parse time.
# The nesting matters: freshness, schema and tables are properties of the
# `test_source` entry, not top-level keys.
freshness_via_metadata_schema_yml = """version: 2
sources:
  - name: test_source
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
    tables:
      - name: test_table
"""


class TestGetLastRelationModified:
    """Functional test: `dbt source freshness` on a source with no `loaded_at_field`.

    The source schema is passed to the YAML definition through the
    DBT_GET_LAST_RELATION_TEST_SCHEMA environment variable.
    """

    @pytest.fixture(scope="class", autouse=True)
    def set_env_vars(self, project):
        """Publish the project's test schema in the environment for the class's lifetime."""
        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
        yield
        # pop() rather than del so teardown cannot fail if the variable is already gone.
        os.environ.pop("DBT_GET_LAST_RELATION_TEST_SCHEMA", None)

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the source definition as the project's only model-directory file."""
        return {"schema.yml": freshness_via_metadata_schema_yml}

    @pytest.fixture(scope="class")
    def custom_schema(self, project, set_env_vars):
        """Create a clean schema for the test and drop it afterwards.

        Yields:
            The name of the schema that was (re)created.
        """
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
            )
            # Drop first so leftovers from an earlier run cannot affect this one.
            project.adapter.drop_schema(relation)
            project.adapter.create_schema(relation)

        yield relation.schema

        with project.adapter.connection_named("__test"):
            project.adapter.drop_schema(relation)

    @pytest.mark.skip()
    def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
        """Run `source freshness` against a freshly created table and expect a clean result."""
        project.run_sql(
            f"create table {custom_schema}.test_table (id integer autoincrement, name varchar(100) not null);"
        )

        warning_or_error = False

        def probe(e):
            # Event callback: remember whether any warning- or error-level event fired.
            nonlocal warning_or_error
            # dbt's EventLevel.WARN serialises as "warn"; "warning" is kept so the
            # original comparison still holds if that spelling is ever emitted.
            if e.info.level in ["warn", "warning", "error"]:
                warning_or_error = True

        runner = dbtRunner(callbacks=[probe])
        result = runner.invoke(["source", "freshness"])

        # The 'source freshness' command should succeed without warnings or errors.
        assert result.success, result.exception
        assert not warning_or_error
27 changes: 22 additions & 5 deletions tests/functional/adapter/test_list_relations_without_caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
)


class TestListRelationsWithoutCaching:
class TestListRelationsWithoutCachingSingle:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
Expand All @@ -88,16 +88,14 @@ def models(self):
def macros(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
"validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
}

def test__snowflake__list_relations_without_caching_termination(self, project):
"""
validates that we do NOT trigger pagination logic snowflake__list_relations_without_caching
macro when there are fewer than max_results_per_iter relations in the target schema
"""

_ = run_dbt(["run", "-s", "my_model_base"])
run_dbt(["run", "-s", "my_model_base"])

database = project.database
schemas = project.created_schemas
Expand All @@ -121,14 +119,31 @@ def test__snowflake__list_relations_without_caching_termination(self, project):

assert n_relations == "n_relations: 1"


class TestListRelationsWithoutCachingFull:
@pytest.fixture(scope="class")
def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

return my_models

@pytest.fixture(scope="class")
def macros(self):
return {
"validate_list_relations_without_caching.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING,
"validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
}

def test__snowflake__list_relations_without_caching(self, project):
"""
validates pagination logic in snowflake__list_relations_without_caching macro counts
the correct number of objects in the target schema when having to make multiple looped
calls of SHOW TERSE OBJECTS.
"""
# purpose of the first run is to create the replicated views in the target schema
_ = run_dbt(["run", "--exclude", "my_model_base"])
run_dbt(["run"])

database = project.database
schemas = project.created_schemas
Expand Down Expand Up @@ -157,6 +172,8 @@ def test__snowflake__list_relations_without_caching_raise_error(self, project):
validates pagination logic terminates and raises a compilation error
when exceeding the limit of how many results to return.
"""
run_dbt(["run"])

database = project.database
schemas = project.created_schemas

Expand Down