Skip to content

Commit

Permalink
add edges
Browse files Browse the repository at this point in the history
  • Loading branch information
ttusing committed Oct 20, 2024
1 parent a0674db commit c239270
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def add_test_edges(self, manifest: Manifest) -> None:
#
# Produce the following graph:
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
# \/ | test2 ----| |
# test1 ----|---------------|
# | /\ | /\
# | | \/ |
# \/ | test2 ----|
# test1 ----|

for node_id in self.graph:
# If node is executable (in manifest.nodes) and does _not_
Expand All @@ -224,11 +224,17 @@ def add_test_edges(self, manifest: Manifest) -> None:
# Get *everything* upstream of the node
all_upstream_nodes = nx.traversal.bfs_tree(self.graph, node_id, reverse=True)
# Get the set of upstream nodes not including the current node.
upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])
all_upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])

# Get all tests that depend on any upstream nodes.
# Get 1 depth upsteam of the node
direct_upsteam_nodes = nx.traversal.bfs_tree(
self.graph, node_id, reverse=True, depth_limit=1
)
direct_upsteam_nodes = set([n for n in direct_upsteam_nodes if n != node_id])

# Get all tests that depend on direct upstream nodes.
upstream_tests = []
for upstream_node in upstream_nodes:
for upstream_node in direct_upsteam_nodes:
# This gets tests with unique_ids starting with "test."
upstream_tests += _get_tests_for_node(manifest, upstream_node)

Expand All @@ -244,7 +250,7 @@ def add_test_edges(self, manifest: Manifest) -> None:
# If the set of nodes that an upstream test depends on
# is a subset of all upstream nodes of the current node,
# add an edge from the upstream test to the current node.
if test_depends_on.issubset(upstream_nodes):
if test_depends_on.issubset(all_upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def get_graph(self, manifest: Manifest) -> Graph:
Expand Down

0 comments on commit c239270

Please sign in to comment.