Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove injected_sql. Store non-ephemeral injected_sql in compiled_sql #2834

Merged
merged 1 commit into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Added dbt_invocation_id for each BigQuery job to enable performance analysis ([#2808](https://github.com/fishtown-analytics/dbt/issues/2808), [#2809](https://github.com/fishtown-analytics/dbt/pull/2809))
- Save cli and rpc arguments in run_results.json ([#2510](https://github.com/fishtown-analytics/dbt/issues/2510), [#2813](https://github.com/fishtown-analytics/dbt/pull/2813))
- Added support for BigQuery connections using refresh tokens ([#2344](https://github.com/fishtown-analytics/dbt/issues/2344), [#2805](https://github.com/fishtown-analytics/dbt/pull/2805))
- Remove injected_sql from manifest nodes ([#2762](https://github.com/fishtown-analytics/dbt/issues/2762), [#2834](https://github.com/fishtown-analytics/dbt/pull/2834))

### Under the hood
- Added strategy-specific validation to improve the relevancy of compilation errors for the `timestamp` and `check` snapshot strategies. (([#2787](https://github.com/fishtown-analytics/dbt/issues/2787), [#2791](https://github.com/fishtown-analytics/dbt/pull/2791))
Expand Down
190 changes: 96 additions & 94 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,15 @@ def initialize(self):
make_directory(self.config.target_path)
make_directory(self.config.modules_path)

# creates a ModelContext which is converted to
# a dict for jinja rendering of SQL
def _create_node_context(
self,
node: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
) -> Dict[str, Any]:

context = generate_runtime_model(
node, self.config, manifest
)
Expand All @@ -169,36 +172,6 @@ def add_ephemeral_prefix(self, name: str):
relation_cls = adapter.Relation
return relation_cls.add_ephemeral_prefix(name)

def _get_compiled_model(
self,
manifest: Manifest,
cte_id: str,
extra_context: Dict[str, Any],
) -> NonSourceCompiledNode:

if cte_id not in manifest.nodes:
raise InternalException(
f'During compilation, found a cte reference that could not be '
f'resolved: {cte_id}'
)
cte_model = manifest.nodes[cte_id]
if getattr(cte_model, 'compiled', False):
assert isinstance(cte_model, tuple(COMPILED_TYPES.values()))
return cast(NonSourceCompiledNode, cte_model)
elif cte_model.is_ephemeral_model:
# this must be some kind of parsed node that we can compile.
# we know it's not a parsed source definition
assert isinstance(cte_model, tuple(COMPILED_TYPES))
# update the node so
node = self.compile_node(cte_model, manifest, extra_context)
manifest.sync_update_node(node)
return node
else:
raise InternalException(
f'During compilation, found an uncompiled cte that '
f'was not an ephemeral model: {cte_id}'
)

def _inject_ctes_into_sql(self, sql: str, ctes: List[InjectedCTE]) -> str:
"""
`ctes` is a list of InjectedCTEs like:
Expand Down Expand Up @@ -260,73 +233,107 @@ def _inject_ctes_into_sql(self, sql: str, ctes: List[InjectedCTE]) -> str:

return str(parsed)

def _model_prepend_ctes(
self,
model: NonSourceCompiledNode,
prepended_ctes: List[InjectedCTE]
) -> NonSourceCompiledNode:
if model.compiled_sql is None:
raise RuntimeException(
'Cannot prepend ctes to an unparsed node', model
)
injected_sql = self._inject_ctes_into_sql(
model.compiled_sql,
prepended_ctes,
)

model.extra_ctes_injected = True
model.extra_ctes = prepended_ctes
model.injected_sql = injected_sql
model.validate(model.to_dict())
return model

def _get_dbt_test_name(self) -> str:
return 'dbt__CTE__INTERNAL_test'

# This method is called by the 'compile_node' method. Starting
# from the node that it is passed in, it will recursively call
# itself using the 'extra_ctes'. The 'ephemeral' models do
# not produce SQL that is executed directly, instead they
# are rolled up into the models that refer to them by
# inserting CTEs into the SQL.
def _recursively_prepend_ctes(
self,
model: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
extra_context: Optional[Dict[str, Any]],
) -> Tuple[NonSourceCompiledNode, List[InjectedCTE]]:

if model.compiled_sql is None:
raise RuntimeException(
'Cannot inject ctes into an unparsed node', model
)
if model.extra_ctes_injected:
return (model, model.extra_ctes)

if flags.STRICT_MODE:
if not isinstance(model, tuple(COMPILED_TYPES.values())):
raise InternalException(
f'Bad model type: {type(model)}'
)
# Just to make it plain that nothing is actually injected for this case
if not model.extra_ctes:
model.extra_ctes_injected = True
manifest.update_node(model)
return (model, model.extra_ctes)

# This stores the ctes which will all be recursively
# gathered and then "injected" into the model.
prepended_ctes: List[InjectedCTE] = []

dbt_test_name = self._get_dbt_test_name()

# extra_ctes are added to the model by
# RuntimeRefResolver.create_relation, which adds an
# extra_cte for every model relation which is an
# ephemeral model.
for cte in model.extra_ctes:
if cte.id == dbt_test_name:
sql = cte.sql
else:
cte_model = self._get_compiled_model(
manifest,
cte.id,
extra_context,
)
cte_model, new_prepended_ctes = self._recursively_prepend_ctes(
cte_model, manifest, extra_context
)
if cte.id not in manifest.nodes:
raise InternalException(
f'During compilation, found a cte reference that '
f'could not be resolved: {cte.id}'
)
cte_model = manifest.nodes[cte.id]

if not cte_model.is_ephemeral_model:
raise InternalException(f'{cte.id} is not ephemeral')

# This model has already been compiled, so it's been
# through here before
if getattr(cte_model, 'compiled', False):
assert isinstance(cte_model,
tuple(COMPILED_TYPES.values()))
cte_model = cast(NonSourceCompiledNode, cte_model)
new_prepended_ctes = cte_model.extra_ctes

# if the cte_model isn't compiled, i.e. first time here
else:
# This is an ephemeral parsed model that we can compile.
# Compile and update the node
cte_model = self._compile_node(
cte_model, manifest, extra_context)
# recursively call this method
cte_model, new_prepended_ctes = \
self._recursively_prepend_ctes(
cte_model, manifest, extra_context
)
# Save compiled SQL file and sync manifest
self._write_node(cte_model)
manifest.sync_update_node(cte_model)

_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)

new_cte_name = self.add_ephemeral_prefix(cte_model.name)
sql = f' {new_cte_name} as (\n{cte_model.compiled_sql}\n)'

_add_prepended_cte(prepended_ctes, InjectedCTE(id=cte.id, sql=sql))

model = self._model_prepend_ctes(model, prepended_ctes)
# We don't save injected_sql into compiled sql for ephemeral models
# because it will cause problems with processing of subsequent models.
# Ephemeral models do not produce executable SQL of their own.
if not model.is_ephemeral_model:
injected_sql = self._inject_ctes_into_sql(
model.compiled_sql,
prepended_ctes,
)
model.compiled_sql = injected_sql
model.extra_ctes_injected = True
model.extra_ctes = prepended_ctes
model.validate(model.to_dict())

manifest.update_node(model)

return model, prepended_ctes

def _insert_ctes(
def _add_ctes(
self,
compiled_node: NonSourceCompiledNode,
manifest: Manifest,
Expand All @@ -352,11 +359,12 @@ def _insert_ctes(
compiled_node.extra_ctes.append(cte)
compiled_node.compiled_sql = f'\nselect count(*) from {name}'

injected_node, _ = self._recursively_prepend_ctes(
compiled_node, manifest, extra_context
)
return injected_node
return compiled_node

# creates a compiled_node from the ManifestNode passed in,
# creates a "context" dictionary for jinja rendering,
# and then renders the "compiled_sql" using the node, the
# raw_sql and the context.
def _compile_node(
self,
node: ManifestNode,
Expand All @@ -374,7 +382,6 @@ def _compile_node(
'compiled_sql': None,
'extra_ctes_injected': False,
'extra_ctes': [],
'injected_sql': None,
})
compiled_node = _compiled_type_for(node).from_dict(data)

Expand All @@ -390,11 +397,13 @@ def _compile_node(

compiled_node.compiled = True

injected_node = self._insert_ctes(
# add ctes for specific test nodes, and also for
# possible future use in adapters
compiled_node = self._add_ctes(
compiled_node, manifest, extra_context
)

return injected_node
return compiled_node

def write_graph_file(self, linker: Linker, manifest: Manifest):
filename = graph_file_name
Expand Down Expand Up @@ -449,26 +458,26 @@ def compile(self, manifest: Manifest, write=True) -> Graph:

return Graph(linker.graph)

# writes the "compiled_sql" into the target/compiled directory
def _write_node(self, node: NonSourceCompiledNode) -> ManifestNode:
if not _is_writable(node):
if (not node.extra_ctes_injected or
node.resource_type == NodeType.Snapshot):
return node
logger.debug(f'Writing injected SQL for node "{node.unique_id}"')

if node.injected_sql is None:
# this should not really happen, but it'd be a shame to crash
# over it
logger.error(
f'Compiled node "{node.unique_id}" had no injected_sql, '
'cannot write sql!'
)
else:
if node.compiled_sql:
node.build_path = node.write_node(
self.config.target_path,
'compiled',
node.injected_sql
node.compiled_sql
)
return node

# This is the main entry point into this code. It's called by
# CompileRunner.compile, GenericRPCRunner.compile, and
# RunTask.get_hook_sql. It calls '_compile_node' to convert
# the node into a compiled node, and then calls the
# recursive method to "prepend" the ctes.
def compile_node(
self,
node: ManifestNode,
Expand All @@ -478,16 +487,9 @@ def compile_node(
) -> NonSourceCompiledNode:
node = self._compile_node(node, manifest, extra_context)

if write and _is_writable(node):
node, _ = self._recursively_prepend_ctes(
node, manifest, extra_context
)
if write:
self._write_node(node)
return node


def _is_writable(node):
if not node.injected_sql:
return False

if node.resource_type == NodeType.Snapshot:
return False

return True
4 changes: 3 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,9 @@ def post_hooks(self) -> List[Dict[str, Any]]:

@contextproperty
def sql(self) -> Optional[str]:
return getattr(self.model, 'injected_sql', None)
if getattr(self.model, 'extra_ctes_injected', None):
gshank marked this conversation as resolved.
Show resolved Hide resolved
return self.model.compiled_sql
return None

@contextproperty
def database(self) -> str:
Expand Down
1 change: 0 additions & 1 deletion core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class CompiledNode(ParsedNode, CompiledNodeMixin):
compiled_sql: Optional[str] = None
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
injected_sql: Optional[str] = None

def set_cte(self, cte_id: str, sql: str):
"""This is the equivalent of what self.extra_ctes[cte_id] = sql would
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def data(self):
result.update({
'raw_sql': self.node.raw_sql,
# the node isn't always compiled, but if it is, include that!
'compiled_sql': getattr(self.node, 'injected_sql', None),
'compiled_sql': getattr(self.node, 'compiled_sql', None),
})
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['injected_sql']) %}
{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@


{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%}
{%- set query_columns = get_columns_in_query(node['injected_sql']) -%}
{%- set query_columns = get_columns_in_query(node['compiled_sql']) -%}
{%- if not target_exists -%}
{# no table yet -> return whatever the query does #}
{{ return([false, query_columns]) }}
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/rpc/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RPCCompileRunner(GenericRPCRunner[RemoteCompileResult]):
def execute(self, compiled_node, manifest) -> RemoteCompileResult:
return RemoteCompileResult(
raw_sql=compiled_node.raw_sql,
compiled_sql=compiled_node.injected_sql,
compiled_sql=compiled_node.compiled_sql,
node=compiled_node,
timing=[], # this will get added later
logs=[],
Expand All @@ -88,7 +88,7 @@ def from_run_result(
class RPCExecuteRunner(GenericRPCRunner[RemoteRunResult]):
def execute(self, compiled_node, manifest) -> RemoteRunResult:
_, execute_result = self.adapter.execute(
compiled_node.injected_sql, fetch=True
compiled_node.compiled_sql, fetch=True
)

table = ResultTable(
Expand All @@ -98,7 +98,7 @@ def execute(self, compiled_node, manifest) -> RemoteRunResult:

return RemoteRunResult(
raw_sql=compiled_node.raw_sql,
compiled_sql=compiled_node.injected_sql,
compiled_sql=compiled_node.compiled_sql,
node=compiled_node,
table=table,
timing=[],
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def raise_on_first_error(self):
def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context):
compiler = adapter.get_compiler()
compiled = compiler.compile_node(hook, self.manifest, extra_context)
statement = compiled.injected_sql
statement = compiled.compiled_sql
hook_index = hook.index or num_hooks
hook_obj = get_hook(statement, index=hook_index)
return hook_obj.sql or ''
Expand Down
Loading