Configs in schema files
gshank committed Jul 27, 2021
1 parent 9c58f34 commit 8b089af
Showing 25 changed files with 426 additions and 160 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743](https://github.com/dbt-labs/dbt/issues/2743), [#3490](https://github.com/dbt-labs/dbt/issues/3490), [#3608](https://github.com/dbt-labs/dbt/issues/3608))
- Introduce `on_schema_change` config to detect and handle schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3387](https://github.com/fishtown-analytics/dbt/issues/3387))
- Enable setting configs in schema files ([#2401](https://github.com/dbt-labs/dbt/issues/2401))

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:source_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))
57 changes: 45 additions & 12 deletions core/dbt/context/context_config.py
@@ -120,11 +120,12 @@ def initial_result(self, resource_type: NodeType, base: bool) -> T:

def calculate_node_config(
self,
config_calls: List[Dict[str, Any]],
config_call_dict: Dict[str, Any],
fqn: List[str],
resource_type: NodeType,
project_name: str,
base: bool,
patch_config_dict: Dict[str, Any] = None
) -> BaseConfig:
own_config = self.get_node_project(project_name)

@@ -134,8 +135,15 @@ def calculate_node_config(
for fqn_config in project_configs:
result = self._update_from_config(result, fqn_config)

for config_call in config_calls:
result = self._update_from_config(result, config_call)
# When schema files patch config, it has lower precedence than
# config in the models (config_call_dict), so we add the patch_config_dict
# before the config_call_dict
if patch_config_dict:
result = self._update_from_config(result, patch_config_dict)

# config_calls are created in the 'experimental' model parser and
# the ParseConfigObject (via add_config_call)
result = self._update_from_config(result, config_call_dict)

if own_config.project_name != self._active_project.project_name:
for fqn_config in self._active_project_configs(fqn, resource_type):
@@ -147,11 +155,12 @@
@abstractmethod
def calculate_node_config_dict(
self,
config_calls: List[Dict[str, Any]],
config_call_dict: Dict[str, Any],
fqn: List[str],
resource_type: NodeType,
project_name: str,
base: bool,
patch_config_dict: Dict[str, Any],
) -> Dict[str, Any]:
...

@@ -186,18 +195,20 @@ def _update_from_config(

def calculate_node_config_dict(
self,
config_calls: List[Dict[str, Any]],
config_call_dict: Dict[str, Any],
fqn: List[str],
resource_type: NodeType,
project_name: str,
base: bool,
patch_config_dict: dict = None
) -> Dict[str, Any]:
config = self.calculate_node_config(
config_calls=config_calls,
config_call_dict=config_call_dict,
fqn=fqn,
resource_type=resource_type,
project_name=project_name,
base=base,
patch_config_dict=patch_config_dict
)
finalized = config.finalize_and_validate()
return finalized.to_dict(omit_none=True)
@@ -209,18 +220,20 @@ def get_config_source(self, project: Project) -> ConfigSource:

def calculate_node_config_dict(
self,
config_calls: List[Dict[str, Any]],
config_call_dict: Dict[str, Any],
fqn: List[str],
resource_type: NodeType,
project_name: str,
base: bool,
patch_config_dict: dict = None
) -> Dict[str, Any]:
return self.calculate_node_config(
config_calls=config_calls,
config_call_dict=config_call_dict,
fqn=fqn,
resource_type=resource_type,
project_name=project_name,
base=base,
patch_config_dict=patch_config_dict
)

def initial_result(
@@ -251,30 +264,50 @@ def __init__(
resource_type: NodeType,
project_name: str,
) -> None:
self._config_calls: List[Dict[str, Any]] = []
self._config_call_dict: Dict[str, Any] = {}
self._active_project = active_project
self._fqn = fqn
self._resource_type = resource_type
self._project_name = project_name

def update_in_model_config(self, opts: Dict[str, Any]) -> None:
self._config_calls.append(opts)
def add_config_call(self, opts: Dict[str, Any]) -> None:
dct = self._config_call_dict
self._add_config_call(dct, opts)

@classmethod
def _add_config_call(cls, config_call_dict, opts: Dict[str, Any]) -> None:
for k, v in opts.items():
# MergeBehavior for post-hook and pre-hook is to collect all
# values, instead of overwriting
if k in BaseConfig.mergebehavior['append']:
if not isinstance(v, list):
v = [v]
if k in BaseConfig.mergebehavior['update'] and not isinstance(v, dict):
raise InternalException(f'expected dict, got {v}')
if k in config_call_dict and isinstance(config_call_dict[k], list):
config_call_dict[k].extend(v)
elif k in config_call_dict and isinstance(config_call_dict[k], dict):
config_call_dict[k].update(v)
else:
config_call_dict[k] = v

def build_config_dict(
self,
base: bool = False,
*,
rendered: bool = True,
patch_config_dict: dict = None
) -> Dict[str, Any]:
if rendered:
src = ContextConfigGenerator(self._active_project)
else:
src = UnrenderedConfigGenerator(self._active_project)

return src.calculate_node_config_dict(
config_calls=self._config_calls,
config_call_dict=self._config_call_dict,
fqn=self._fqn,
resource_type=self._resource_type,
project_name=self._project_name,
base=base,
patch_config_dict=patch_config_dict
)
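
The hunks above replace the old per-call list with a single merged `config_call_dict` and give schema-file config (`patch_config_dict`) lower precedence than in-model `config()` calls. Below is a minimal, self-contained sketch of the merge rules in `_add_config_call` (re-declared here for illustration; it is not the dbt module itself, and error handling is omitted): hook and tag keys are appended, dict-valued keys such as `meta` are updated, and everything else is overwritten by the later call.

```python
# Self-contained sketch of the _add_config_call merge rules (illustrative only).
MERGE_BEHAVIOR = {
    "append": ["pre-hook", "pre_hook", "post-hook", "post_hook", "tags"],
    "update": ["quoting", "column_types", "meta"],
}


def add_config_call(config_call_dict, opts):
    # Append hook/tag values, update dict-valued keys, overwrite the rest.
    for k, v in opts.items():
        if k in MERGE_BEHAVIOR["append"] and not isinstance(v, list):
            v = [v]
        if k in config_call_dict and isinstance(config_call_dict[k], list):
            config_call_dict[k].extend(v)
        elif k in config_call_dict and isinstance(config_call_dict[k], dict):
            config_call_dict[k].update(v)
        else:
            config_call_dict[k] = v


combined = {}
add_config_call(combined, {"materialized": "view", "tags": "nightly", "meta": {"owner": "a"}})
add_config_call(combined, {"materialized": "table", "tags": "core", "meta": {"team": "b"}})
# combined == {"materialized": "table", "tags": ["nightly", "core"],
#              "meta": {"owner": "a", "team": "b"}}
```

In `calculate_node_config`, project-level configs are applied first, then the schema-file `patch_config_dict`, then this combined `config_call_dict`, so in-model `config()` calls win when the same key is set in more than one place.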
4 changes: 2 additions & 2 deletions core/dbt/context/providers.py
@@ -279,7 +279,7 @@ def __init__(self, model, context_config: Optional[ContextConfig]):
...


# `config` implementations
# Implementation of "config(..)" calls in models
class ParseConfigObject(Config):
def __init__(self, model, context_config: Optional[ContextConfig]):
self.model = model
@@ -316,7 +316,7 @@ def __call__(self, *args, **kwargs):
raise RuntimeException(
'At parse time, did not receive a context config'
)
self.context_config.update_in_model_config(opts)
self.context_config.add_config_call(opts)
return ''

def set(self, name, value):
60 changes: 2 additions & 58 deletions core/dbt/contracts/graph/manifest.py
@@ -14,7 +14,7 @@
CompileResultNode, ManifestNode, NonSourceCompiledNode, GraphMemberNode
)
from dbt.contracts.graph.parsed import (
ParsedMacro, ParsedDocumentation, ParsedNodePatch, ParsedMacroPatch,
ParsedMacro, ParsedDocumentation,
ParsedSourceDefinition, ParsedExposure, HasUniqueID,
UnpatchedSourceDefinition, ManifestNodes
)
@@ -26,9 +26,7 @@
from dbt.dataclass_schema import dbtClassMixin
from dbt.exceptions import (
CompilationException,
raise_duplicate_resource_name, raise_compiler_error, warn_or_error,
raise_duplicate_patch_name,
raise_duplicate_macro_patch_name, raise_duplicate_source_patch_name,
raise_duplicate_resource_name, raise_compiler_error,
)
from dbt.helper_types import PathSet
from dbt.logger import GLOBAL_LOGGER as logger
@@ -718,60 +716,6 @@ def get_resource_fqns(self) -> Mapping[str, PathSet]:
resource_fqns[resource_type_plural].add(tuple(resource.fqn))
return resource_fqns

# This is called by 'parse_patch' in the NodePatchParser
def add_patch(
self, source_file: SchemaSourceFile, patch: ParsedNodePatch,
) -> None:
if patch.yaml_key in ['models', 'seeds', 'snapshots']:
unique_id = self.ref_lookup.get_unique_id(patch.name, None)
elif patch.yaml_key == 'analyses':
unique_id = self.analysis_lookup.get_unique_id(patch.name, None)
else:
raise dbt.exceptions.InternalException(
f'Unexpected yaml_key {patch.yaml_key} for patch in '
f'file {source_file.path.original_file_path}'
)
if unique_id is None:
# This will usually happen when a node is disabled
return

# patches can't be overwritten
node = self.nodes.get(unique_id)
if node:
if node.patch_path:
package_name, existing_file_path = node.patch_path.split('://')
raise_duplicate_patch_name(patch, existing_file_path)
source_file.append_patch(patch.yaml_key, unique_id)
node.patch(patch)

def add_macro_patch(
self, source_file: SchemaSourceFile, patch: ParsedMacroPatch,
) -> None:
# macros are fully namespaced
unique_id = f'macro.{patch.package_name}.{patch.name}'
macro = self.macros.get(unique_id)
if not macro:
warn_or_error(
f'WARNING: Found documentation for macro "{patch.name}" '
f'which was not found'
)
return
if macro.patch_path:
package_name, existing_file_path = macro.patch_path.split('://')
raise_duplicate_macro_patch_name(patch, existing_file_path)
source_file.macro_patches.append(unique_id)
macro.patch(patch)

def add_source_patch(
self, source_file: SchemaSourceFile, patch: SourcePatch,
) -> None:
# source patches must be unique
key = (patch.overrides, patch.name)
if key in self.source_patches:
raise_duplicate_source_patch_name(patch, self.source_patches[key])
self.source_patches[key] = patch
source_file.source_patches.append(key)

def get_used_schemas(self, resource_types=None):
return frozenset({
(node.database, node.schema) for node in
19 changes: 16 additions & 3 deletions core/dbt/contracts/graph/model_config.py
@@ -239,8 +239,15 @@ def same_contents(
return False
return True

# This is used in 'add_config_call' to create the combined config_call_dict.
# 'meta' moved here from node
mergebehavior = {
"append": ['pre-hook', 'pre_hook', 'post-hook', 'post_hook', 'tags'],
"update": ['quoting', 'column_types', 'meta'],
}

@classmethod
def _extract_dict(
def _merge_dicts(
cls, src: Dict[str, Any], data: Dict[str, Any]
) -> Dict[str, Any]:
"""Find all the items in data that match a target_field on this class,
@@ -286,10 +293,10 @@ def update_from(

adapter_config_cls = get_config_class_by_name(adapter_type)

self_merged = self._extract_dict(dct, data)
self_merged = self._merge_dicts(dct, data)
dct.update(self_merged)

adapter_merged = adapter_config_cls._extract_dict(dct, data)
adapter_merged = adapter_config_cls._merge_dicts(dct, data)
dct.update(adapter_merged)

# any remaining fields must be "clobber"
@@ -322,6 +329,8 @@ class SourceConfig(BaseConfig):

@dataclass
class NodeConfig(BaseConfig):
# Note: if any new fields are added with MergeBehavior, also update the
# 'mergebehavior' dictionary
enabled: bool = True
materialized: str = 'view'
persist_docs: Dict[str, Any] = field(default_factory=dict)
@@ -370,6 +379,10 @@ class NodeConfig(BaseConfig):
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'
meta: Dict[str, Any] = field(
default_factory=dict,
metadata=MergeBehavior.Update.meta(),
)

@classmethod
def __pre_deserialize__(cls, data):
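
The `mergebehavior` dictionary above mirrors the `MergeBehavior` metadata declared on individual config fields (for example the new `meta` field with `MergeBehavior.Update`). Below is a toy illustration of that field-metadata pattern, assuming invented names (`ToyConfig`, `merge`) rather than dbt's actual `MergeBehavior` API: a field declares its merge rule in dataclass metadata, and the combiner consults it instead of always clobbering.

```python
# Toy version of metadata-driven config merging (not dbt's MergeBehavior API).
from dataclasses import dataclass, field, fields
from typing import Any, Dict


@dataclass
class ToyConfig:
    materialized: str = "view"  # no metadata: later values simply overwrite
    meta: Dict[str, Any] = field(
        default_factory=dict, metadata={"merge": "update"}
    )


def merge(base: ToyConfig, new: Dict[str, Any]) -> ToyConfig:
    # Start from the base values, then apply each incoming key according to
    # the merge rule recorded in the field's metadata.
    current = {f.name: getattr(base, f.name) for f in fields(ToyConfig)}
    for f in fields(ToyConfig):
        if f.name not in new:
            continue
        if f.metadata.get("merge") == "update":
            current[f.name] = {**current[f.name], **new[f.name]}
        else:
            current[f.name] = new[f.name]
    return ToyConfig(**current)


merged = merge(ToyConfig(meta={"owner": "a"}), {"materialized": "table", "meta": {"team": "b"}})
# merged.materialized == "table"; merged.meta == {"owner": "a", "team": "b"}
```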
11 changes: 11 additions & 0 deletions core/dbt/contracts/graph/parsed.py
@@ -155,6 +155,10 @@ def patch(self, patch: 'ParsedNodePatch'):
self.columns = patch.columns
self.meta = patch.meta
self.docs = patch.docs
if patch.config:
pass # for now. reorganizing code...
# we need to re-do the 'update_parsed_node_config' steps, i.e.
# apply dbt_project config, patch config, and model file config
if flags.STRICT_MODE:
# It seems odd that an instance can be invalid
# Maybe there should be validation or restrictions
@@ -203,6 +207,7 @@ class ParsedNodeDefaults(ParsedNodeMandatory):
deferred: bool = False
unrendered_config: Dict[str, Any] = field(default_factory=dict)
created_at: int = field(default_factory=lambda: int(time.time()))
config_call_dict: Dict[str, Any] = field(default_factory=dict)

def write_node(self, target_path: str, subdirectory: str, payload: str):
if (os.path.basename(self.path) ==
@@ -229,6 +234,11 @@ class ParsedNode(ParsedNodeDefaults, ParsedNodeMixins, SerializableType):
def _serialize(self):
return self.to_dict()

def __post_serialize__(self, dct):
if 'config_call_dict' in dct:
del dct['config_call_dict']
return dct

@classmethod
def _deserialize(cls, dct: Dict[str, int]):
# The serialized ParsedNodes do not differ from each other
Expand Down Expand Up @@ -456,6 +466,7 @@ class ParsedPatch(HasYamlMetadata, Replaceable):
description: str
meta: Dict[str, Any]
docs: Docs
config: Dict[str, Any]


# The parsed node update is only the 'patch', not the test. The test became a
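
The hunks above store the combined `config_call_dict` on parsed nodes, add a `config` dict to `ParsedPatch`, and strip `config_call_dict` from serialized output via `__post_serialize__`. A self-contained toy version of that serialization hook (not dbt's `ParsedNode`; `ToyParsedNode` is invented for the example):

```python
# The parse-time-only field stays on the in-memory node but is removed from
# the dictionary produced for serialization.
class ToyParsedNode:
    def __init__(self, name, config_call_dict):
        self.name = name
        self.config_call_dict = config_call_dict

    def to_dict(self):
        dct = {"name": self.name, "config_call_dict": self.config_call_dict}
        return self.__post_serialize__(dct)

    def __post_serialize__(self, dct):
        # mirror the diff: drop config_call_dict from the serialized dict
        if "config_call_dict" in dct:
            del dct["config_call_dict"]
        return dct


node = ToyParsedNode("my_model", {"materialized": "table"})
assert "config_call_dict" not in node.to_dict()
```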
12 changes: 9 additions & 3 deletions core/dbt/contracts/graph/unparsed.py
@@ -126,12 +126,17 @@ def file_id(self):


@dataclass
class UnparsedAnalysisUpdate(HasColumnDocs, HasDocs, HasYamlMetadata):
class HasConfig():
config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class UnparsedAnalysisUpdate(HasConfig, HasColumnDocs, HasDocs, HasYamlMetadata):
pass


@dataclass
class UnparsedNodeUpdate(HasColumnTests, HasTests, HasYamlMetadata):
class UnparsedNodeUpdate(HasConfig, HasColumnTests, HasTests, HasYamlMetadata):
quote_columns: Optional[bool] = None


@@ -143,7 +148,7 @@ class MacroArgument(dbtClassMixin):


@dataclass
class UnparsedMacroUpdate(HasDocs, HasYamlMetadata):
class UnparsedMacroUpdate(HasConfig, HasDocs, HasYamlMetadata):
arguments: List[MacroArgument] = field(default_factory=list)


@@ -261,6 +266,7 @@ class UnparsedSourceDefinition(dbtClassMixin, Replaceable):
loaded_at_field: Optional[str] = None
tables: List[UnparsedSourceTableDefinition] = field(default_factory=list)
tags: List[str] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)

@property
def yaml_key(self) -> 'str':
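
With the new `HasConfig` mixin, unparsed node, analysis, macro, and source entries from schema files can all carry a `config` dict. A hypothetical sketch of the idea (the `ToyNodeUpdate` class and the example schema entry are illustrative, not taken from the commit): the `config` block on a schema-file entry deserializes into a plain dict, the kind of value that can later reach `calculate_node_config` as `patch_config_dict`.

```python
# A schema.yml entry such as
#
#   models:
#     - name: my_model
#       config:
#         materialized: table
#         tags: ["nightly"]
#
# can now carry a config block alongside descriptions and columns.
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class HasConfig:
    config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ToyNodeUpdate(HasConfig):
    name: str = ""


patch = ToyNodeUpdate(name="my_model",
                      config={"materialized": "table", "tags": ["nightly"]})
print(patch.config)  # {'materialized': 'table', 'tags': ['nightly']}
```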