reifyhealth/dbt-athena


Features

  • Supports dbt version 1.6.*
  • Support for Python
  • Supports seeds
  • Correctly detects views and their columns
  • Supports table materialization
    • Iceberg tables are supported only with Athena Engine v3 and a unique table location (see table location section below)
    • Hive tables are supported by both Athena engines.
  • Supports incremental models
    • On Iceberg tables:
      • Supports the use of unique_key only with the merge strategy
      • Supports the append strategy
    • On Hive tables:
      • Supports two incremental update strategies: insert_overwrite and append
      • Does not support the use of unique_key
  • Supports snapshots
  • Does not support Python models

Quick Start

Installation

  • pip install dbt-athena-community
  • Or pip install git+https://github.com/dbt-athena/dbt-athena.git

Prerequisites

To start, you will need an S3 bucket (for instance my-bucket) and an Athena database:

CREATE DATABASE IF NOT EXISTS analytics_dev
COMMENT 'Analytics models generated by dbt (development)'
LOCATION 's3://my-bucket/'
WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com');

Notes:

  • Take note of your AWS region code (e.g. us-west-2 or eu-west-2, etc.).
  • You can also use AWS Glue to create and manage Athena databases.

Credentials

Credentials can be passed directly to the adapter, or they can be determined automatically based on aws cli/boto3 conventions. You can either:

  • configure aws_access_key_id and aws_secret_access_key
  • configure aws_profile_name to match a profile defined in your AWS credentials file

Check out the dbt profile configuration below for details.

Configuring your profile

A dbt profile can be configured to run against AWS Athena using the following configuration:

| Option | Description | Required? | Example |
|---|---|---|---|
| s3_staging_dir | S3 location to store Athena query results and metadata | Required | s3://bucket/dbt/ |
| s3_data_dir | Prefix for storing tables, if different from the connection's s3_staging_dir | Optional | s3://bucket2/dbt/ |
| s3_data_naming | How to generate table paths in s3_data_dir | Optional | schema_table_unique |
| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's s3_data_dir | Optional | s3://bucket3/dbt/ |
| region_name | AWS region of your Athena instance | Required | eu-west-1 |
| schema | Specify the schema (Athena database) to build models into (lowercase only) | Required | dbt |
| database | Specify the database (Data catalog) to build models into (lowercase only) | Required | awsdatacatalog |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | 5 |
| debug_query_state | Flag if debug message with Athena query state is needed | Optional | false |
| aws_access_key_id | Access key ID of the user performing requests | Optional | AKIAIOSFODNN7EXAMPLE |
| aws_secret_access_key | Secret access key of the user performing requests | Optional | wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY |
| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | my-profile |
| work_group | Identifier of Athena workgroup | Optional | my-custom-workgroup |
| num_retries | Number of times to retry a failing query | Optional | 3 |
| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | {"ACL": "bucket-owner-full-control"} |
| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | tag_key: tag_value |

Example profiles.yml entry:

athena:
  target: dev
  outputs:
    dev:
      type: athena
      s3_staging_dir: s3://athena-query-results/dbt/
      s3_data_dir: s3://your_s3_bucket/dbt/
      s3_data_naming: schema_table
      s3_tmp_table_dir: s3://your_s3_bucket/temp/
      region_name: eu-west-1
      schema: dbt
      database: awsdatacatalog
      aws_profile_name: my-profile
      work_group: my-workgroup
      seed_s3_upload_args:
        ACL: bucket-owner-full-control

Additional information

  • threads is supported
  • database and catalog can be used interchangeably
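
As a rough illustration of the options table above, the following sketch checks one profiles.yml output entry (already loaded into a dict) for the four options marked Required and for the lowercase rule on schema and database. The function name check_profile is hypothetical and is not part of the adapter; the adapter's own validation may differ.

```python
# Options marked "Required" in the profile options table.
REQUIRED_KEYS = ("s3_staging_dir", "region_name", "schema", "database")


def check_profile(output):
    """Return a list of problems found in one profiles.yml output entry.

    Hypothetical helper for illustration only; not part of dbt-athena.
    """
    problems = []
    for key in REQUIRED_KEYS:
        if not output.get(key):
            problems.append(f"missing required option: {key}")
    # schema and database are documented as lowercase only.
    for key in ("schema", "database"):
        value = output.get(key)
        if isinstance(value, str) and value != value.lower():
            problems.append(f"{key} must be lowercase: {value!r}")
    return problems


if __name__ == "__main__":
    dev = {
        "type": "athena",
        "s3_staging_dir": "s3://athena-query-results/dbt/",
        "region_name": "eu-west-1",
        "schema": "dbt",
        "database": "awsdatacatalog",
    }
    print(check_profile(dev))
```

An empty list means the entry passes these two checks; it says nothing about whether the bucket, region or credentials actually work.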

Models

Table Configuration

  • external_location (default=none)

    • If set, the full S3 path in which the table will be saved.
    • Does not work with Iceberg tables or with Hive tables that have ha set to true.
  • partitioned_by (default=none)

    • An array list of columns by which the table will be partitioned
    • Limited to creation of 100 partitions (currently)
  • bucketed_by (default=none)

    • An array list of columns to bucket data, ignored if using Iceberg
  • bucket_count (default=none)

    • The number of buckets for bucketing your data, ignored if using Iceberg
  • table_type (default='hive')

    • The type of table
    • Supports hive or iceberg
  • ha (default=false)

    • Whether the table should be built using the high-availability method. This option is only available for Hive tables, since Iceberg tables are highly available by default (see the section below)
  • format (default='parquet')

    • The data format for the table
    • Supports ORC, PARQUET, AVRO, JSON, TEXTFILE
  • write_compression (default=none)

    • The compression type to use for any storage format that allows compression to be specified. To see which options are available, check out CREATE TABLE AS
  • field_delimiter (default=none)

    • Custom field delimiter, for when format is set to TEXTFILE
  • table_properties: table properties to add to the table, valid for Iceberg only

  • native_drop: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be made to manage data in S3. Data in S3 will only be cleared up for Iceberg tables (see AWS docs). Note that Iceberg DROP TABLE operations may time out if they take longer than 60 seconds.

  • seed_by_insert (default=false)

    • default behaviour uploads seed data to S3. This flag will create seeds using an SQL insert statement
    • large seed files cannot use seed_by_insert, as the SQL insert statement would exceed the Athena limit of 262144 bytes
  • lf_tags_config (default=none)

    • AWS lakeformation tags to associate with the table and columns
    • format for model config:
{{
  config(
    materialized='incremental',
    incremental_strategy='append',
    on_schema_change='append_new_columns',
    table_type='iceberg',
    schema='test_schema',
    lf_tags_config={
          'enabled': true,
          'tags': {
            'tag1': 'value1',
            'tag2': 'value2'
          },
          'tags_columns': {
            'tag1': {
              'value1': ['column1', 'column2'],
              'value2': ['column3', 'column4']
            }
          }
    }
  )
}}
    • format for dbt_project.yml:
  +lf_tags_config:
    enabled: true
    tags:
      tag1: value1
      tag2: value2
    tags_columns:
      tag1:
        value1: [ column1, column2 ]
  • lf_grants (default=none)

    • lakeformation grants config for data_cell filters
    • format:
    lf_grants={
            'data_cell_filters': {
                'enabled': True | False,
                'filters': {
                    'filter_name': {
                        'row_filter': '<filter_condition>',
                        'principals': ['principal_arn1', 'principal_arn2']
                    }
                }
            }
        }

Notes:

  • lf_tags and lf_tags_columns configs only support attaching LF tags to the corresponding resources. We recommend managing LF tag permissions somewhere outside dbt. For example, you may use terraform or aws cdk for that purpose.
  • data_cell_filters management can't be automated outside dbt because a filter can't be attached to a table that doesn't exist yet. Once you enable this config, dbt will set all filters and their permissions during every dbt run. This keeps the row-level security configuration up to date after every dbt run and applies any changes that occur: dropped, created, or updated filters and their permissions.

Table location

The location in which a table is saved is determined by:

  1. If external_location is defined, that value is used.
  2. If s3_data_dir is defined, the path is determined by that and s3_data_naming
  3. If s3_data_dir is not defined, data is stored under s3_staging_dir/tables/

These are the options available for s3_data_naming:

  • unique: {s3_data_dir}/{uuid4()}/
  • table: {s3_data_dir}/{table}/
  • table_unique: {s3_data_dir}/{table}/{uuid4()}/
  • schema_table: {s3_data_dir}/{schema}/{table}/
  • schema_table_unique: {s3_data_dir}/{schema}/{table}/{uuid4()}/

You can set s3_data_naming globally in the target profile, override it in the table config, or set it for groups of models in dbt_project.yml.

Note: when using a workgroup with a default output location configured, s3_data_naming and any configured buckets are ignored and the location configured in the workgroup is used.
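
The resolution order above can be sketched in a few lines of Python. This is an illustrative model, not the adapter's code: the function name resolve_table_location is hypothetical, and applying the s3_data_naming pattern under s3_staging_dir/tables/ when s3_data_dir is not defined is an assumption. The workgroup override described in the note is not modelled.

```python
import uuid

# Path patterns for each s3_data_naming option, as listed above.
NAMING_TEMPLATES = {
    "unique": "{s3_data_dir}/{uuid}/",
    "table": "{s3_data_dir}/{table}/",
    "table_unique": "{s3_data_dir}/{table}/{uuid}/",
    "schema_table": "{s3_data_dir}/{schema}/{table}/",
    "schema_table_unique": "{s3_data_dir}/{schema}/{table}/{uuid}/",
}


def resolve_table_location(schema, table, s3_staging_dir, s3_data_naming,
                           s3_data_dir=None, external_location=None):
    """Hypothetical helper mirroring the three documented rules."""
    # 1. external_location wins when it is defined.
    if external_location:
        return external_location
    # 2./3. Use s3_data_dir, or fall back to s3_staging_dir/tables/.
    base = s3_data_dir if s3_data_dir else s3_staging_dir.rstrip("/") + "/tables"
    # Assumption: the naming pattern is applied to the fallback prefix too.
    return NAMING_TEMPLATES[s3_data_naming].format(
        s3_data_dir=base.rstrip("/"),
        schema=schema,
        table=table,
        uuid=uuid.uuid4(),
    )


if __name__ == "__main__":
    print(resolve_table_location(
        "dbt", "orders", "s3://athena-query-results/dbt/", "schema_table",
        s3_data_dir="s3://your_s3_bucket/dbt/",
    ))
```

The options ending in unique produce a fresh path on every call, which is what the Iceberg and HA features rely on when they require a unique table location.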

Incremental models

Support for incremental models.

These strategies are supported:

  • insert_overwrite (default): The insert overwrite strategy deletes the overlapping partitions from the destination table, and then inserts the new records from the source. This strategy depends on the partitioned_by keyword! If no partitions are defined, dbt will fall back to the append strategy.
  • append: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate data (e.g. great for log or historical data).
  • merge: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with unique_key. Only available when using Iceberg.
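
To make the difference between append and insert_overwrite concrete, here is a small in-memory model in Python. It only imitates the documented semantics on lists of dicts; the function names are hypothetical and the adapter itself performs these operations in Athena SQL.

```python
def append(target, new_rows):
    """append: add new rows, never touching existing ones (duplicates possible)."""
    return target + new_rows


def insert_overwrite(target, new_rows, partitioned_by):
    """insert_overwrite: drop overlapping partitions, then insert the new rows."""
    if not partitioned_by:
        # Documented fallback: without partitions, behave like append.
        return append(target, new_rows)

    def partition_of(row):
        return tuple(row[column] for column in partitioned_by)

    overlapping = {partition_of(row) for row in new_rows}
    kept = [row for row in target if partition_of(row) not in overlapping]
    return kept + new_rows


if __name__ == "__main__":
    target = [{"day": "2024-01-01", "v": 1}, {"day": "2024-01-02", "v": 2}]
    new_rows = [{"day": "2024-01-02", "v": 20}, {"day": "2024-01-03", "v": 3}]
    # The 2024-01-02 partition is replaced, 2024-01-01 is left alone.
    print(insert_overwrite(target, new_rows, ["day"]))
```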

On schema change

on_schema_change is an option to reflect changes of schema in incremental models. The following options are supported:

  • ignore (default)
  • fail
  • append_new_columns
  • sync_all_columns

For details, please refer to dbt docs.

Iceberg

The adapter supports table materialization for Iceberg.

To get started just add this as your model:

{{ config(
    materialized='table',
    table_type='iceberg',
    format='parquet',
    partitioned_by=['bucket(user_id, 5)'],
    table_properties={
     'optimize_rewrite_delete_file_threshold': '2'
     }
) }}

select 'A'          as user_id,
       'pi'         as name,
       'active'     as status,
       17.89        as cost,
       1            as quantity,
       100000000    as quantity_big,
       current_date as my_date

Iceberg supports bucketing as hidden partitions, therefore use the partitioned_by config to add specific bucketing conditions.

Iceberg supports several data formats for its tables: PARQUET, AVRO and ORC.

It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported:

  • append: New records are appended to the table, this can lead to duplicates.
  • merge: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only available with Athena engine version 3.
    • unique_key (required): columns that define a unique record in the source and target tables.
    • incremental_predicates (optional): SQL conditions that enable custom join clauses in the merge statement. This can be useful for improving performance via predicate pushdown on the target table.
    • delete_condition (optional): SQL condition used to identify records that should be deleted.
    • update_condition (optional): SQL condition used to identify records that should be updated.
      • incremental_predicates, delete_condition and update_condition can include any column of the incremental table (src) or the final table (target). Column names must be prefixed by either src or target to prevent a Column is ambiguous error.
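
The interplay of unique_key, delete_condition and update_condition can be modelled in plain Python. This is a simplified sketch with a hypothetical merge function: conditions are passed as callables taking (src, target) rows instead of SQL strings, unique_key is a single column, and incremental_predicates are left out. The order in which the conditions are checked (delete before update) is an assumption.

```python
def merge(target, source, unique_key, delete_condition=None, update_condition=None):
    """In-memory model of an upsert with optional delete and update conditions."""
    # Index the target rows by their unique key (dicts keep insertion order).
    result = {row[unique_key]: dict(row) for row in target}
    for src in source:
        key = src[unique_key]
        tgt = result.get(key)
        if tgt is None:
            # No matching target row: insert the new record.
            result[key] = dict(src)
        elif delete_condition is not None and delete_condition(src, tgt):
            # Matching row that satisfies delete_condition: remove it.
            del result[key]
        elif update_condition is None or update_condition(src, tgt):
            # Matching row that may be updated: overwrite with the source row.
            result[key] = dict(src)
    return list(result.values())


if __name__ == "__main__":
    target = [{"id": i, "value": v}
              for i, v in [(-1, "v-1"), (0, "v0"), (1, "v1"), (2, "v2")]]
    source = [{"id": 1, "value": "v1-updated"}, {"id": 2, "value": "v2-updated"}]
    # Same idea as update_condition='target.id > 1': only id 2 is updated.
    print(merge(target, source, "id",
                update_condition=lambda src, tgt: tgt["id"] > 1))
```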

delete_condition example:

{{ config(
    materialized='incremental',
    table_type='iceberg',
    incremental_strategy='merge',
    unique_key='user_id',
    incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"],
    delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year",
    format='parquet'
) }}

select 'A' as user_id,
       'pi' as name,
       'active' as status,
       17.89 as cost,
       1 as quantity,
       100000000 as quantity_big,
       current_date as my_date

update_condition example:

{{ config(
        materialized='incremental',
        table_type='iceberg',
        incremental_strategy='merge',
        unique_key=['id'],
        update_condition='target.id > 1',
        schema='sandbox'
    )
}}

{% if is_incremental() %}

select * from (
    values
    (1, 'v1-updated')
    , (2, 'v2-updated')
) as t (id, value)

{% else %}

select * from (
    values
    (-1, 'v-1')
    , (0, 'v0')
    , (1, 'v1')
    , (2, 'v2')
) as t (id, value)

{% endif %}

Highly available table (HA)

The current implementation of the table materialization can lead to downtime, as the target table is dropped and re-created. For less destructive behavior, you can use the ha config on your table materialized models. It leverages the table versions feature of the Glue catalog, creating a tmp table and swapping the target table to the location of the tmp table. This materialization is only available for table_type=hive and requires using unique locations. Iceberg tables are highly available by default.

{{ config(
    materialized='table',
    ha=true,
    format='parquet',
    table_type='hive',
    partitioned_by=['status'],
    s3_data_naming='table_unique'
) }}

select 'a'      as user_id,
       'pi'     as user_name,
       'active' as status
union all
select 'b'        as user_id,
       'sh'       as user_name,
       'disabled' as status

By default, the materialization keeps the last 4 table versions. You can change this by setting versions_to_keep.

HA Known issues

  • When swapping from a table with partitions to a table without (and the other way around), there could be a short downtime. If high performance is needed, consider bucketing instead of partitions
  • By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location
  • It's recommended to have versions_to_keep >= 4, as this will avoid having the older location removed
  • The macro athena__end_of_time needs to be overwritten by the user if using Athena engine v3 since it requires a precision parameter for timestamps

Snapshots

The adapter supports snapshot materialization, with both the timestamp and check strategies. To create a snapshot, add a snapshot file to the snapshots directory. If the directory does not exist, create it.

Timestamp strategy

To use the timestamp strategy refer to the dbt docs

Check strategy

To use the check strategy refer to the dbt docs

Hard-deletes

The materialization also supports invalidating hard deletes. Check the docs to understand usage.

AWS Lakeformation integration

The adapter implements AWS Lakeformation tags management in the following way:

  • you can enable or disable lf-tags management via config (disabled by default)
  • once you enable the feature, lf-tags will be updated on every dbt run
  • first, all lf-tags for columns are removed to avoid inheritance issues
  • then all redundant lf-tags are removed from table and actual tags from config are applied
  • finally, lf-tags for columns are applied
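
The update order described above can be sketched as a function that turns the current tags and an lf_tags_config dict into an ordered list of steps. This is a hypothetical model for illustration (the name plan_lf_tag_updates and the step tuples are not part of the adapter), and it assumes that "redundant" table tags are those whose key is absent from the config.

```python
def plan_lf_tag_updates(current_table_tags, current_column_tags, config):
    """Return ordered (action, resource, tag_key, tag_value) steps."""
    # LF tag management is disabled by default.
    if not config.get("enabled", False):
        return []
    steps = []
    # First, remove every LF tag currently set on columns.
    for column, tags in sorted(current_column_tags.items()):
        for key, value in sorted(tags.items()):
            steps.append(("remove", f"column:{column}", key, value))
    # Then remove table tags missing from the config and apply the configured ones.
    wanted = config.get("tags", {})
    for key, value in sorted(current_table_tags.items()):
        if key not in wanted:
            steps.append(("remove", "table", key, value))
    for key, value in sorted(wanted.items()):
        steps.append(("add", "table", key, value))
    # Finally, apply the configured column tags.
    for key, by_value in sorted(config.get("tags_columns", {}).items()):
        for value, columns in sorted(by_value.items()):
            for column in columns:
                steps.append(("add", f"column:{column}", key, value))
    return steps


if __name__ == "__main__":
    config = {
        "enabled": True,
        "tags": {"tag1": "value1"},
        "tags_columns": {"tag1": {"value1": ["column1"]}},
    }
    for step in plan_lf_tag_updates({"old": "x"}, {"column1": {"tag1": "value2"}}, config):
        print(step)
```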

It's important to understand the following points:

  • dbt does not manage lf-tags for database
  • dbt does not manage lakeformation permissions

That's why you should handle these yourself, either manually or with automation tools such as terraform or AWS CDK.
You may find the following links useful to manage that:

Working example

seed file - employment_indicators_november_2022_csv_tables.csv

Series_reference,Period,Data_value,Suppressed
MEIM.S1WA,1999.04,80267,
MEIM.S1WA,1999.05,70803,
MEIM.S1WA,1999.06,65792,
MEIM.S1WA,1999.07,66194,
MEIM.S1WA,1999.08,67259,
MEIM.S1WA,1999.09,69691,
MEIM.S1WA,1999.1,72475,
MEIM.S1WA,1999.11,79263,
MEIM.S1WA,1999.12,86540,
MEIM.S1WA,2000.01,82552,
MEIM.S1WA,2000.02,81709,
MEIM.S1WA,2000.03,84126,
MEIM.S1WA,2000.04,77089,
MEIM.S1WA,2000.05,73811,
MEIM.S1WA,2000.06,70070,
MEIM.S1WA,2000.07,69873,
MEIM.S1WA,2000.08,71468,
MEIM.S1WA,2000.09,72462,
MEIM.S1WA,2000.1,74897,

model.sql

{{ config(
    materialized='table'
) }}

select row_number() over() as id
       , *
       , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp
from {{ ref('employment_indicators_november_2022_csv_tables') }}

timestamp strategy - model_snapshot_1

{% snapshot model_snapshot_1 %}

{{
    config(
      strategy='timestamp',
      updated_at='refresh_timestamp',
      unique_key='id'
    )
}}

select *
from {{ ref('model') }}

{% endsnapshot %}

invalidate hard deletes - model_snapshot_2

{% snapshot model_snapshot_2 %}

{{
    config
    (
        unique_key='id',
        strategy='timestamp',
        updated_at='refresh_timestamp',
        invalidate_hard_deletes=True,
    )
}}
select *
from {{ ref('model') }}

{% endsnapshot %}

check strategy - model_snapshot_3

{% snapshot model_snapshot_3 %}

{{
    config
    (
        unique_key='id',
        strategy='check',
        check_cols=['series_reference','data_value']
    )
}}
select *
from {{ ref('model') }}

{% endsnapshot %}

Snapshots Known issues

  • Incremental Iceberg models - Sync all columns on schema change can't remove columns used as partitioning. The only way, from a dbt perspective, is to do a full-refresh of the incremental model.

  • Table, schema and database names should be lowercase only

  • In order to avoid potential conflicts, make sure dbt-athena-adapter is not installed in the target environment. See dbt-labs#103 for more details.

  • Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history

Contracts

The adapter partly supports contract definition.

  • Concerning the data_type, it is supported but needs to be adjusted for complex types. They must be specified entirely (for instance array<int>) even though they won't be checked. Indeed, as dbt recommends, we only compare the broader type (array, map, int, varchar). The complete definition is used in order to check that the data types defined in athena are ok (pre-flight check).
  • The adapter does not support constraints, since constraints don't exist in Athena.
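
The "broader type" comparison mentioned above can be pictured with a short Python sketch. The helpers broad_type and same_broad_type are hypothetical and only show the idea of comparing array<int> and array<varchar> as the same broad type; the adapter's actual comparison logic is not reproduced here, and stripping a parenthesised length such as varchar(10) is an assumption.

```python
def broad_type(data_type):
    """Reduce a full type such as 'array<int>' to its broad type, 'array'."""
    lowered = data_type.strip().lower()
    # Drop the element/type parameters: everything from the first '<' or '('.
    return lowered.split("<", 1)[0].split("(", 1)[0].strip()


def same_broad_type(contract_type, actual_type):
    """Compare only the broad types, ignoring nested element types."""
    return broad_type(contract_type) == broad_type(actual_type)


if __name__ == "__main__":
    print(broad_type("array<int>"))
    print(same_broad_type("array<int>", "array<varchar>"))
```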

Contributing

See CONTRIBUTING for more information on how to contribute to this project.

Contributors ✨

Thanks goes to these wonderful people (emoji key):

nicor88
💻 🚧 🐛

Jesse Dobbelaere
🐛 🚧

Lemiffe
🎨

Jérémy Guiselin
🚧 💻 🐛

Tom
🚧 💻

Mattia
🚧

Gatsby Lee
🐛

BrechtDeVlieger
🐛

Andrea Artaria
🐛

Maiara Reinaldo
🐛

Henri Blancke
💻 🐛

Serhii Dimchenko
💻 🐛

chrischin478
💻 🐛

This project follows the all-contributors specification. Contributions of any kind welcome!
