187: Adding Apache Hudi support to dbt #210

Merged: 24 commits, Nov 19, 2021

Commits
3b781cb
initial working version
Aug 24, 2021
c3d11fe
Rebased and resolved all the merge conflicts.
Aug 25, 2021
022edba
Rebased and resolved merge conflicts.
Aug 25, 2021
cd22177
Removed hudi dep jar and used the released version via packages option
Aug 27, 2021
59b1370
Added insert overwrite unit tests for hudi
Aug 30, 2021
b0e45fd
Used unique_key as default value for hudi primaryKey option
Aug 30, 2021
10a50ca
Updated changelog.md with this new update.
Aug 30, 2021
705a777
Final round of testing and a few minor fixes
Aug 30, 2021
9616bb0
Fixed lint issues
Aug 30, 2021
283c7d1
Fixed the integration tests
Aug 30, 2021
8f49b09
Fixed the circle ci env to add hudi packages
Aug 31, 2021
a4f0699
Updated hudi spark bundle to use scala 2.11
Aug 31, 2021
f521ca9
Fixed Hudi incremental strategy integration tests and other integrati…
Aug 31, 2021
7ba9b1b
Fixed the hudi hive sync hms integration test issues
Aug 31, 2021
46be053
Added sql HMS config to fix the integration tests.
Aug 31, 2021
d9e15a0
Added hudi hive sync mode conf to CI
Aug 31, 2021
ca588b2
Set the hms schema verification to false
Sep 1, 2021
2d5ba2e
Removed the merge update columns since it's not supported.
Sep 2, 2021
4b43b46
Passed the correct hiveconf to the circle ci build script
vingov Oct 20, 2021
aab2160
Disabled few incremental tests for spark2 and reverted to spark2 config
vingov Oct 25, 2021
ae3bfe3
Added hudi configs to the circle ci build script
vingov Oct 26, 2021
0723de9
Commented out the Hudi integration test until we have the hudi 0.10.0…
Nov 17, 2021
202e88a
Fixed the macro which checks the table type.
Nov 18, 2021
22a2025
Disabled this model since hudi is not supported in databricks runtime…
Nov 18, 2021
12 changes: 12 additions & 0 deletions .circleci/config.yml
@@ -25,6 +25,18 @@ jobs:
--conf spark.hadoop.javax.jdo.option.ConnectionUserName=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionPassword=dbt
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.postgresql.Driver
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.jars.packages=org.apache.hudi:hudi-spark-bundle_2.11:0.9.0
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf spark.driver.userClassPathFirst=true
--conf spark.hadoop.datanucleus.autoCreateTables=true
--conf spark.hadoop.datanucleus.schema.autoCreateTables=true
--conf spark.hadoop.datanucleus.fixedDatastore=false
--conf spark.sql.hive.convertMetastoreParquet=false
--hiveconf hoodie.datasource.hive_sync.use_jdbc=false
--hiveconf hoodie.datasource.hive_sync.mode=hms
--hiveconf datanucleus.schema.autoCreateAll=true
--hiveconf hive.metastore.schema.verification=false

- image: postgres:9.6.17-alpine
environment:
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,13 +13,15 @@
### Fixes
- Enhanced get_columns_in_relation method to handle a bug in open source deltalake which doesn't return schema details in `show table extended in databasename like '*'` query output. This impacts dbt snapshots if file format is open source deltalake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- Properly parse columns when there are struct fields, to avoid considering inner fields: Issue ([#202](https://github.com/dbt-labs/dbt-spark/issues/202))
- Add support for Apache Hudi (hudi file format) which supports incremental merge strategies: Issue ([#187](https://github.com/dbt-labs/dbt-spark/issues/187))

### Under the hood
- Add `unique_field` to better understand adapter adoption in anonymous usage tracking ([#211](https://github.com/dbt-labs/dbt-spark/pull/211))

### Contributors
- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))
- [@vingov](https://github.com/vingov) ([#210](https://github.com/dbt-labs/dbt-spark/pull/210))

## dbt-spark 0.21.0b2 (August 20, 2021)

13 changes: 13 additions & 0 deletions dbt/adapters/spark/impl.py
@@ -68,6 +68,13 @@ class SparkAdapter(SQLAdapter):
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)
HUDI_METADATA_COLUMNS = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
'_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name'
]

Relation = SparkRelation
Column = SparkColumn
@@ -143,12 +150,14 @@ def list_relations_without_caching(
rel_type = RelationType.View \
if 'Type: VIEW' in information else RelationType.Table
is_delta = 'Provider: delta' in information
is_hudi = 'Provider: hudi' in information
relation = self.Relation.create(
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

@@ -222,6 +231,10 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)

# strip hudi metadata columns.
columns = [x for x in columns
if x.name not in self.HUDI_METADATA_COLUMNS]
return columns

def parse_columns_from_information(
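For context on why these columns are stripped: every Hudi table exposes five internal bookkeeping columns alongside the user-defined schema, and without this filter they would leak into dbt's view of a model's columns (docs, snapshots, column comparisons). A minimal illustration of what querying a Hudi table surfaces, using a hypothetical schema and model name:

```sql
-- The _hoodie_* columns below are Hudi's internal metadata, which
-- get_columns_in_relation now filters out of dbt's column listing.
select
    _hoodie_commit_time,  -- timestamp of the commit that last wrote this row
    _hoodie_record_key,   -- the record key (Hudi primaryKey) value
    id,
    msg
from analytics.my_hudi_model
limit 10;
```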
1 change: 1 addition & 0 deletions dbt/adapters/spark/relation.py
@@ -26,6 +26,7 @@ class SparkRelation(BaseRelation):
include_policy: SparkIncludePolicy = SparkIncludePolicy()
quote_character: str = '`'
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
information: str = None

def __post_init__(self):
13 changes: 12 additions & 1 deletion dbt/include/spark/macros/adapters.sql
@@ -15,6 +15,17 @@

{% macro options_clause() -%}
{%- set options = config.get('options') -%}
{%- if config.get('file_format') == 'hudi' -%}
{%- set unique_key = config.get('unique_key') -%}
{%- if unique_key is not none and options is none -%}
{%- set options = {'primaryKey': config.get('unique_key')} -%}
{%- elif unique_key is not none and options is not none and 'primaryKey' not in options -%}
{%- set _ = options.update({'primaryKey': config.get('unique_key')}) -%}
{%- elif options is not none and 'primaryKey' in options and options['primaryKey'] != unique_key -%}
{{ exceptions.raise_compiler_error("unique_key and options('primaryKey') should be the same column(s).") }}
{%- endif %}
{%- endif %}

{%- if options is not none %}
options (
{%- for option in options -%}
@@ -181,7 +192,7 @@
{% endmacro %}

{% macro spark__alter_column_comment(relation, column_dict) %}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
{% if config.get('file_format', validator=validation.any[basestring]) in ['delta', 'hudi'] %}
{% for column_name in column_dict %}
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
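To make the new options_clause behavior concrete, here is a sketch of a hudi model that sets unique_key but no explicit options; the model body and column names are hypothetical:

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    file_format = 'hudi',
    unique_key = 'id'
) }}

-- With the macro change above, the generated CREATE TABLE DDL includes
-- options (primaryKey "id") even though no options dict was configured.
select 1 as id, 'hello' as msg
```

Conversely, configuring options = {'primaryKey': 'other_col'} together with unique_key = 'id' now raises the new compiler error, since the two must name the same column(s).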
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,7 +26,7 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format != 'delta' %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
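The net effect of these validation changes: merge now compiles for hudi models (exercised by the new test models below), while non-transactional formats still fail fast. A hypothetical model that would still be rejected:

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    file_format = 'parquet',
    unique_key = 'id'
) }}

-- Compiling this model raises:
--   Invalid incremental strategy provided: merge
--   You can only choose this strategy when file_format is set to 'delta' or 'hudi'
select 1 as id
```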
8 changes: 4 additions & 4 deletions dbt/include/spark/macros/materializations/snapshot.sql
@@ -82,18 +82,18 @@
identifier=target_table,
type='table') -%}

{%- if file_format != 'delta' -%}
{%- if file_format not in ['delta', 'hudi'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta'
Snapshot functionality requires file_format be set to 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}

{%- if target_relation_exists -%}
{%- if not target_relation.is_delta -%}
{%- if not target_relation.is_delta and not target_relation.is_hudi -%}
{% set invalid_format_msg -%}
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta'
The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}
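With this relaxation, a Hudi-backed snapshot can be configured like the following sketch; the snapshot name, source, and columns are hypothetical:

```sql
{% snapshot orders_snapshot %}

{{ config(
    target_schema = 'snapshots',
    unique_key = 'id',
    strategy = 'timestamp',
    updated_at = 'updated_at',
    file_format = 'hudi'
) }}

select * from {{ source('app', 'orders') }}

{% endsnapshot %}
```

If the target table already exists in a format other than Delta or Hudi, the second check above raises the updated compiler error rather than attempting a merge into a non-transactional table.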
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -14,6 +14,7 @@ services:
volumes:
- ./.spark-warehouse/:/spark-warehouse/
- ./docker/hive-site.xml:/usr/spark/conf/hive-site.xml
- ./docker/spark-defaults.conf:/usr/spark/conf/spark-defaults.conf
environment:
- WAIT_FOR=dbt-hive-metastore:5432

4 changes: 4 additions & 0 deletions docker/hive-site.xml
@@ -39,4 +39,8 @@
<value>dbt</value>
</property>

<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
7 changes: 7 additions & 0 deletions docker/spark-defaults.conf
@@ -0,0 +1,7 @@
spark.hadoop.datanucleus.autoCreateTables true
spark.hadoop.datanucleus.schema.autoCreateTables true
spark.hadoop.datanucleus.fixedDatastore false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.jars.packages org.apache.hudi:hudi-spark3-bundle_2.12:0.9.0
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.userClassPathFirst true
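With the Hudi bundle and session extension wired in through these defaults, a plain spark-sql session inside the Docker environment should be able to create and query Hudi tables directly. A hypothetical smoke test, assuming the package above resolves at session startup:

```sql
-- Relies on spark.jars.packages and spark.sql.extensions from
-- spark-defaults.conf; without them, USING hudi fails to resolve.
create table default.hudi_smoke_test (id bigint, msg string)
using hudi
options (primaryKey = 'id');

insert into default.hudi_smoke_test values (1, 'hello');

select * from default.hudi_smoke_test;
```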
19 changes: 19 additions & 0 deletions tests/integration/incremental_strategies/models_hudi/append.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
tests/integration/incremental_strategies/models_hudi/insert_overwrite_no_partitions.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
tests/integration/incremental_strategies/models_hudi/insert_overwrite_partitions.sql
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = 'id',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
tests/integration/incremental_strategies/models_hudi/merge_no_key.sql
@@ -0,0 +1,19 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
tests/integration/incremental_strategies/models_hudi/merge_unique_key.sql
@@ -0,0 +1,20 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
unique_key = 'id',
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg
union all
select cast(2 as bigint) as id, 'goodbye' as msg

{% else %}

select cast(2 as bigint) as id, 'yo' as msg
union all
select cast(3 as bigint) as id, 'anyway' as msg

{% endif %}
tests/integration/incremental_strategies/models_hudi/merge_update_columns.sql
@@ -0,0 +1,22 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
unique_key = 'id',
merge_update_columns = ['msg'],
) }}

{% if not is_incremental() %}

select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color

{% else %}

-- msg will be updated, color will be ignored
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color

{% endif %}
tests/integration/incremental_strategies/test_incremental_strategies.py
@@ -16,6 +16,10 @@ def project_config(self):
},
}

def seed_and_run_once(self):
self.run_dbt(["seed"])
self.run_dbt(["run"])

def seed_and_run_twice(self):
self.run_dbt(["seed"])
self.run_dbt(["run"])
@@ -77,6 +81,26 @@ def run_and_test(self):
def test_delta_strategies_databricks_cluster(self):
self.run_and_test()

# Uncomment this hudi integration test after the hudi 0.10.0 release to make it work.
Contributor:
Neat! Out of curiosity, what's the change coming in v0.10 that will make this sail smoothly?

Contributor Author:
Spark SQL DML support was added to Apache Hudi with the 0.9.0 release, but a few gaps were fixed after that version shipped; those fixes are scheduled to go out in the next release in a few weeks.

Specifically, these commits are the ones relevant to making these tests run smoothly.

# class TestHudiStrategies(TestIncrementalStrategies):
# @property
# def models(self):
# return "models_hudi"
#
# def run_and_test(self):
# self.seed_and_run_once()
# self.assertTablesEqual("append", "expected_append")
# self.assertTablesEqual("merge_no_key", "expected_append")
# self.assertTablesEqual("merge_unique_key", "expected_upsert")
# self.assertTablesEqual(
# "insert_overwrite_no_partitions", "expected_overwrite")
# self.assertTablesEqual(
# "insert_overwrite_partitions", "expected_upsert")
#
# @use_profile("apache_spark")
# def test_hudi_strategies_apache_spark(self):
# self.run_and_test()


class TestBadStrategies(TestIncrementalStrategies):
@property
24 changes: 24 additions & 0 deletions tests/integration/persist_docs/models/schema.yml
@@ -49,6 +49,30 @@ models:
description: |
Some stuff here and then a call to
{{ doc('my_fun_doc')}}

- name: table_hudi_model
description: |
Table model description "with double quotes"
and with 'single quotes' as welll as other;
'''abc123'''
reserved -- characters
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
columns:
- name: id
description: |
id Column description "with double quotes"
and with 'single quotes' as welll as other;
'''abc123'''
reserved -- characters
--
/* comment */
Some $lbl$ labeled $lbl$ and $$ unlabeled $$ dollar-quoting
- name: name
description: |
Some stuff here and then a call to
{{ doc('my_fun_doc')}}

- name: view_model
description: |
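For these descriptions to actually reach the metastore, a model must opt into persist_docs; a hypothetical hudi model config doing so:

```sql
{{ config(
    materialized = 'table',
    file_format = 'hudi',
    persist_docs = {'relation': true, 'columns': true}
) }}

-- With persist_docs enabled, spark__alter_column_comment (extended above
-- to cover 'hudi') emits the column-comment ALTER statements for this model.
select 1 as id, 'Vino' as name
```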
2 changes: 2 additions & 0 deletions tests/integration/persist_docs/models/table_hudi_model.sql
@@ -0,0 +1,2 @@
{{ config(materialized='table', file_format='hudi') }}
Contributor:
Am I right in thinking that this is failing on Databricks because the hudi file format is not available there?

This specific test case (tests.integration.persist_docs) isn't running on Apache Spark right now. You're welcome to either:

  • add a test to test_persist_docs.py, separate from test_delta_comments, with @use_profile("apache_spark"), and configure this model to be enabled only for that test, and disabled when running with a databricks profile
  • disable this model for the time being

Contributor Author:
Yes, you're right. Since there have been many iterations on this PR already, I'll disable the model for now to keep things simple and merge this PR; in the next iteration I'll bring back both of these tests.

select 1 as id, 'Vino' as name