From d2782dc6a74ad37bee94df18b2793000420b40a9 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 15 May 2024 15:39:42 -0400 Subject: [PATCH 01/10] add external table --- .../snowflake/relation_configs/policies.py | 1 + .../external_table/external_table.sql | 77 +++++++++++ .../relations/external_table/helpers.sql | 125 ++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 dbt/include/snowflake/macros/relations/external_table/external_table.sql create mode 100644 dbt/include/snowflake/macros/relations/external_table/helpers.sql diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py index 75195f9a3..2ad42dc0b 100644 --- a/dbt/adapters/snowflake/relation_configs/policies.py +++ b/dbt/adapters/snowflake/relation_configs/policies.py @@ -10,6 +10,7 @@ class SnowflakeRelationType(StrEnum): CTE = "cte" External = "external" DynamicTable = "dynamic_table" + ExternalTable = "external_table" class SnowflakeIncludePolicy(Policy): diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql new file mode 100644 index 000000000..02fce568a --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -0,0 +1,77 @@ +{% macro snowflake__create_external_table(source_node) %} + + {%- set columns = source_node.columns.values() -%} + {%- set external = source_node.external -%} + {%- set partitions = external.partitions -%} + {%- set infer_schema = external.infer_schema -%} + + {% if infer_schema %} + {% set query_infer_schema %} + select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) + {% endset %} + {% if execute %} + {% set columns_infer = run_query(query_infer_schema) %} + {% endif %} + {% endif %} + + {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} + +{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} +{# This assumes you have already created an external stage #} + create or replace external table {{source(source_node.source_name, source_node.name)}} + {%- if columns or partitions or infer_schema -%} + ( + {%- if partitions -%}{%- for partition in partitions %} + {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} + {%- endfor -%}{%- endif -%} + {%- if not infer_schema -%} + {%- for column in columns %} + {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} + {%- set column_alias -%} + {%- if 'alias' in column and column.quote -%} + {{adapter.quote(column.alias)}} + {%- elif 'alias' in column -%} + {{column.alias}} + {%- else -%} + {{column_quoted}} + {%- endif -%} + {%- endset %} + {%- set col_expression -%} + {%- if column.expression -%} + {{column.expression}} + {%- else -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endif -%} + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + {% else %} + {%- for column in columns_infer %} + {%- set col_expression -%} + {%- set col_id = 'value:' ~ column[0] -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) + {{- ',' if not loop.last -}} + {% endfor %} + {%- endif -%} + ) + {%- endif -%} + {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} + location = {{external.location}} {# stage #} + {% if external.auto_refresh in (true, false) -%} + auto_refresh = {{external.auto_refresh}} + {%- endif %} + {% if external.aws_sns_topic -%} + aws_sns_topic = '{{external.aws_sns_topic}}' + {%- endif %} + {% if external.table_format | lower == "delta" %} + refresh_on_create = false + {% endif %} + {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} + {% if external.integration -%} integration = '{{external.integration}}' {%- endif %} + file_format = {{external.file_format}} + {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql new file mode 100644 index 000000000..257f35b54 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -0,0 +1,125 @@ +{% macro snowflake__create_external_schema(source_node) %} + + {% set schema_exists_query %} + show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1; + {% endset %} + {% if execute %} + {% set schema_exists = run_query(schema_exists_query)|length > 0 %} + {% else %} + {% set schema_exists = false %} + {% endif %} + + {% if schema_exists %} + {% set ddl %} + select 'Schema {{ source_node.schema }} exists' from dual; + {% endset %} + {% else %} + {% set fqn %} + {% if source_node.database %} + {{ source_node.database }}.{{ source_node.schema }} + {% else %} + {{ source_node.schema }} + {% endif %} + {% endset %} + + {% set ddl %} + create schema if not exists {{ fqn }}; + {% endset %} + {% endif %} + + {% do return(ddl) %} + +{% endmacro %} + +{% macro snowflake__refresh_external_table(source_node) %} + + {% set external = source_node.external %} + {% set snowpipe = source_node.external.get('snowpipe', none) %} + + {% set auto_refresh = external.get('auto_refresh', false) %} + {% set partitions = external.get('partitions', none) %} + {% set delta_format = (external.table_format | lower == "delta") %} + + {% set manual_refresh = not auto_refresh %} + + {% if manual_refresh %} + + {% set ddl %} + begin; + alter external table {{source(source_node.source_name, source_node.name)}} refresh; + commit; + {% endset %} + + {% do return([ddl]) %} + + {% else %} + + {% do return([]) %} + + {% endif %} + +{% endmacro %} + +{% macro is_csv(file_format) %} + +{# From https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html: + +Important: The external table does not inherit the file format, if any, in the +stage definition. You must explicitly specify any file format options for the +external table using the FILE_FORMAT parameter. + +Note: FORMAT_NAME and TYPE are mutually exclusive; to avoid unintended behavior, +you should only specify one or the other when creating an external table. + +#} + + {% set ff_ltrimmed = file_format|lower|replace(' ','') %} + + {% if 'type=' in ff_ltrimmed %} + + {% if 'type=csv' in ff_ltrimmed %} + + {{return(true)}} + + {% else %} + + {{return(false)}} + + {% endif %} + + {% else %} + + {% set ff_standardized = ff_ltrimmed + | replace('(','') | replace(')','') + | replace('format_name=','') %} + {% set fqn = ff_standardized.split('.') %} + + {% if fqn | length == 3 %} + {% set ff_database, ff_schema, ff_identifier = fqn[0], fqn[1], fqn[2] %} + {% elif fqn | length == 2 %} + {% set ff_database, ff_schema, ff_identifier = target.database, fqn[0], fqn[1] %} + {% else %} + {% set ff_database, ff_schema, ff_identifier = target.database, target.schema, fqn[0] %} + {% endif %} + + {% call statement('get_file_format', fetch_result = True) %} + show file formats in {{ff_database}}.{{ff_schema}} + {% endcall %} + + {% set ffs = load_result('get_file_format').table %} + + {% for ff in ffs %} + + {% if ff['name']|lower == ff_identifier and ff['type']|lower == 'csv' %} + + {{return(true)}} + + {% endif %} + + {% endfor %} + + {{return(false)}} + + {% endif %} + +{% endmacro %} From bd49b63599fcb0836af401cb24b6b22adc0e2480 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 15 May 2024 17:48:18 -0400 Subject: [PATCH 02/10] materialize exactly one external table --- .../external_table/external_table.sql | 88 ++++++------------- 1 file changed, 27 insertions(+), 61 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index 02fce568a..edf127abc 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -3,75 +3,41 @@ {%- set columns = source_node.columns.values() -%} {%- set external = source_node.external -%} {%- set partitions = external.partitions -%} - {%- set infer_schema = external.infer_schema -%} - {% if infer_schema %} - {% set query_infer_schema %} - select * from table( infer_schema( location=>'{{external.location}}', file_format=>'{{external.file_format}}') ) - {% endset %} - {% if execute %} - {% set columns_infer = run_query(query_infer_schema) %} - {% endif %} - {% endif %} + {# {{ log('XXX: columns: ' ~ columns, info=True) }} + {{ log('XXX: partitions: ' ~ columns, info=True) }} + {% set partition_map = partitions|map(attribute='name')|join(', ') %} + {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} #} + + + {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} + {%- set relation = api.Relation.create( + database=source_node.database, schema=source_node.schema, identifier=source_node.name, + type='external_table') -%} + {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} {# This assumes you have already created an external stage #} - create or replace external table {{source(source_node.source_name, source_node.name)}} - {%- if columns or partitions or infer_schema -%} + + create or replace external table {{ relation }} ( - {%- if partitions -%}{%- for partition in partitions %} - {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} - {%- endfor -%}{%- endif -%} - {%- if not infer_schema -%} - {%- for column in columns %} - {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} - {%- set column_alias -%} - {%- if 'alias' in column and column.quote -%} - {{adapter.quote(column.alias)}} - {%- elif 'alias' in column -%} - {{column.alias}} - {%- else -%} - {{column_quoted}} - {%- endif -%} - {%- endset %} - {%- set col_expression -%} - {%- if column.expression -%} - {{column.expression}} - {%- else -%} - {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endif -%} - {%- endset %} - {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) - {{- ',' if not loop.last -}} - {% endfor %} - {% else %} - {%- for column in columns_infer %} - {%- set col_expression -%} - {%- set col_id = 'value:' ~ column[0] -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endset %} - {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) - {{- ',' if not loop.last -}} - {% endfor %} - {%- endif -%} + + {%- for column in columns %} + {{ log('column: ' ~ column.name, info=True) }} + {%- set column_alias = column.name %} + {%- set col_expression -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + ) - {%- endif -%} - {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} location = {{external.location}} {# stage #} - {% if external.auto_refresh in (true, false) -%} - auto_refresh = {{external.auto_refresh}} - {%- endif %} - {% if external.aws_sns_topic -%} - aws_sns_topic = '{{external.aws_sns_topic}}' - {%- endif %} - {% if external.table_format | lower == "delta" %} - refresh_on_create = false - {% endif %} - {% if external.pattern -%} pattern = '{{external.pattern}}' {%- endif %} - {% if external.integration -%} integration = '{{external.integration}}' {%- endif %} + file_format = {{external.file_format}} - {% if external.table_format -%} table_format = '{{external.table_format}}' {%- endif %} + {% endmacro %} From b1a041c46e19b70490766c11632e2165396f526f Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Tue, 21 May 2024 15:24:30 -0400 Subject: [PATCH 03/10] model materialization now --- .../external_table/external_table.sql | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index edf127abc..44cdbf7b9 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -1,22 +1,20 @@ -{% macro snowflake__create_external_table(source_node) %} +{% macro snowflake__create_external_table(relation, compiled_code) %} - {%- set columns = source_node.columns.values() -%} - {%- set external = source_node.external -%} - {%- set partitions = external.partitions -%} - - {# {{ log('XXX: columns: ' ~ columns, info=True) }} - {{ log('XXX: partitions: ' ~ columns, info=True) }} - {% set partition_map = partitions|map(attribute='name')|join(', ') %} - {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} #} + {# {%- set columns = source_node.columns.values() -%} #} + {# {%- set external = source_node.external -%} #} + {% set file_format = config.get('file_format') %} + {% set location = config.get('location') %} + {# {%- set partitions = external.partitions -%} #} + {{ log('XXX: columns: ' ~ columns, info=True) }} + {{ log('XXX: partitions: ' ~ columns, info=True) }} + {% set partition_map = partitions|map(attribute='name')|join(', ') %} + {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} - {%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%} + {%- set is_csv = is_csv(file_format) -%} - {%- set relation = api.Relation.create( - database=source_node.database, schema=source_node.schema, identifier=source_node.name, - type='external_table') -%} {# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} {# This assumes you have already created an external stage #} @@ -36,8 +34,8 @@ {% endfor %} ) - location = {{external.location}} {# stage #} + location = {{location}} {# stage #} - file_format = {{external.file_format}} + file_format = {{file_format}} {% endmacro %} From 67e3cee43119a7dcc2d06b36cda8f6549db7b9c1 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Tue, 21 May 2024 17:09:14 -0400 Subject: [PATCH 04/10] on going work --- .../macros/relations/external_table/external_table.sql | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index 44cdbf7b9..b927b78a2 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -1,15 +1,11 @@ -{% macro snowflake__create_external_table(relation, compiled_code) %} +{% macro snowflake__create_external_table(relation, columns) %} - {# {%- set columns = source_node.columns.values() -%} #} - - {# {%- set external = source_node.external -%} #} {% set file_format = config.get('file_format') %} {% set location = config.get('location') %} {# {%- set partitions = external.partitions -%} #} {{ log('XXX: columns: ' ~ columns, info=True) }} - {{ log('XXX: partitions: ' ~ columns, info=True) }} {% set partition_map = partitions|map(attribute='name')|join(', ') %} {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} From f951a61999672c814eaa1d55a0f3ada3e6e84496 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 22 May 2024 17:02:50 -0400 Subject: [PATCH 05/10] schema creation as part of RunTask --- .../relations/external_table/helpers.sql | 50 ++++--------------- 1 file changed, 10 insertions(+), 40 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql index 257f35b54..ce1f1eebe 100644 --- a/dbt/include/snowflake/macros/relations/external_table/helpers.sql +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -1,52 +1,22 @@ -{% macro snowflake__create_external_schema(source_node) %} +{% macro snowflake__refresh_external_table(relation) %} - {% set schema_exists_query %} - show terse schemas like '{{ source_node.schema }}' in database {{ source_node.database }} limit 1; - {% endset %} - {% if execute %} - {% set schema_exists = run_query(schema_exists_query)|length > 0 %} - {% else %} - {% set schema_exists = false %} - {% endif %} - - {% if schema_exists %} - {% set ddl %} - select 'Schema {{ source_node.schema }} exists' from dual; - {% endset %} - {% else %} - {% set fqn %} - {% if source_node.database %} - {{ source_node.database }}.{{ source_node.schema }} - {% else %} - {{ source_node.schema }} - {% endif %} - {% endset %} + {% set snowpipe = config.get('snowpipe', none) %} - {% set ddl %} - create schema if not exists {{ fqn }}; - {% endset %} - {% endif %} - - {% do return(ddl) %} - -{% endmacro %} - -{% macro snowflake__refresh_external_table(source_node) %} - - {% set external = source_node.external %} - {% set snowpipe = source_node.external.get('snowpipe', none) %} + {% set auto_refresh = config.get('auto_refresh', false) %} + {% set manual_refresh = not auto_refresh %} - {% set auto_refresh = external.get('auto_refresh', false) %} - {% set partitions = external.get('partitions', none) %} - {% set delta_format = (external.table_format | lower == "delta") %} + {% set partitions = config.get('partitions', none) %} - {% set manual_refresh = not auto_refresh %} + {% set table_format = config.get('table_format', none) %} + {% if table_format %} + {% set is_delta = table_format | lower == "delta" %} + {% endif %} {% if manual_refresh %} {% set ddl %} begin; - alter external table {{source(source_node.source_name, source_node.name)}} refresh; + alter external table {{ relation }} refresh; commit; {% endset %} From 56f50992ca80fbea9f2010b80e54937f18e489af Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Wed, 22 May 2024 17:04:27 -0400 Subject: [PATCH 06/10] drop logging --- .../macros/relations/external_table/external_table.sql | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index b927b78a2..5a1bf0633 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -2,12 +2,8 @@ {% set file_format = config.get('file_format') %} {% set location = config.get('location') %} - - {# {%- set partitions = external.partitions -%} #} - - {{ log('XXX: columns: ' ~ columns, info=True) }} + {% set partitions = config.get('partitions') %} {% set partition_map = partitions|map(attribute='name')|join(', ') %} - {{ log('XXX: partition_map: ' ~ partition_map, info=True) }} {%- set is_csv = is_csv(file_format) -%} @@ -19,7 +15,6 @@ ( {%- for column in columns %} - {{ log('column: ' ~ column.name, info=True) }} {%- set column_alias = column.name %} {%- set col_expression -%} {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} From e14846f47a1eccb5a8d89c2f302b64b1f4f40c05 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Thu, 23 May 2024 15:38:30 -0400 Subject: [PATCH 07/10] restore back features from OG macro --- .../external_table/external_table.sql | 63 +++++++++++++++---- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index 5a1bf0633..c846656d3 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -12,21 +12,60 @@ {# This assumes you have already created an external stage #} create or replace external table {{ relation }} + {%- if columns or partitions or infer_schema -%} ( - - {%- for column in columns %} - {%- set column_alias = column.name %} - {%- set col_expression -%} - {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} - (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) - {%- endset %} - {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) - {{- ',' if not loop.last -}} - {% endfor %} - + {%- if partitions -%}{%- for partition in partitions %} + {{partition.name}} {{partition.data_type}} as {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 or infer_schema -}} + {%- endfor -%}{%- endif -%} + {%- if not infer_schema -%} + {%- for column in columns %} + {%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %} + {%- set column_alias -%} + {%- if 'alias' in column and column.quote -%} + {{adapter.quote(column.alias)}} + {%- elif 'alias' in column -%} + {{column.alias}} + {%- else -%} + {{column_quoted}} + {%- endif -%} + {%- endset %} + {%- set col_expression -%} + {%- if column.expression -%} + {{column.expression}} + {%- else -%} + {%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_alias -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endif -%} + {%- endset %} + {{column_alias}} {{column.data_type}} as ({{col_expression}}::{{column.data_type}}) + {{- ',' if not loop.last -}} + {% endfor %} + {% else %} + {%- for column in columns_infer %} + {%- set col_expression -%} + {%- set col_id = 'value:' ~ column[0] -%} + (case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end) + {%- endset %} + {{column[0]}} {{column[1]}} as ({{col_expression}}::{{column[1]}}) + {{- ',' if not loop.last -}} + {% endfor %} + {%- endif -%} ) + {%- endif -%} + {% if partitions %} partition by ({{partitions|map(attribute='name')|join(', ')}}) {% endif %} location = {{location}} {# stage #} - + {% if auto_refresh in (true, false) -%} + auto_refresh = {{auto_refresh}} + {%- endif %} + {% if aws_sns_topic -%} + aws_sns_topic = '{{aws_sns_topic}}' + {%- endif %} + {% if table_format | lower == "delta" %} + refresh_on_create = false + {% endif %} + {% if pattern -%} pattern = '{{pattern}}' {%- endif %} + {% if integration -%} integration = '{{integration}}' {%- endif %} file_format = {{file_format}} + {% if table_format -%} table_format = '{{table_format}}' {%- endif %} {% endmacro %} From 10535027e4cde272eddca3ecc7fd8a9807e858a5 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Thu, 23 May 2024 17:01:34 -0400 Subject: [PATCH 08/10] support snowpipe --- .../external_table/external_table.sql | 24 ++++- .../relations/external_table/helpers.sql | 2 - .../macros/relations/snowpipe/helpers.sql | 100 ++++++++++++++++++ .../macros/relations/snowpipe/snowpipe.sql | 9 ++ 4 files changed, 129 insertions(+), 6 deletions(-) create mode 100644 dbt/include/snowflake/macros/relations/snowpipe/helpers.sql create mode 100644 dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index c846656d3..a0d3a7715 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -1,5 +1,25 @@ {% macro snowflake__create_external_table(relation, columns) %} + {% set snowpipe = config.get('snowpipe') %} + + {% if snowpipe %} + + {{ snowflake_get_build_snowpipe_sql(relation) }} + + {% else %} + + {{ get_create_external_table_sql(relation, columns) }} + + {% endif %} + +{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} +{# This assumes you have already created an external stage #} + + +{% endmacro %} + +{% macro get_create_external_table_sql(relation, columns) %} + {% set file_format = config.get('file_format') %} {% set location = config.get('location') %} {% set partitions = config.get('partitions') %} @@ -7,10 +27,6 @@ {%- set is_csv = is_csv(file_format) -%} - -{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #} -{# This assumes you have already created an external stage #} - create or replace external table {{ relation }} {%- if columns or partitions or infer_schema -%} ( diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql index ce1f1eebe..3cfaae3df 100644 --- a/dbt/include/snowflake/macros/relations/external_table/helpers.sql +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -1,7 +1,5 @@ {% macro snowflake__refresh_external_table(relation) %} - {% set snowpipe = config.get('snowpipe', none) %} - {% set auto_refresh = config.get('auto_refresh', false) %} {% set manual_refresh = not auto_refresh %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql new file mode 100644 index 000000000..04fa9de73 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql @@ -0,0 +1,100 @@ +{% macro snowflake_create_empty_table(relation) %} + + {%- set columns = relation.columns.values() %} + + create or replace table {{ relation }} ( + {% if columns|length == 0 %} + value variant, + {% else -%} + {%- for column in columns -%} + {{column.name}} {{column.data_type}}, + {% endfor -%} + {% endif %} + metadata_filename varchar, + metadata_file_row_number bigint, + metadata_file_last_modified timestamp, + _dbt_copied_at timestamp + ); + +{% endmacro %} + +{% macro snowflake_get_copy_sql(relation, explicit_transaction=false) %} +{# This assumes you have already created an external stage #} + + {%- set columns = relation.columns.values() -%} + + {% set location = config.get('location') %} + {% set file_format = config.get('file_format') %} + + {% set pattern = config.get('pattern') %} + {%- set is_csv = is_csv(file_format) %} + + {% set snowpipe = config.get('snowpipe', none) %} + {%- set copy_options = snowpipe.get('copy_options', none) -%} + + {%- if explicit_transaction -%} begin; {%- endif %} + + copy into {{ relation }} + from ( + select + {% if columns|length == 0 %} + $1::variant as value, + {% else -%} + {%- for column in columns -%} + {%- set col_expression -%} + {%- if is_csv -%}nullif(${{loop.index}},''){# special case: get columns by ordinal position #} + {%- else -%}nullif($1:{{column.name}},''){# standard behavior: get columns by name #} + {%- endif -%} + {%- endset -%} + {{col_expression}}::{{column.data_type}} as {{column.name}}, + {% endfor -%} + {% endif %} + metadata$filename::varchar as metadata_filename, + metadata$file_row_number::bigint as metadata_file_row_number, + metadata$file_last_modified::timestamp as metadata_file_last_modified, + metadata$start_scan_time::timestamp as _dbt_copied_at + from {{location}} {# stage #} + ) + file_format = {{file_format}} + {% if pattern -%} pattern = '{{pattern}}' {%- endif %} + {% if copy_options %} {{copy_options}} {% endif %}; + + {% if explicit_transaction -%} commit; {%- endif -%} + +{% endmacro %} + + +{% macro snowflake_create_snowpipe(relation) %} + + {% set snowpipe = config.get('snowpipe', none) %} + +{# https://docs.snowflake.com/en/sql-reference/sql/create-pipe.html #} + create or replace pipe {{ relation }} + {% if snowpipe.auto_ingest -%} auto_ingest = {{snowpipe.auto_ingest}} {%- endif %} + {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} + {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} + {% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %} + as {{ snowflake_get_copy_sql(relation) }} + +{% endmacro %} + +{% macro snowflake_refresh_snowpipe(relation) %} + + {% set snowpipe = config.get('snowpipe', none) %} + {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %} + + {% if auto_ingest is true %} + + {% do return([]) %} + + {% else %} + + {% set ddl %} + alter pipe {{ relation }} refresh + {% endset %} + + {{ return([ddl]) }} + + {% endif %} + +{% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql new file mode 100644 index 000000000..9aff87445 --- /dev/null +++ b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql @@ -0,0 +1,9 @@ +{% macro snowflake_get_build_snowpipe_sql(relation) %} + + {{ snowflake_create_empty_table(relation) }} + + {{ snowflake_get_copy_sql(relation, explicit_transaction) }} + + {{ snowflake_create_snowpipe(relation) }} + +{% endmacro %} From b311145607f5aeed3211ff8ac58a4039c77b1017 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Fri, 24 May 2024 16:09:13 -0400 Subject: [PATCH 09/10] support snowpipe --- dbt/adapters/snowflake/relation.py | 8 ++++++++ dbt/adapters/snowflake/relation_configs/policies.py | 1 + .../relations/external_table/external_table.sql | 2 +- .../macros/relations/external_table/helpers.sql | 10 ++++++++-- .../snowflake/macros/relations/snowpipe/helpers.sql | 12 ++++-------- .../snowflake/macros/relations/snowpipe/snowpipe.sql | 8 ++++---- 6 files changed, 26 insertions(+), 15 deletions(-) diff --git a/dbt/adapters/snowflake/relation.py b/dbt/adapters/snowflake/relation.py index ff94abc33..f91839345 100644 --- a/dbt/adapters/snowflake/relation.py +++ b/dbt/adapters/snowflake/relation.py @@ -48,6 +48,14 @@ def is_dynamic_table(self) -> bool: def DynamicTable(cls) -> str: return str(SnowflakeRelationType.DynamicTable) + @property + def is_snowpipe(self) -> bool: + return self.type == SnowflakeRelationType.SnowPipe + + @classproperty + def SnowPipe(cls) -> str: + return str(SnowflakeRelationType.SnowPipe) + @classproperty def get_relation_type(cls) -> Type[SnowflakeRelationType]: return SnowflakeRelationType diff --git a/dbt/adapters/snowflake/relation_configs/policies.py b/dbt/adapters/snowflake/relation_configs/policies.py index 2ad42dc0b..533c8dffb 100644 --- a/dbt/adapters/snowflake/relation_configs/policies.py +++ b/dbt/adapters/snowflake/relation_configs/policies.py @@ -11,6 +11,7 @@ class SnowflakeRelationType(StrEnum): External = "external" DynamicTable = "dynamic_table" ExternalTable = "external_table" + SnowPipe = "snowpipe" class SnowflakeIncludePolicy(Policy): diff --git a/dbt/include/snowflake/macros/relations/external_table/external_table.sql b/dbt/include/snowflake/macros/relations/external_table/external_table.sql index a0d3a7715..3a59474d0 100644 --- a/dbt/include/snowflake/macros/relations/external_table/external_table.sql +++ b/dbt/include/snowflake/macros/relations/external_table/external_table.sql @@ -4,7 +4,7 @@ {% if snowpipe %} - {{ snowflake_get_build_snowpipe_sql(relation) }} + {{ snowflake_get_build_snowpipe_sql(relation, columns) }} {% else %} diff --git a/dbt/include/snowflake/macros/relations/external_table/helpers.sql b/dbt/include/snowflake/macros/relations/external_table/helpers.sql index 3cfaae3df..5a4e1bd52 100644 --- a/dbt/include/snowflake/macros/relations/external_table/helpers.sql +++ b/dbt/include/snowflake/macros/relations/external_table/helpers.sql @@ -10,11 +10,17 @@ {% set is_delta = table_format | lower == "delta" %} {% endif %} - {% if manual_refresh %} + {# snowpipe as well #} + {% set snowpipe = config.get('snowpipe', none) %} + {% set auto_ingest = snowpipe.get('auto_ingest', false) if snowpipe is mapping %} + + {% set relation_type = 'pipe' if snowpipe is not none else 'external table' %} + + {% if manual_refresh or auto_ingest %} {% set ddl %} begin; - alter external table {{ relation }} refresh; + alter {{ relation_type }} {{ relation }} refresh; commit; {% endset %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql index 04fa9de73..0a044184b 100644 --- a/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql +++ b/dbt/include/snowflake/macros/relations/snowpipe/helpers.sql @@ -1,6 +1,4 @@ -{% macro snowflake_create_empty_table(relation) %} - - {%- set columns = relation.columns.values() %} +{% macro snowflake_create_empty_table(relation, columns) %} create or replace table {{ relation }} ( {% if columns|length == 0 %} @@ -18,11 +16,9 @@ {% endmacro %} -{% macro snowflake_get_copy_sql(relation, explicit_transaction=false) %} +{% macro snowflake_get_copy_sql(relation, columns, explicit_transaction=false) %} {# This assumes you have already created an external stage #} - {%- set columns = relation.columns.values() -%} - {% set location = config.get('location') %} {% set file_format = config.get('file_format') %} @@ -64,7 +60,7 @@ {% endmacro %} -{% macro snowflake_create_snowpipe(relation) %} +{% macro snowflake_create_snowpipe(relation, columns) %} {% set snowpipe = config.get('snowpipe', none) %} @@ -74,7 +70,7 @@ {% if snowpipe.aws_sns_topic -%} aws_sns_topic = '{{snowpipe.aws_sns_topic}}' {%- endif %} {% if snowpipe.integration -%} integration = '{{snowpipe.integration}}' {%- endif %} {% if snowpipe.error_integration -%} error_integration = '{{snowpipe.error_integration}}' {%- endif %} - as {{ snowflake_get_copy_sql(relation) }} + as {{ snowflake_get_copy_sql(relation, columns) }} {% endmacro %} diff --git a/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql index 9aff87445..fa8726259 100644 --- a/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql +++ b/dbt/include/snowflake/macros/relations/snowpipe/snowpipe.sql @@ -1,9 +1,9 @@ -{% macro snowflake_get_build_snowpipe_sql(relation) %} +{% macro snowflake_get_build_snowpipe_sql(relation, columns) %} - {{ snowflake_create_empty_table(relation) }} + {{ snowflake_create_empty_table(relation, columns) }} - {{ snowflake_get_copy_sql(relation, explicit_transaction) }} + {{ snowflake_get_copy_sql(relation, columns, explicit_transaction) }} - {{ snowflake_create_snowpipe(relation) }} + {{ snowflake_create_snowpipe(relation, columns) }} {% endmacro %} From a0cd90fda352c62ab326ded552dd024caa829085 Mon Sep 17 00:00:00 2001 From: Anders Swanson Date: Fri, 24 May 2024 16:09:51 -0400 Subject: [PATCH 10/10] catalog returns external tables and snowpipes --- dbt/adapters/snowflake/impl.py | 17 +++++++++ dbt/include/snowflake/macros/adapters.sql | 44 +++++++++++++++++++++-- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index e3bc3ae0b..6de132c08 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -300,3 +300,20 @@ def valid_incremental_strategies(self): def debug_query(self): """Override for DebugTask method""" self.execute("select 1 as id") + + @available.parse_none + def stack_tables(self, tables_list: List[agate.Table]) -> agate.Table: + """ + Given a list of agate_tables with the same column names & types + return a single unioned agate table. + """ + non_empty_tables = [table for table in tables_list if len(table.rows) > 0] + + if len(non_empty_tables) == 0: + return tables_list[0] + else: + return ( + agate.TableSet(non_empty_tables, keys=range(len(non_empty_tables))) + .merge() + .exclude(["group"]) + ) diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 0bf7b7d1b..44249a4b6 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -124,11 +124,47 @@ {%- set max_total_results = max_results_per_iter * max_iter -%} {%- set sql -%} - show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} + show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + "kind", + "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); {%- endset -%} {%- set result = run_query(sql) -%} + {%- set sql_extab -%} + show external tables in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + 'external_table' as "kind", + 'N' as "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); + {%- endset -%} + + {%- set result_extab = run_query(sql_extab) -%} + + {%- set sql_pipes -%} + show pipes in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}; + + SELECT + "database_name", + "schema_name", + "name", + 'snowpipe' as "kind", + 'N' as "is_dynamic" + FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())); + {%- endset -%} + + {%- set result_pipes = run_query(sql_pipes) -%} + {%- set n = (result | length) -%} {%- set watermark = namespace(table_name=result.columns[1].values()[-1]) -%} {%- set paginated = namespace(result=[]) -%} @@ -147,8 +183,10 @@ {% endif %} {%- set all_results_array = [result] + paginated.result -%} - {%- set result = result.merge(all_results_array) -%} - {%- do return(result) -%} + + {%- set result_stacked = adapter.stack_tables([result, result_pipes, result_extab]) -%} + + {%- do return(result_stacked) -%} {% endmacro %}