Skip to content

Commit

Permalink
Reorganize macros following dbt-labs/dbt-core#4154
Browse files Browse the repository at this point in the history
  • Loading branch information
courentin committed Dec 21, 2021
1 parent 4ebaa0f commit cceb32a
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 194 deletions.
125 changes: 0 additions & 125 deletions dbt/include/athena/macros/adapters.sql

This file was deleted.

22 changes: 22 additions & 0 deletions dbt/include/athena/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{% macro athena__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}

select
column_name,
data_type,
null as character_maximum_length,
null as numeric_precision,
null as numeric_scale

from {{ relation.information_schema('columns') }}
where LOWER(table_name) = LOWER('{{ relation.identifier }}')
{% if relation.schema %}
and LOWER(table_schema) = LOWER('{{ relation.schema }}')
{% endif %}
order by ordinal_position

{% endcall %}

{% set table = load_result('get_columns_in_relation').table %}
{% do return(sql_convert_columns_in_relation(table)) %}
{% endmacro %}
5 changes: 5 additions & 0 deletions dbt/include/athena/macros/adapters/freshness.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% macro athena__current_timestamp() -%}
-- pyathena converts time zoned timestamps to strings so lets avoid them
-- now()
cast(now() as timestamp)
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

{% macro athena__get_catalog(information_schema, schemas) -%}
{%- set query -%}
select * from (
Expand Down Expand Up @@ -77,3 +76,45 @@
{{ return(run_query(query)) }}

{%- endmacro %}


{% macro athena__list_schemas(database) -%}
{% call statement('list_schemas', fetch_result=True) %}
select
distinct schema_name

from {{ information_schema_name(database) }}.schemata
{% endcall %}
{{ return(load_result('list_schemas').table) }}
{% endmacro %}


{% macro athena__list_relations_without_caching(schema_relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
WITH views AS (
select
table_catalog as database,
table_name as name,
table_schema as schema
from {{ schema_relation.information_schema() }}.views
where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}')
), tables AS (
select
table_catalog as database,
table_name as name,
table_schema as schema

from {{ schema_relation.information_schema() }}.tables
where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}')

-- Views appear in both `tables` and `views`, so excluding them from tables
EXCEPT

select * from views
)
select views.*, 'view' AS table_type FROM views
UNION ALL
select tables.*, 'table' AS table_type FROM tables
{% endcall %}
{% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}
8 changes: 8 additions & 0 deletions dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{% macro athena__drop_relation(relation) -%}
{% if config.get('incremental_strategy') == 'insert_overwrite' %}
{%- do adapter.clean_up_table(relation.schema, relation.table) -%}
{% endif %}
{% call statement('drop_relation', auto_begin=False) -%}
drop {{ relation.type }} if exists {{ relation }}
{%- endcall %}
{% endmacro %}
7 changes: 7 additions & 0 deletions dbt/include/athena/macros/materializations/models/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro set_table_classification(relation, default_value) -%}
{%- set format = config.get('format', default=default_value) -%}

{% call statement('set_table_classification', auto_begin=False) -%}
alter table {{ relation }} set tblproperties ('classification' = '{{ format }}')
{%- endcall %}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{% macro validate_get_incremental_strategy(raw_strategy) %}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'insert_overwrite'
{%- endset %}

{% if raw_strategy not in ['append', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}

{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{%- endmacro %}

{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%}
{% call statement('get_partitions', fetch_result=True) %}
select distinct {{partitioned_keys}} from {{ tmp_relation }};
{% endcall %}
{%- set table = load_result('get_partitions').table -%}
{%- set rows = table.rows -%}
{%- set partitions = [] -%}
{%- for row in rows -%}
{%- set single_partition = [] -%}
{%- for col in row -%}
{%- set column_type = adapter.convert_type(table, loop.index0) -%}
{%- if column_type == 'integer' -%}
{%- set value = col|string -%}
{%- elif column_type == 'string' -%}
{%- set value = "'" + col + "'" -%}
{%- elif column_type == 'date' -%}
{%- set value = "'" + col|string + "'" -%}
{%- else -%}
{%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
{%- endif -%}
{%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%}
{%- endfor -%}
{%- set single_partition_expression = single_partition | join(' and ') -%}
{%- do partitions.append('(' + single_partition_expression + ')') -%}
{%- endfor -%}
{%- for i in range(partitions | length) %}
{%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%}
{%- endfor -%}
{%- endmacro %}
Original file line number Diff line number Diff line change
@@ -1,58 +1,3 @@
{% macro validate_get_incremental_strategy(raw_strategy) %}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'insert_overwrite'
{%- endset %}

{% if raw_strategy not in ['append', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}

{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %}
{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
);
{%- endmacro %}

{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
{%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%}
{% call statement('get_partitions', fetch_result=True) %}
select distinct {{partitioned_keys}} from {{ tmp_relation }};
{% endcall %}
{%- set table = load_result('get_partitions').table -%}
{%- set rows = table.rows -%}
{%- set partitions = [] -%}
{%- for row in rows -%}
{%- set single_partition = [] -%}
{%- for col in row -%}
{%- set column_type = adapter.convert_type(table, loop.index0) -%}
{%- if column_type == 'integer' -%}
{%- set value = col|string -%}
{%- elif column_type == 'string' -%}
{%- set value = "'" + col + "'" -%}
{%- elif column_type == 'date' -%}
{%- set value = "'" + col|string + "'" -%}
{%- else -%}
{%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
{%- endif -%}
{%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%}
{%- endfor -%}
{%- set single_partition_expression = single_partition | join(' and ') -%}
{%- do partitions.append('(' + single_partition_expression + ')') -%}
{%- endfor -%}
{%- for i in range(partitions | length) %}
{%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%}
{%- endfor -%}
{%- endmacro %}

{% materialization incremental, adapter='athena' -%}

{% set unique_key = config.get('unique_key') %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{% macro athena__create_table_as(temporary, relation, sql) -%}
{%- set external_location = config.get('external_location', default=none) -%}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{%- set bucketed_by = config.get('bucketed_by', default=none) -%}
{%- set bucket_count = config.get('bucket_count', default=none) -%}
{%- set field_delimiter = config.get('field_delimiter', default=none) -%}
{%- set format = config.get('format', default='parquet') -%}

create table
{{ relation }}

with (
{%- if external_location is not none and not temporary %}
external_location='{{ external_location }}',
{%- endif %}
{%- if partitioned_by is not none %}
partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
{%- endif %}
{%- if bucketed_by is not none %}
bucketed_by=ARRAY{{ bucketed_by | tojson | replace('\"', '\'') }},
{%- endif %}
{%- if bucket_count is not none %}
bucket_count={{ bucket_count }},
{%- endif %}
{%- if field_delimiter is not none %}
field_delimiter='{{ field_delimiter }}',
{%- endif %}
format='{{ format }}'
)
as
{{ sql }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

{% materialization table, adapter='athena' -%}
{%- set identifier = model['alias'] -%}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,3 @@
{{ return({'relations': [target_relation]}) }}

{% endmacro %}


{% materialization view, adapter='athena' -%}
{% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}

{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model) %}

{% do return(to_return) %}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{% macro athena__create_view_as(relation, sql) -%}
create or replace view
{{ relation }}
as
{{ sql }}
{% endmacro %}
Loading

0 comments on commit cceb32a

Please sign in to comment.