fix(ingest): check for dbt materialization before proceeding #2842

Merged: 10 commits, Jul 8, 2021
Changes from 7 commits
122 changes: 71 additions & 51 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -1,5 +1,6 @@
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional

import dateutil.parser
@@ -50,6 +51,7 @@ class DBTConfig(ConfigModel):
node_type_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()


@dataclass
class DBTColumn:
name: str
comment: str
@@ -61,18 +63,23 @@ def __repr__(self):
return self.__class__.__name__ + str(tuple(sorted(fields))).replace("'", "")


@dataclass
class DBTNode:
dbt_name: str
database: str
schema: str
dbt_file_path: str
node_type: str # source, model
materialization: str # table, view, ephemeral
name: str # name, identifier
columns: List[DBTColumn]
upstream_urns: List[str]

datahub_urn: str

dbt_name: str
dbt_file_path: str

node_type: str # source, model
max_loaded_at: Optional[str]
materialization: Optional[str] # table, view, ephemeral

columns: List[DBTColumn] = field(default_factory=list)
upstream_urns: List[str] = field(default_factory=list)

def __repr__(self):
fields = tuple("{}={}".format(k, v) for k, v in self.__dict__.items())
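The move to `@dataclass` above is why the list-valued fields use `field(default_factory=list)`: a plain `columns: List[DBTColumn] = []` would be one list shared by every instance, and dataclasses rejects mutable defaults outright. A minimal sketch of the pattern with an illustrative class (not the real `DBTNode`):

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Node:
    name: str                       # required: no default, must be passed in
    materialization: Optional[str]  # may legitimately be absent for sources
    columns: List[str] = field(default_factory=list)  # fresh list per instance


a = Node(name="a", materialization=None)
b = Node(name="b", materialization="view")
a.columns.append("id")
assert b.columns == []  # no shared state between instances
```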
@@ -87,11 +94,12 @@ def get_columns(catalog_node: dict) -> List[DBTColumn]:
for key in raw_columns:
raw_column = raw_columns[key]

dbtCol = DBTColumn()
dbtCol.comment = raw_column["comment"]
dbtCol.data_type = raw_column["type"]
dbtCol.index = raw_column["index"]
dbtCol.name = raw_column["name"]
dbtCol = DBTColumn(
comment=raw_column["comment"],
data_type=raw_column["type"],
index=raw_column["index"],
name=raw_column["name"],
)
columns.append(dbtCol)
return columns
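A knock-on effect of the dataclass conversion shows in this hunk: `DBTColumn` no longer supports bare `DBTColumn()` construction followed by attribute assignment, because its fields have no defaults. Sketch of the failure mode (field values are made up):

```python
# DBTColumn()  ->  TypeError: __init__() missing required positional arguments
col = DBTColumn(
    name="customer_id",
    comment="primary key",
    data_type="integer",
    index=1,
)
```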

@@ -111,26 +119,22 @@ def extract_dbt_entities(

dbt_entities = []
for key, node in all_manifest_entities.items():
dbtNode = DBTNode()

# check if node pattern allowed based on config file
if not node_type_pattern.allowed(node["resource_type"]):
continue
dbtNode.dbt_name = key
dbtNode.database = node["database"]
dbtNode.schema = node["schema"]
dbtNode.dbt_file_path = node["original_file_path"]
dbtNode.node_type = node["resource_type"]
dbtNode.max_loaded_at = sources_by_id.get(key, {}).get("max_loaded_at")

name = node["name"]

if "identifier" in node and not load_catalog:
dbtNode.name = node["identifier"]
else:
dbtNode.name = node["name"]
name = node["identifier"]

materialization = None
upstream_urns = []

if "materialized" in node["config"].keys():
if "materialized" in node.get("config", {}).keys():
# It's a model
dbtNode.materialization = node["config"]["materialized"]
dbtNode.upstream_urns = get_upstreams(
materialization = node["config"]["materialized"]
upstream_urns = get_upstreams(
node["depends_on"]["nodes"],
all_manifest_entities,
load_catalog,
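This is the fix the PR title describes: `node.get("config", {})` makes the membership test safe for manifest entries that have no `"config"` block at all, where the previous `node["config"].keys()` lookup raised a `KeyError`. The pattern in isolation, with made-up node dicts:

```python
from typing import Optional

model = {"config": {"materialized": "view"}}
source: dict = {}  # e.g. a dbt source with no "config" key

def materialization_of(node: dict) -> Optional[str]:
    # .get() with an empty-dict default keeps the "materialized"
    # check from blowing up when "config" is missing entirely.
    if "materialized" in node.get("config", {}):
        return node["config"]["materialized"]
    return None

assert materialization_of(model) == "view"
assert materialization_of(source) is None
```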
@@ -144,21 +148,42 @@ if catalog_node is None:
if catalog_node is None:
report.report_warning(
key,
f"Entity {dbtNode.dbt_name} is in manifest but missing from catalog",
f"Entity {name} is in manifest but missing from catalog",
)

else:

dbtNode.materialization = all_catalog_entities[key]["metadata"][
"type"
] # get materialization from catalog? required?
dbtNode.upstream_urns = []
upstream_urns = []

dbtNode = DBTNode(
dbt_name=key,
database=node["database"],
schema=node["schema"],
dbt_file_path=node["original_file_path"],
node_type=node["resource_type"],
max_loaded_at=sources_by_id.get(key, {}).get("max_loaded_at"),
name=name,
upstream_urns=upstream_urns,
materialization=materialization,
columns=[],
datahub_urn=get_urn_from_dbtNode(
node["database"],
node["schema"],
name,
target_platform,
environment,
),
)

# overwrite columns from catalog
if (
dbtNode.materialization != "ephemeral" and load_catalog
): # we don't want columns if platform isn't 'dbt'
logger.debug("Loading schema info")
catalog_node = all_catalog_entities.get(dbtNode.dbt_name)
catalog_node = all_catalog_entities.get(key)

if catalog_node is None:
report.report_warning(
@@ -171,14 +196,6 @@ else:
else:
dbtNode.columns = []

dbtNode.datahub_urn = get_urn_from_dbtNode(
dbtNode.database,
dbtNode.schema,
dbtNode.name,
target_platform,
environment,
)

dbt_entities.append(dbtNode)

return dbt_entities
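With everything gathered up front, the node and its URN are now built in one shot instead of being patched onto an empty `DBTNode`. The URN itself comes from `get_urn_from_dbtNode` in a collapsed part of this diff; a sketch of the shape it plausibly returns, following DataHub's usual dataset-URN layout (an assumption, not the confirmed implementation):

```python
def get_urn_from_dbtNode(
    database: str, schema: str, name: str, target_platform: str, environment: str
) -> str:
    # Assumed URN shape: urn:li:dataset:(urn:li:dataPlatform:<platform>,<fqn>,<env>)
    return (
        f"urn:li:dataset:(urn:li:dataPlatform:{target_platform},"
        f"{database}.{schema}.{name},{environment})"
    )
```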
@@ -240,11 +257,18 @@ def get_urn_from_dbtNode(


def get_custom_properties(node: DBTNode) -> Dict[str, str]:
return {
"dbt_node_type": node.node_type,
"materialization": node.materialization,
"dbt_file_path": node.dbt_file_path,
}

custom_properties = {}

node_attributes = ["node_type", "materialization", "dbt_file_path"]

for attribute in node_attributes:
node_attribute_value = getattr(node, attribute)

if node_attribute_value is not None:
custom_properties[attribute] = node_attribute_value

return custom_properties
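The rewrite of `get_custom_properties` is the flip side of `materialization` becoming `Optional`: a `None` slipping into `customProperties` would violate the `Dict[str, str]` contract, so absent attributes are skipped. The loop is equivalent to this comprehension (a sketch reusing the names above):

```python
def get_custom_properties(node: DBTNode) -> Dict[str, str]:
    node_attributes = ["node_type", "materialization", "dbt_file_path"]
    return {
        attribute: getattr(node, attribute)
        for attribute in node_attributes
        if getattr(node, attribute) is not None
    }
```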


def get_upstreams(
@@ -257,20 +281,17 @@ upstream_urns = []
upstream_urns = []

for upstream in upstreams:
dbtNode_upstream = DBTNode()

dbtNode_upstream.database = all_nodes[upstream]["database"]
dbtNode_upstream.schema = all_nodes[upstream]["schema"]
if "identifier" in all_nodes[upstream] and not load_catalog:
dbtNode_upstream.name = all_nodes[upstream]["identifier"]
name = all_nodes[upstream]["identifier"]
else:
dbtNode_upstream.name = all_nodes[upstream]["name"]
name = all_nodes[upstream]["name"]

upstream_urns.append(
get_urn_from_dbtNode(
dbtNode_upstream.database,
dbtNode_upstream.schema,
dbtNode_upstream.name,
all_nodes[upstream]["database"],
all_nodes[upstream]["schema"],
name,
target_platform,
environment,
)
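The same identifier-versus-name fallback used in `extract_dbt_entities` is applied to upstreams here, now without constructing a throwaway `DBTNode`. The rule in isolation (made-up dicts, illustrative helper name):

```python
def resolve_name(node: dict, load_catalog: bool) -> str:
    # Prefer the declared physical identifier when the
    # catalog is not being loaded.
    if "identifier" in node and not load_catalog:
        return node["identifier"]
    return node["name"]

assert resolve_name({"name": "payments_base", "identifier": "PAYMENTS_V2"}, load_catalog=False) == "PAYMENTS_V2"
assert resolve_name({"name": "payments_base"}, load_catalog=True) == "payments_base"
```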
@@ -407,11 +428,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
urn=node.datahub_urn,
aspects=[],
)
custom_properties = get_custom_properties(node)

dbt_properties = DatasetPropertiesClass(
description=node.dbt_name,
customProperties=custom_properties,
customProperties=get_custom_properties(node),
tags=[],
)
dataset_snapshot.aspects.append(dbt_properties)
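The golden-file updates below follow directly from the two changes above: property keys now mirror the attribute names (`node_type` instead of `dbt_node_type`), and a node whose materialization could not be resolved omits that key entirely rather than emitting a null, as in the final hunk of the JSON. Expected shape for such a node, as a Python literal (values copied from the golden file):

```python
{"node_type": "source", "dbt_file_path": "models/base.yml"}
```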
35 changes: 17 additions & 18 deletions metadata-ingestion/tests/integration/dbt/dbt_mces_golden.json
@@ -8,7 +8,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "model",
"node_type": "model",
"materialization": "ephemeral",
"dbt_file_path": "models/transform/customer_details.sql"
},
@@ -94,7 +94,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "model",
"node_type": "model",
"materialization": "table",
"dbt_file_path": "models/billing/monthly_billing_with_cust.sql"
},
@@ -232,7 +232,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "model",
"node_type": "model",
"materialization": "view",
"dbt_file_path": "models/base/payments_base.sql"
},
@@ -445,8 +445,8 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "model",
"materialization": "table",
"node_type": "model",
"materialization": "BASE TABLE",
"dbt_file_path": "models/transform/payments_by_customer_by_month.sql"
},
"externalUrl": null,
@@ -559,7 +559,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -678,7 +678,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -857,7 +857,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -961,7 +961,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1080,7 +1080,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1184,7 +1184,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1393,7 +1393,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1542,7 +1542,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1691,7 +1691,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1840,7 +1840,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -1989,7 +1989,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"node_type": "source",
"materialization": "BASE TABLE",
"dbt_file_path": "models/base.yml"
},
@@ -2138,8 +2138,7 @@
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"dbt_node_type": "source",
"materialization": "BASE TABLE",
"node_type": "source",
"dbt_file_path": "models/base.yml"
},
"externalUrl": null,