Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbt clone #7881

Merged
merged 37 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3056eda
Rebase Jerco's clone code
aranke Jun 15, 2023
f63c379
fix unit tests
aranke Jun 15, 2023
afdada9
Delete .user.yml
aranke Jun 15, 2023
23b4113
Delete profiles.yml
aranke Jun 15, 2023
dafb250
get integration test working
aranke Jun 15, 2023
b78e837
Merge branch 'main' into 7258_dbt_clone
aranke Jun 15, 2023
31870f9
add state_relation to required_model_keys
aranke Jun 15, 2023
692e542
Move macros into models directory
aranke Jun 16, 2023
69a3dbd
Simplify test
aranke Jun 16, 2023
125fb0b
Merge branch 'main' into 7258_dbt_clone
aranke Jun 20, 2023
b5dde06
add no state test, move state_relation to maybe_keys
aranke Jun 20, 2023
5303346
Merge branch 'main' into 7258_dbt_clone
aranke Jun 20, 2023
c936856
rename: state_relation -> defer_relation
aranke Jun 21, 2023
2b0eb25
missed a spot
aranke Jun 21, 2023
95758d1
Merge branch 'main' into 7258_dbt_clone
aranke Jun 21, 2023
bcecced
Move _get_deferred_manifest to GraphRunnableTask
aranke Jun 21, 2023
7327283
Reword error message
aranke Jun 21, 2023
0543303
create adapter zone versions of dbt_clone tests to be inherited by oth…
McKnight-42 Jun 22, 2023
ed66dc0
Merge branch 'main' into 7258_dbt_clone
aranke Jun 22, 2023
5aa4209
Add Matt McKnight to contributors list
aranke Jun 22, 2023
7eace73
Merge branch 'main' into 7258_dbt_clone
aranke Jun 22, 2023
65eaa0a
remove context.update hack
aranke Jun 26, 2023
4bce05f
Merge branch 'main' into 7258_dbt_clone
aranke Jun 26, 2023
824a2fc
Merge branch 'main' of github.com:dbt-labs/dbt into 7258_dbt_clone
McKnight-42 Jun 27, 2023
6a8ecb0
add clean_up method to drop alt schema names after tests run
McKnight-42 Jun 27, 2023
d4f2eca
Merge branch '7258_dbt_clone' of github.com:dbt-labs/dbt into 7258_db…
McKnight-42 Jun 27, 2023
5be28bc
Add context to comments
aranke Jun 27, 2023
18ad7ca
Merge branch 'main' into 7258_dbt_clone
aranke Jun 28, 2023
6efe6e3
Use relation_name instead of constructing string
aranke Jun 28, 2023
48e01e5
Merge branch 'main' into 7258_dbt_clone
aranke Jun 28, 2023
81dad80
Fix add_from_artifact test
aranke Jun 28, 2023
d3d3014
Merge branch 'main' into 7258_dbt_clone
aranke Jun 28, 2023
88a726b
remove node.relation_name check
aranke Jun 28, 2023
020cde5
add if v.relation_name test
aranke Jun 28, 2023
e761aa0
fix seed relation_name bug
aranke Jun 28, 2023
4eb7aa4
Merge branch 'main' into 7258_dbt_clone
aranke Jun 28, 2023
cd78b64
Skip `test_semantic_model_deleted_partial_parsing`
aranke Jun 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230616-104849.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: dbt clone
time: 2023-06-16T10:48:49.079961-05:00
custom:
Author: jtcohen6 aranke
Issue: "7258"
1 change: 1 addition & 0 deletions core/dbt/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ def command_args(command: CliCommand) -> ArgsList:
CMD_DICT: Dict[CliCommand, ClickCommand] = {
CliCommand.BUILD: cli.build,
CliCommand.CLEAN: cli.clean,
CliCommand.CLONE: cli.clone,
CliCommand.COMPILE: cli.compile,
CliCommand.DOCS_GENERATE: cli.docs_generate,
CliCommand.DOCS_SERVE: cli.docs_serve,
Expand Down
38 changes: 38 additions & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
from dbt.task.clean import CleanTask
from dbt.task.clone import CloneTask
from dbt.task.compile import CompileTask
from dbt.task.debug import DebugTask
from dbt.task.deps import DepsTask
Expand Down Expand Up @@ -608,6 +609,43 @@ def retry(ctx, **kwargs):
return results, success


# dbt clone
@cli.command("clone")
@click.pass_context
@p.defer_state
@p.exclude
@p.full_refresh
@p.profile
@p.profiles_dir
@p.project_dir
@p.resource_type
@p.select
@p.selector
@p.state  # required
@p.target
@p.target_path
@p.threads
@p.vars
@p.version_check
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
@requires.postflight
def clone(ctx, **kwargs):
    """Create clones of selected nodes based on their location in the manifest provided to --state."""
    # NOTE: the docstring above doubles as the command's help text, so it is
    # kept verbatim.
    # The flags, runtime config and manifest are read from the click context
    # object (presumably populated by the `requires.*` decorators above --
    # confirm against dbt.cli.requires).
    invocation = ctx.obj
    task = CloneTask(
        invocation["flags"],
        invocation["runtime_config"],
        invocation["manifest"],
    )

    # Run first, then let the task translate its results into a success flag.
    results = task.run()
    return results, task.interpret_results(results)


# dbt run operation
@cli.command("run-operation")
@click.pass_context
Expand Down
1 change: 1 addition & 0 deletions core/dbt/cli/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Command(Enum):
BUILD = "build"
CLEAN = "clean"
COMPILE = "compile"
CLONE = "clone"
DOCS_GENERATE = "generate"
DOCS_SERVE = "serve"
DEBUG = "debug"
Expand Down
14 changes: 14 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,20 @@ def this(self) -> Optional[RelationProxy]:
return None
return self.db_wrapper.Relation.create_from(self.config, self.model)

@contextproperty
def state_relation(self) -> Optional[RelationProxy]:
    """Return the Relation for this node's counterpart in the --state artifact.

    Commands that attach information about a node's corresponding
    production version (via a --state manifest) store it on the node as
    ``state_relation``. When that attribute is present and truthy, it is
    converted into a Relation object; otherwise ``None`` is returned.
    """
    # Not every node carries the attribute, hence the defensive getattr.
    state_node = getattr(self.model, "state_relation", None)
    if not state_node:
        return None
    return self.db_wrapper.Relation.create_from_node(self.config, state_node)


# This is called by '_context_for', used in 'render_with_context'
def generate_parser_model_context(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{# Entry point: resolves the `can_clone_table` implementation through adapter.dispatch (in the 'dbt' macro namespace) and returns whatever that implementation returns. #}
{% macro can_clone_table() %}
{{ return(adapter.dispatch('can_clone_table', 'dbt')()) }}
{% endmacro %}

{# Default implementation: always returns False, i.e. table cloning is reported as unavailable unless dispatch resolves to a different implementation. #}
{% macro default__can_clone_table() %}
{{ return(False) }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{%- materialization clone, default -%}

{%- set relations = {'relations': []} -%}

{%- if not state_relation -%}
-- nothing to do
{{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }}
{{ return(relations) }}
{%- endif -%}

{%- set existing_relation = load_cached_relation(this) -%}

{%- if existing_relation and not flags.FULL_REFRESH -%}
-- noop!
{{ log("Relation " ~ existing_relation ~ " already exists", info=True) }}
{{ return(relations) }}
{%- endif -%}

{%- set other_existing_relation = load_cached_relation(state_relation) -%}

-- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table
-- Otherwise, this will be a view

{% set can_clone_table = can_clone_table() %}

{%- if other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%}

{%- set target_relation = this.incorporate(type='table') -%}
{% if existing_relation is not none and not existing_relation.is_table %}
{{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }}
{{ drop_relation_if_exists(existing_relation) }}
{% endif %}

-- as a general rule, data platforms that can clone tables can also do atomic 'create or replace'
{% call statement('main') %}
{{ create_or_replace_clone(target_relation, state_relation) }}
{% endcall %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{%- else -%}

{%- set target_relation = this.incorporate(type='view') -%}

-- TODO: this should probably be illegal
-- I'm just doing it out of convenience to reuse the 'view' materialization logic
aranke marked this conversation as resolved.
Show resolved Hide resolved
{%- do context.update({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this update works is to overwrite the Jinja context, so that when we execute materialization_macro later on, it runs with the updated context to actually create the view.
This logic is pretty hidden; can we spend a bit more time thinking about how to avoid changing context in macro space? @gshank Any suggestions on whether it is possible to do this in a better way?

I did a quick search in our codebase and it seems that we are creating a new pattern here. If this is the only way we can do it, we should add a comment and call out that this is not a pattern we want to repeat in the codebase.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call-out, I'll discuss this with @gshank in our 1:1 tomorrow.

'sql': get_clone_target(state_relation),
'compiled_code': get_clone_target(state_relation)
}) -%}

-- reuse the view materialization
-- TODO: support actual dispatch for materialization macros
{% set search_name = "materialization_view_" ~ adapter.type() %}
aranke marked this conversation as resolved.
Show resolved Hide resolved
{% if not search_name in context %}
{% set search_name = "materialization_view_default" %}
{% endif %}
{% set materialization_macro = context[search_name] %}
{% set relations = materialization_macro() %}
{{ return(relations) }}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely the bit I felt worst about in #7258.

I opened an issue to track real dispatch support for materialization macros: #7799. Until we do that, this will work much of the time, but it won't support adapters that inherit from other adapters. E.g. dbt-postgres → dbt-redshift, ergo Redshift is allowed to use postgres__ implementations.

As far as hard-overwriting the context values for sql + compiled_code: Gross, right? The other potential approach I can think of is, making materialization macros accept kwargs, so that sql/compiled_code can be passed in explicitly rather than set + pulled out of the aether context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ergo Redshift is allowed to use postgres__ implementations

TIL, wow

As far as hard-overwriting the context values for sql + compiled_code: Gross, right?

Yes, but I've seen worse

making materialization macros accept kwargs, so that sql/compiled_code can be passed in explicitly

This makes sense, we can pass in a dict for now before migrating to a typed object (similar to dbtRunner)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To break down the questions here a bit:

  1. Do we have to reuse the view materialization? Would it be better to just write the logic here, given that clone potentially has more restrictive assumptions than the actual view materialization (for example: always create a view that points to the other table, no matter what's going on)? It looks like there's not as much going on in view as in other materializations; sometimes it's better to repeat simple stuff than to reuse it.
  2. If option 1 is not viable, would this implementation bite us somewhere else? What happens if we try to run this code with clone on Redshift today?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll defer to @dbt-labs/core-adapters on 1, I'm fine with either approach.

@McKnight-42: question for you when you get back.

For 2, I don't know the answer, but can sync with @McKnight-42 and reply here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good breakdown of the questions @ChenyuLInx!

Thinking through the logic we'd want from the view materialization:

  • Handling the cases where the object already exists as a table, and needs to be dropped first
  • Knowing whether this adapter wants to create or replace view versus create temp + alter/rename/swap
  • Applying grants, persisting descriptions as comments - I think we still might actually want these things!

So - I would argue that's the right way to go. I'm definitely interested in a world where materializations are more composable than they are currently, so that more-complex build strategies ("clone an object from one env into another") can call into simpler ones ("make a view").

Macros are already pretty composable. Materializations are just macros, but they pull all their inputs from this node's g-d context, rather than receiving them as explicit arguments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answers to @ChenyuLInx questions from offline chat with @McKnight-42:

Do we have to reuse the view materialization?

I believe so, since different adapters implement views differently.
I previously believed that all adapters share a single view materialization, but that doesn't seem to be the case.

If option 1 is not viable, would this implementation bite us somewhere else?

I don't know.

What happens if we try to run this code with clone on Redshift today?

In theory, it should do the same thing as Postgres (create views) since neither support zero-copy cloning.
I'll have to verify what happens in practice.


{%- endif -%}

{%- endmaterialization -%}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{# Entry point: resolves the `create_or_replace_clone` implementation through adapter.dispatch (in the 'dbt' macro namespace), calls it with (this_relation, state_relation), and returns its result. #}
{% macro create_or_replace_clone(this_relation, state_relation) %}
{{ return(adapter.dispatch('create_or_replace_clone', 'dbt')(this_relation, state_relation)) }}
{% endmacro %}

{# Default implementation: renders a single `create or replace table <this_relation> clone <state_relation>` statement. #}
{% macro default__create_or_replace_clone(this_relation, state_relation) %}
create or replace table {{ this_relation }} clone {{ state_relation }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{# Entry point: resolves the `get_clone_target` implementation through adapter.dispatch (in the 'dbt' macro namespace), calls it with to_relation, and returns its result. #}
{% macro get_clone_target(to_relation) %}
{{ return(adapter.dispatch('get_clone_target', 'dbt')(to_relation)) }}
{% endmacro %}

{# Default implementation: captures `select * from <to_relation>` into target_sql and returns that text. #}
{% macro default__get_clone_target(to_relation) %}
{% set target_sql %}
select * from {{ to_relation }}
{% endset %}
{{ return(target_sql) }}
{% endmacro %}
183 changes: 183 additions & 0 deletions core/dbt/task/clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import threading
from typing import AbstractSet, Optional, Any, List, Iterable, Set

from dbt.dataclass_schema import dbtClassMixin

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunStatus, RunResult
from dbt.exceptions import DbtInternalError, DbtRuntimeError, CompilationError
from dbt.graph import ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.parser.manifest import write_manifest
from dbt.task.base import BaseRunner
from dbt.task.runnable import GraphRunnableTask
from dbt.task.run import _validate_materialization_relations_dict
from dbt.adapters.base import BaseRelation
from dbt.clients.jinja import MacroGenerator
from dbt.context.providers import generate_runtime_model_context


class CloneRunner(BaseRunner):
    """Runner that executes the ``clone`` materialization macro for one node.

    Compilation is skipped entirely; ``execute`` looks up the ``clone``
    materialization for the current adapter type, runs it, registers the
    relations it returns in the adapter cache, and reports a ``RunResult``.
    """

    def before_execute(self):
        # Nothing to do before execution for this runner.
        pass

    def after_execute(self, result):
        # Nothing to do after execution for this runner.
        pass

    def _build_run_model_result(self, model, context):
        """Build the ``RunResult`` from the statement stored under "main".

        The status is ``RunStatus.Success`` whether or not a "main" result
        exists; only the message differs ("No-op" when there is none).
        """
        main_result = context["load_result"]("main")

        message = str(main_result.response) if main_result else "No-op"

        # Only responses that are dbtClassMixin instances are serialised.
        adapter_response = {}
        if main_result and isinstance(main_result.response, dbtClassMixin):
            adapter_response = main_result.response.to_dict(omit_none=True)

        return RunResult(
            node=model,
            status=RunStatus.Success,
            timing=[],
            thread_id=threading.current_thread().name,
            execution_time=0,
            message=message,
            adapter_response=adapter_response,
            failures=None,
        )

    def compile(self, manifest):
        # no-op: the node is handed back untouched
        return self.node

    def _materialization_relations(self, result: Any, model) -> List[BaseRelation]:
        """Extract the relations returned by the materialization.

        A dict is passed to ``_validate_materialization_relations_dict``.
        Any other return value raises ``CompilationError``, with a dedicated
        message when the value is a plain string.
        """
        if isinstance(result, dict):
            return _validate_materialization_relations_dict(result, model)

        if isinstance(result, str):
            msg = (
                'The materialization ("{}") did not explicitly return a '
                "list of relations to add to the cache.".format(str(model.get_materialization()))
            )
        else:
            msg = (
                "Invalid return value from materialization, expected a dict "
                'with key "relations", got: {}'.format(str(result))
            )
        raise CompilationError(msg, node=model)

    def execute(self, model, manifest):
        """Run the ``clone`` materialization for ``model`` and return its result.

        Raises ``DbtInternalError`` when the generated context has no
        "config" entry.
        """
        ctx = generate_runtime_model_context(model, self.config, manifest)
        clone_macro = manifest.find_materialization_macro_by_name(
            self.config.project_name, "clone", self.adapter.type()
        )

        if "config" not in ctx:
            raise DbtInternalError(
                "Invalid materialization context generated, missing config: {}".format(ctx)
            )

        model_config = ctx["config"]

        # The post-model hook must run even if the materialization raises.
        hook_state = self.adapter.pre_model_hook(model_config)
        try:
            macro_result = MacroGenerator(clone_macro, ctx)()
        finally:
            self.adapter.post_model_hook(model_config, hook_state)

        # Record every relation the materialization reports in the cache.
        for relation in self._materialization_relations(macro_result, model):
            self.adapter.cache_added(relation.incorporate(dbt_created=True))

        return self._build_run_model_result(model, ctx)


class CloneTask(GraphRunnableTask):
    """Task behind ``dbt clone``: clone selected nodes from a --state manifest.

    Before running, information from the --state manifest is added to the
    runtime manifest, the schemas of the selected nodes are created, and
    both those schemas and the schemas of their state counterparts are
    loaded into the adapter cache.
    """

    def raise_on_first_error(self):
        # NOTE(review): presumably tells the base task not to abort the whole
        # run when one node fails -- confirm against GraphRunnableTask.
        return False

    def get_model_schemas(self, adapter, selected_uids: Iterable[str]) -> Set[BaseRelation]:
        """Collect the schemas to cache for the selected nodes.

        For each selected relational, non-ephemeral node this includes the
        node's own schema and, when the node carries a ``state_relation``,
        the schema of that relation as well.

        Raises ``DbtInternalError`` when the task has no manifest.
        """
        if self.manifest is None:
            raise DbtInternalError("manifest was None in get_model_schemas")
        result: Set[BaseRelation] = set()

        for node in self.manifest.nodes.values():
            if node.unique_id not in selected_uids:
                continue
            if node.is_relational and not node.is_ephemeral:
                relation = adapter.Relation.create_from(self.config, node)
                result.add(relation.without_identifier())

                # cache the 'other' schemas too!
                # Not every node type declares `state_relation`, so read it
                # defensively (as the context provider does) rather than
                # risking an AttributeError.
                state_relation = getattr(node, "state_relation", None)
                if state_relation:
                    other_relation = adapter.Relation.create_from_node(
                        self.config, state_relation
                    )
                    result.add(other_relation.without_identifier())

        return result

    def before_run(self, adapter, selected_uids: AbstractSet[str]):
        """Prepare the manifest, schemas and adapter cache ahead of the run."""
        with adapter.connection_named("master"):
            # unlike in other tasks, we want to add information from the --state manifest *before* caching!
            self.defer_to_manifest(adapter, selected_uids)
            # only create *our* schemas, but cache *other* schemas in addition
            schemas_to_create = super().get_model_schemas(adapter, selected_uids)
            self.create_schemas(adapter, schemas_to_create)
            schemas_to_cache = self.get_model_schemas(adapter, selected_uids)
            self.populate_adapter_cache(adapter, schemas_to_cache)

    @property
    def resource_types(self):
        """Resource types to clone, restricted to refable node types.

        With no ``--resource-type`` given, every refable type is used.
        "all" expands to every refable type; non-refable values are dropped.
        """
        if not self.args.resource_types:
            return NodeType.refable()

        values = set(self.args.resource_types)

        if "all" in values:
            values.remove("all")
            values.update(NodeType.refable())

        return [NodeType(val) for val in values if val in NodeType.refable()]

    def get_node_selector(self) -> ResourceTypeSelector:
        """Build the selector used to pick nodes for cloning.

        Raises ``DbtInternalError`` when the manifest or graph is missing.
        """
        resource_types = self.resource_types

        if self.manifest is None or self.graph is None:
            raise DbtInternalError("manifest and graph must be set to perform node selection")
        return ResourceTypeSelector(
            graph=self.graph,
            manifest=self.manifest,
            previous_state=self.previous_state,
            resource_types=resource_types,
        )

    def get_runner_type(self, _):
        # Every node is handled by the clone-specific runner.
        return CloneRunner

    def _get_deferred_manifest(self) -> Optional[WritableManifest]:
        """Return the manifest loaded from --state.

        Raises ``DbtRuntimeError`` when --state was not provided or when no
        manifest could be found at that path.
        """
        state = self.previous_state
        if state is None:
            raise DbtRuntimeError(
                "--state is required for cloning relations from another environment"
            )

        if state.manifest is None:
            raise DbtRuntimeError(f'Could not find manifest in --state path: "{self.args.state}"')
        return state.manifest

    # Note that this is different behavior from --defer with other commands, which *merge*
    # selected nodes from this manifest + unselected nodes from the other manifest
    def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
        """Add information from the --state manifest to the runtime manifest.

        The updated runtime manifest is then written to the target path.

        Raises ``DbtInternalError`` when there is no runtime manifest.
        """
        deferred_manifest = self._get_deferred_manifest()
        if deferred_manifest is None:
            return
        if self.manifest is None:
            raise DbtInternalError(
                "Expected to defer to manifest, but there is no runtime manifest to defer from!"
            )
        self.manifest.add_from_artifact(other=deferred_manifest)
        # TODO: is it wrong to write the manifest here? I think it's right...
        write_manifest(self.manifest, self.config.target_path)
3 changes: 3 additions & 0 deletions core/dbt/task/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dbt.graph import GraphQueue
from dbt.task.base import ConfiguredTask
from dbt.task.build import BuildTask
from dbt.task.clone import CloneTask
from dbt.task.compile import CompileTask
from dbt.task.generate import GenerateTask
from dbt.task.run import RunTask
Expand All @@ -22,6 +23,7 @@
TASK_DICT = {
"build": BuildTask,
"compile": CompileTask,
"clone": CloneTask,
"generate": GenerateTask,
"seed": SeedTask,
"snapshot": SnapshotTask,
Expand All @@ -33,6 +35,7 @@
CMD_DICT = {
"build": CliCommand.BUILD,
"compile": CliCommand.COMPILE,
"clone": CliCommand.CLONE,
"generate": CliCommand.DOCS_GENERATE,
"seed": CliCommand.SEED,
"snapshot": CliCommand.SNAPSHOT,
Expand Down
Loading