
Add delete+insert materialization #62

Merged 1 commit into starburstdata:master on Aug 16, 2022

Conversation

mdesmet
Member

@mdesmet mdesmet commented Apr 15, 2022

Overview

  • Description: Support multiple dbt native incremental strategies (delete+insert, merge (currently deactivated))
  • Related issue(s):
  • Related code pull request(s):
  • Related link(s):

Checklist

  • This PR includes tests, or tests are not required/relevant for this PR
  • README.md updated and added information about my change
  • CHANGELOG.md updated and added information about my change

@mdesmet mdesmet marked this pull request as draft April 15, 2022 20:34
@findinpath
Collaborator

Add a reference to https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models and state the options for the incremental strategy.
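
For illustration, a model opting into the new strategy could be configured like this (a sketch; the model and column names are hypothetical):

{{
    config(
        materialized='incremental',
        incremental_strategy='delete+insert',
        unique_key='id'
    )
}}

select * from {{ ref('raw_events') }}

{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}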

{% do return(strategy) %}
{% endmacro %}

{% macro dbt_trino_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
Collaborator

Is there any specific reason why the new macros are prefixed with "dbt_trino_"?

Member Author

I based this on the Snowflake implementation. I think their reasoning is that macros are global, so prefixing them avoids name clashes, similar to Java namespaces.

See snowflake adapter

{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% elif strategy == 'merge' %}
{#-- Not yet supported by trino --#}
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
Collaborator

Is this a valid strategy for Trino at the moment?

trinodb/trino#7708

Member Author

Note that this strategy is currently disabled:

  {% set invalid_strategy_msg -%}
    Invalid incremental strategy provided: {{ strategy }}
    Expected one of: 'append', 'delete+insert'
  {%- endset %}
  {% if strategy not in ['append', 'delete+insert'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {% endif %}
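
So a model configured with incremental_strategy='merge' fails as soon as it is run; for illustration (the model shown is hypothetical):

{{ config(materialized='incremental', incremental_strategy='merge', unique_key='id') }}

select * from {{ ref('raw_events') }}

-- running this model raises the compiler error defined above:
--   Invalid incremental strategy provided: merge
--   Expected one of: 'append', 'delete+insert'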

@hovaesco hovaesco mentioned this pull request Apr 26, 2022
@retrry

retrry commented May 17, 2022

What is needed to push this MR forward?

@mdesmet
Member Author

mdesmet commented May 17, 2022

I actually had a stab at the integration test for this feature with Postgres, but the query is not supported:

delete from "postgresql"."test16528192466294309628_test_incremental_delete_insert"."incremental"
            where (
                id) in (
                select (id)
                from "postgresql"."test16528192466294309628_test_incremental_delete_insert"."incremental__dbt_tmp"
            )
io.trino.spi.TrinoException: Unsupported delete
	at io.trino.plugin.jdbc.DefaultJdbcMetadata.beginDelete(DefaultJdbcMetadata.java:733)
	at io.trino.spi.connector.ConnectorMetadata.beginDelete(ConnectorMetadata.java:664)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.beginDelete(ClassLoaderSafeConnectorMetadata.java:684)
	at io.trino.metadata.MetadataManager.beginDelete(MetadataManager.java:1002)
	at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.createWriterTarget(BeginTableWrite.java:265)
	at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:201)
	at io.trino.sql.planner.optimizations.BeginTableWrite$Rewriter.visitTableFinish(BeginTableWrite.java:104)
	at io.trino.sql.planner.plan.TableFinishNode.accept(TableFinishNode.java:106)
	at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.rewrite(SimplePlanRewriter.java:81)
	at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.lambda$defaultRewrite$0(SimplePlanRewriter.java:72)
	at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
	at io.trino.sql.planner.plan.SimplePlanRewriter$RewriteContext.defaultRewrite(SimplePlanRewriter.java:72)
	at io.trino.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:37)
	at io.trino.sql.planner.plan.SimplePlanRewriter.visitPlan(SimplePlanRewriter.java:21)
	at io.trino.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
	at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:83)
	at io.trino.sql.planner.plan.SimplePlanRewriter.rewriteWith(SimplePlanRewriter.java:31)
	at io.trino.sql.planner.optimizations.BeginTableWrite.optimize(BeginTableWrite.java:89)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:239)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:216)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:211)
	at io.trino.execution.SqlQueryExecution.doPlanQuery(SqlQueryExecution.java:477)
	at io.trino.execution.SqlQueryExecution.planQuery(SqlQueryExecution.java:458)
	at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:399)
	at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
	at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
	at io.trino.$gen.Trino_379____20220517_201455_2.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
	Suppressed: java.lang.Exception: Current plan:
                Output[rows]
                │   Layout: [rows:bigint]
                └─ TableCommit[[]]
                   │   Layout: [rows:bigint]
                   └─ LocalExchange[SINGLE] ()
                      │   Layout: [partialrows:bigint, fragment:varbinary]
                      └─ RemoteExchange[GATHER]
                         │   Layout: [partialrows:bigint, fragment:varbinary]
                         └─ Delete[[]]
                            │   Layout: [partialrows:bigint, fragment:varbinary]
                            └─ FilterProject[filterPredicate = "expr"]
                               │   Layout: [field:bigint]
                               └─ Project[]
                                  │   Layout: [field:bigint, expr:boolean]
                                  └─ SemiJoin[id = id_0][$hashvalue, $hashvalue_6]
                                     │   Layout: [id:integer, field:bigint, $hashvalue:bigint, expr:boolean]
                                     │   Distribution: REPLICATED
                                     │   dynamicFilterId: df_554
                                     ├─ ScanFilterProject[table = postgresql:test16528192466294309628_test_incremental_delete_insert.incremental test16528192466294309628_test_incremental_delete_insert.incremental columns=[id:integer:int4, $update_row_id:bigint:bigint], filterPredicate = true, dynamicFilters = {"id" = #df_554}]
                                     │      Layout: [id:integer, field:bigint, $hashvalue:bigint]
                                     │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id"), 0))
                                     │      field := $update_row_id:bigint:bigint
                                     │      id := id:integer:int4
                                     └─ LocalExchange[SINGLE] ()
                                        │   Layout: [id_0:integer, $hashvalue_6:bigint]
                                        └─ RemoteExchange[REPLICATE]
                                           │   Layout: [id_0:integer, $hashvalue_7:bigint]
                                           └─ ScanProject[table = postgresql:test16528192466294309628_test_incremental_delete_insert.incremental__dbt_tmp test16528192466294309628_test_incremental_delete_insert.incremental__dbt_tmp columns=[id:integer:int4]]
                                                  Layout: [id_0:integer, $hashvalue_8:bigint]
                                                  $hashvalue_8 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("id_0"), 0))
                                                  id_0 := id:integer:int4

		at io.trino.sql.planner.optimizations.BeginTableWrite.optimize(BeginTableWrite.java:95)
		... 12 more

Next I will try with Delta Lake + MinIO. I will probably create a separate PR to add Delta + MinIO support first.

@findinpath
Collaborator

@mdesmet

Today, DELETE on RDBMS connectors is only supported when the predicate can be entirely pushed down to the connector.
Also, neither DELETE nor UPDATE supports subqueries as of now.
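
For example (illustrative; the table names are hypothetical):

-- the predicate is entirely pushed down to the underlying database, so this works:
delete from postgresql.public.events where id = 42;

-- the subquery cannot be pushed down, so this fails with "Unsupported delete":
delete from postgresql.public.events
where id in (select id from postgresql.public.events_tmp);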

@mdesmet
Member Author

mdesmet commented May 18, 2022

@mdesmet

Today, DELETE on RDBMS connectors is only supported when the predicate can be entirely pushed down to the connector. Also, neither DELETE nor UPDATE supports subqueries as of now.

Indeed, that was also my conclusion.

I did a basic test with the Delta connector, and there DELETE with subqueries is supported.

CREATE TABLE test_delta.test.test AS
SELECT * FROM tpch.sf1.customer;

DELETE FROM test_delta.test.test
WHERE custkey IN (SELECT custkey FROM test_delta.test.test WHERE custkey % 15 <> 0);

@hovaesco
Contributor

We would probably need to switch from PostgreSQL to MinIO for testing.

@mdesmet
Member Author

mdesmet commented May 18, 2022

We would probably need to switch from PostgreSQL to MinIO for testing.

Yes indeed, I will post a PR with Delta + MinIO + HMS support later today.

@mdesmet mdesmet mentioned this pull request May 27, 2022
@mdesmet mdesmet requested a review from hovaesco August 9, 2022 08:18
@hovaesco hovaesco marked this pull request as ready for review August 9, 2022 08:29
@@ -0,0 +1,71 @@
{% macro dbt_trino_validate_get_incremental_strategy(config) %}
Contributor

The incremental folder is not needed.

Member Author

I think it is a little cleaner to split it out and keep the logic in the incremental materialization light.

"seed": self.column_type_override(),
}
}
}
Contributor

Nit: missing newline at end of file.


{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{target }}
Contributor

{{ target }}

{% macro trino__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
Contributor

What if unique_key is false? Probably some error handling would be required.

Member Author

unique_key is not required; in that case the strategy actually behaves like append. This behavior is covered by the following tests:

  • test__empty_str_unique_key
  • test__empty_unique_key_list

It behaves exactly as OOTB dbt (Snowflake) does.
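
Stitching the hunks above together, the scalar-unique_key branch of the macro looks roughly like this (a sketch, not the exact file); a falsy unique_key ('' or []) skips the delete, leaving a plain append:

{% macro trino__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

    {# an empty string or empty list is falsy, so no delete is emitted #}
    {% if unique_key %}
        delete from {{ target }}
        where ({{ unique_key }}) in (
            select ({{ unique_key }})
            from {{ source }}
        );
    {% endif %}

    insert into {{ target }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ source }}
    )
{%- endmacro %}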

delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
Contributor

Parentheses not needed

{#-- Not yet supported by trino (blocked in dbt_trino_validate_get_incremental_strategy macro) --#}
{% do return(get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
Contributor

This looks redundant if error handling for strategies is implemented in dbt_trino_validate_get_incremental_strategy.

Contributor

@hovaesco hovaesco left a comment

LGTM % some README comments. Please also update the CHANGELOG and change the commit message to reflect that the delete+insert strategy is added.

Let's wait for @findinpath review before merging.

README.md Outdated

##### `delete+insert`

Through the `delete+insert` incremental strategy, you can instruct dbt to use a two-step incremental approach. Note this strategy doesn't delete any records, it simply upserts the new and updated records.
Contributor

Can this be rephrased somehow? Under the hood this strategy deletes records and inserts new ones, so from the end user's perspective it's an upsert, but from the engine's perspective it's not.
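
For instance, with unique_key='id' the strategy emits roughly these two statements (illustrative relation and column names):

delete from "hive"."analytics"."events"
where (id) in (
    select (id)
    from "hive"."analytics"."events__dbt_tmp"
);

insert into "hive"."analytics"."events" ("id", "updated_at", "payload")
(
    select "id", "updated_at", "payload"
    from "hive"."analytics"."events__dbt_tmp"
);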

@mdesmet mdesmet changed the title Support multiple incremental strategies (delete+insert) Add delete+insert materialization Aug 10, 2022
{% do return(get_append_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'delete+insert' %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% elif strategy == 'merge' %}
Collaborator

I don't quite follow why we have this branch now, since (as written) MERGE is not yet supported at the moment. Why not create a follow-up PR once a Trino version with MERGE support lands?

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
Collaborator

I'm wondering whether you missed

.include(database=true, schema=true)

like it is done for append

https://github.com/starburstdata/dbt-trino/pull/62/files#diff-a5aa2b035a51111d752170921f513337ce6af7d4231b586252df7b0a7dbf71b6R30

Member Author

This is not required; the relation is already rendered with database and schema. On the line after this one we can see:

drop table if exists {{ tmp_relation }};

If it were necessary, we would have to include it there too; otherwise the table would not be found.

Note that Snowflake also doesn't do that.

@findinpath
Collaborator

Please rebase your changes on master.

{% endif %}
{% endmacro %}

{% macro get_append_sql(target_relation, tmp_relation, dest_columns) %}
Collaborator

We've discussed this already.

I see get_append_sql and trino__get_delete_insert_merge_sql, which are not consistent name-wise.

I'd be in favor of prefixing with trino__ until we get consistent macro names across the entire codebase.

Member Author

I suggest that the trino__ prefix should only be used for adapter macros that are dispatched in core dbt. The get_append_sql macro is not part of core dbt, while get_delete_insert_merge_sql is. See https://github.com/dbt-labs/dbt-core/blob/1071a4681df91633301fdf23e34de819b66fbeb7/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql#L51-L53
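
For reference, the dispatch hook in core dbt looks roughly like this (paraphrased from the linked source):

{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
    {# core dbt resolves this to an adapter-specific implementation such as trino__get_delete_insert_merge_sql #}
    {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
{%- endmacro %}

get_append_sql has no such hook in core dbt, so a trino__ prefix on it would never be dispatched.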

Collaborator

How do we know whether a certain macro is going to end up in core dbt or not?

In any case, keep this in mind for the eventual follow-up task of making macro naming consistent across the codebase.

delete from {{ target }}
where
{% for key in unique_key %}
{{ target }}.{{ key }} IN (SELECT {{ key }} FROM {{ source }})
Collaborator

Do we eventually need to take quoting of the column names into consideration here?
If yes, could you please add a corresponding test case?

Member Author

I think the unique_key is supplied by the user here, so it's up to the user to supply a quoted or unquoted identifier.
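
For example (hypothetical column name), a user who needs a case-sensitive key can pass it pre-quoted, and it is interpolated verbatim into the generated delete:

{{
    config(
        materialized='incremental',
        incremental_strategy='delete+insert',
        unique_key='"Order Id"'
    )
}}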

delete from {{ target }}
where
{% for key in unique_key %}
{{ target }}.{{ key }} IN (SELECT {{ key }} FROM {{ source }})
Collaborator

Most of the statements here use lowercase for the reserved SQL keywords; please change IN, SELECT, and FROM accordingly.

@mdesmet mdesmet merged commit 0abc6ab into starburstdata:master Aug 16, 2022