[CT-2723] [spike+] Maximally parallelize dbt clone operations, a different mechanism for processing a queue #7914

Closed · Tracked by #7301 · Fixed by #10129
aranke opened this issue Jun 21, 2023 · 7 comments
Labels: clone (related to the dbt clone command), enhancement (New feature or request), performance

@aranke (Member) commented Jun 21, 2023

Originally posted by @jtcohen6 in #7881 (comment)

One thing I noticed in my previous spike, and I'm seeing in this implementation as well: Even though each node's clone operation is completely independent of any other node, we are still running clone in DAG order. That significantly limits how fast this can be with --threads 1000.

I'm not sure how to fix that, and it will require digging into our base task / graph queue logic — but I suspect that will be key to unlocking a lot of the value here.

Repro

-- models/model_a.sql
select 1 as id

-- models/model_b.sql
select * from {{ ref('model_a') }} 
$ dbt run --target prod
$ mkdir state
$ mv target/manifest.json state/
$ dbt clone --target dev --state state --full-refresh

From the logs: Notice that model_b runs serially after model_a, even though they're on different threads. dbt understands that model_b depends on model_a, even though for the purposes of the clone task that dependency doesn't matter.

============================== 11:36:51.356566 | 2fab3bf3-3220-47f5-a842-a8e7aee780d1 ==============================
11:36:51.356566 [info ] [MainThread]: Running with dbt=1.6.0-b4
11:36:51.360523 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'warn_error': 'None', 'version_check': 'False', 'debug': 'False', 'log_path': '/Users/jerco/dev/scratch/testy/logs', 'fail_fast': 'False', 'profiles_dir': '/Users/jerco/.dbt', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'log_format': 'default', 'introspect': 'True', 'target_path': 'None', 'send_anonymous_usage_stats': 'False'}
11:36:51.616469 [info ] [MainThread]: Registered adapter: snowflake=1.6.0-b3
11:36:51.632107 [debug] [MainThread]: checksum: 85def1bdb9aa2c18a6a11d52b89108c001273905a0f5fd72c238caeba233ef8d, vars: {}, profile: , target: dev, version: 1.6.0b4
11:36:51.669565 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
11:36:51.669910 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
11:36:51.670469 [debug] [MainThread]: Publication artifact available
11:36:51.692800 [info ] [MainThread]: Found 2 models, 0 tests, 0 snapshots, 0 analyses, 371 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
11:36:51.694034 [info ] [MainThread]: 
11:36:51.694695 [debug] [MainThread]: Acquiring new snowflake connection 'master'
11:36:51.700555 [debug] [ThreadPool]: Acquiring new snowflake connection 'list_analytics'
11:36:51.712206 [debug] [ThreadPool]: Using snowflake connection "list_analytics"
11:36:51.712583 [debug] [ThreadPool]: On list_analytics: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics"} */
show terse schemas in database analytics
    limit 10000
11:36:51.712825 [debug] [ThreadPool]: Opening a new connection, currently in state init
11:36:52.957601 [debug] [ThreadPool]: SQL status: SUCCESS 119 in 1.0 seconds
11:36:53.011881 [debug] [ThreadPool]: On list_analytics: Close
11:36:53.372661 [debug] [ThreadPool]: Re-using an available connection from the pool (formerly list_analytics, now list_analytics_dbt_jcohen_dev)
11:36:53.381793 [debug] [ThreadPool]: Using snowflake connection "list_analytics_dbt_jcohen_dev"
11:36:53.382231 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics_dbt_jcohen_dev"} */
show terse objects in analytics.dbt_jcohen_dev limit 10000
11:36:53.382470 [debug] [ThreadPool]: Opening a new connection, currently in state closed
11:36:54.382890 [debug] [ThreadPool]: SQL status: SUCCESS 2 in 1.0 seconds
11:36:54.387887 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: Close
11:36:54.748809 [info ] [MainThread]: Concurrency: 8 threads (target='dev')
11:36:54.749807 [info ] [MainThread]: 
11:36:54.755801 [debug] [Thread-1 (]: Began running node model.my_dbt_project.model_a
11:36:54.756728 [debug] [Thread-1 (]: Re-using an available connection from the pool (formerly list_analytics_dbt_jcohen_dev, now model.my_dbt_project.model_a)
11:36:54.757151 [debug] [Thread-1 (]: Began compiling node model.my_dbt_project.model_a
11:36:54.757645 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (compile): 11:36:54.757426 => 11:36:54.757430
11:36:54.758014 [debug] [Thread-1 (]: Began executing node model.my_dbt_project.model_a
11:36:54.775804 [debug] [Thread-1 (]: On "model.my_dbt_project.model_a": cache miss for schema "analytics.dbt_jcohen_prod", this is inefficient
11:36:54.778532 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
11:36:54.778925 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
show terse objects in analytics.dbt_jcohen_prod limit 10000
11:36:54.779255 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
11:36:55.787694 [debug] [Thread-1 (]: SQL status: SUCCESS 5 in 1.0 seconds
11:36:55.793394 [debug] [Thread-1 (]: While listing relations in database=analytics, schema=dbt_jcohen_prod, found: ANOTHER_MODEL, MODEL_A, MODEL_B, MY_MODEL, MY_SEED
11:36:55.842689 [debug] [Thread-1 (]: Writing runtime sql for node "model.my_dbt_project.model_a"
11:36:55.845516 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
11:36:55.845921 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
create or replace   view analytics.dbt_jcohen_dev.model_a
  
   as (
    
        select * from analytics.dbt_jcohen_prod.model_a
    
  );
11:36:56.184105 [debug] [Thread-1 (]: SQL status: SUCCESS 1 in 0.0 seconds
11:36:56.215770 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (execute): 11:36:54.758266 => 11:36:56.215606
11:36:56.216137 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: Close
11:36:56.587539 [debug] [Thread-1 (]: Finished running node model.my_dbt_project.model_a
11:36:56.589991 [debug] [Thread-3 (]: Began running node model.my_dbt_project.model_b
11:36:56.591451 [debug] [Thread-3 (]: Acquiring new snowflake connection 'model.my_dbt_project.model_b'
11:36:56.592146 [debug] [Thread-3 (]: Began compiling node model.my_dbt_project.model_b
11:36:56.592824 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (compile): 11:36:56.592575 => 11:36:56.592579
11:36:56.593252 [debug] [Thread-3 (]: Began executing node model.my_dbt_project.model_b
11:36:56.600984 [debug] [Thread-3 (]: Writing runtime sql for node "model.my_dbt_project.model_b"
11:36:56.602917 [debug] [Thread-3 (]: Using snowflake connection "model.my_dbt_project.model_b"
11:36:56.603500 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_b"} */
create or replace   view analytics.dbt_jcohen_dev.model_b
  
   as (
    
        select * from analytics.dbt_jcohen_prod.model_b
    
  );
11:36:56.603966 [debug] [Thread-3 (]: Opening a new connection, currently in state init
11:36:57.722641 [debug] [Thread-3 (]: SQL status: SUCCESS 1 in 1.0 seconds
11:36:57.728537 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (execute): 11:36:56.593543 => 11:36:57.728157
11:36:57.729557 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: Close
11:36:58.086873 [debug] [Thread-3 (]: Finished running node model.my_dbt_project.model_b
11:36:58.089987 [debug] [MainThread]: Connection 'master' was properly closed.
11:36:58.090496 [debug] [MainThread]: Connection 'model.my_dbt_project.model_a' was properly closed.
11:36:58.090840 [debug] [MainThread]: Connection 'model.my_dbt_project.model_b' was properly closed.
11:36:58.091862 [debug] [MainThread]: Command end result
11:36:58.108549 [info ] [MainThread]: 
11:36:58.109036 [info ] [MainThread]: Completed successfully
11:36:58.109432 [info ] [MainThread]: 
11:36:58.109785 [info ] [MainThread]: Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
11:36:58.110456 [debug] [MainThread]: Command `dbt clone` succeeded at 11:36:58.110325 after 6.42 seconds
11:36:58.110747 [debug] [MainThread]: Flushing usage events
github-actions bot changed the title from "Run dbt clone operations in parallel" to "[CT-2723] Run dbt clone operations in parallel" on Jun 21, 2023
aranke mentioned this issue and self-assigned it on Jun 21, 2023
jtcohen6 added the enhancement and performance labels on Jun 21, 2023
@jtcohen6 (Contributor) commented

I said in #7881 (comment):

IIRC - compile and docs generate don't run in DAG order. (They also interrupt the entire command on the first failure encountered during compilation.) I think the inconsistency is justifiable, if the parallelism delivers significantly better performance.

After looking into it a bit more, I don't think this is actually true. It would make sense, though — even if model_b depends on model_a (e.g. for an introspective query), we're not actually materializing model_a, so it makes no difference whether we compile it before/after/concurrently with model_b.

So: This would be a first in dbt, and require some modification to how we iterate over the job/graph queue.

jtcohen6 changed the title from "[CT-2723] Run dbt clone operations in parallel" to "[CT-2723] Maximally parallelize dbt clone operations" on Jul 14, 2023
@graciegoheen (Contributor) commented Aug 21, 2023

Notes from refinement:

  • where would we want to put this change?
  • we could remove all the edges so every node is the "same", but then we'd be constructing a graph that no longer reflects the DAG order
  • for clone, we should always be using the maximum number of threads
  • instead, could we run in a different "mode" (where we don't worry about whether there are edges)?
  • this may have implications for ephemeral models

Where would this speed things up, and by how much? @jtcohen6, do you have an example that shows the concrete benefit of doing this? We'd like to understand the performance gain so we can decide whether this change is worth it.

@jtcohen6 (Contributor) commented

We still need to construct a DAG for the dbt clone task to support node selection — but when it comes time to execute, we can treat it as a "flat" list, not a directed graph with edges. I agree a different run "mode" makes sense here. The same "run mode" could apply to compile + docs generate.
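
For illustration, here is a rough sketch of that split (the helper name selected_nodes_flat and the selector handling are hypothetical, not dbt's actual internals): the DAG is still consulted to resolve selection, but the executor receives a plain, order-free list of node IDs.

# Hypothetical sketch: keep the DAG for selection, execute a flat list.
import networkx as nx

def selected_nodes_flat(graph: nx.DiGraph, selector: str) -> list[str]:
    # Selection still needs edges: "model_a+" means model_a plus its descendants.
    if selector.endswith("+"):
        root = selector[:-1]
        selected = {root} | nx.descendants(graph, root)
    else:
        selected = {selector}
    # For clone, execution order is irrelevant, so hand back a plain list
    # (sorted only to keep the ordering deterministic).
    return sorted(selected)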

It's a fair thought re: ephemeral models. The good news is, at least for dbt clone, they're totally irrelevant: They're not cloned, and we don't need to interpolate them into the compiled_code of other nodes.

I believe this would significantly speed up the execution of dbt clone, though "by how much" will depend on the size & shape of a given project's DAG and the actual concurrency that Snowflake can handle.

In theory, if you're running with as many threads as you have selected models for cloning:

  • Let's say it takes X seconds to clone one model (whether creating a table clone or a "pointer" view). Conservatively, let's say this is ~2 seconds.
  • Current state is that dbt clone will take no less than X seconds * Y, where Y is the greatest parent-child depth in the DAG, because each parent will be cloned before its child.
  • This could be as fast as X (total) — up to the level of parallelism that Snowflake can actually support, which is not documented :)

Meaning: The maximum theoretical speedup is to run this Y times faster, where Y is the greatest parent-child depth among selected resources. In dbt Labs' internal-analytics project, Y = 31.

import networkx
# dbt writes the parsed project graph to target/graph.gpickle
G = networkx.readwrite.gpickle.read_gpickle("target/graph.gpickle")
# length of the longest path = greatest parent-child depth (the Y above)
print(len(networkx.dag_longest_path(G)))

Instead of dbt clone taking ~200 seconds, it could be taking <10 seconds. In theory!!

We're never going to achieve that theoretical speedup in practice, but I'd hope it could be pretty darn significant. There is the additional latency due to dbt maintaining a central adapter cache, which each thread must lock while updating. @peterallenwebb had identified some bonus cache-related slowness in #6844. I had tried pulling that change into my previous spike PR, and it shaved off ~40% of the total runtime: #7258 (comment).

@graciegoheen (Contributor) commented

The same "run mode" could apply to compile + docs generate.

I believe this will also be relevant for our unit testing work, since unit tests do not need to run in DAG order

@MichelleArk (Contributor) commented

A simple approach that might work here to achieve a 'maximally parallelized' execution mode would be to modify the get_graph_queue method to accept an optional config that builds the graph queue without any edges.
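
A minimal sketch of that idea, assuming a networkx graph (the function name and its shape are illustrative, not dbt's actual get_graph_queue signature): copy only the selected nodes and drop every edge, so whatever machinery hands out "nodes whose parents are done" sees every node as immediately runnable.

import networkx as nx

def edgeless_subgraph(graph: nx.DiGraph, selected: set[str]) -> nx.DiGraph:
    # Keep only the selected nodes and none of the edges; every node then has
    # in-degree 0, so a dependency-aware queue would mark all of them ready at once.
    flat = nx.DiGraph()
    flat.add_nodes_from(n for n in graph.nodes if n in selected)
    return flat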

@graciegoheen (Contributor) commented Jan 2, 2024

More notes from refinement:

  • we should do something deterministic here
  • this feels similar to the ordering of the unit tests in the build command (having some flexibility around execution order, a more general approach)
  • for clone, every node becomes independent -> in theory, it could run fully in parallel
  • the PriorityQueue determines the order things run in; could we use an alternative, flat queue object?
  • alternatively, build the DAG without any edges -> effectively a flat list
  • it just depends on where we implement it -> in the queue object vs. changing the DAG
  • theme: the DAG doesn't quite tell us what the execution order is... we need a more general, less coupled mechanism for determining execution order
  • the queue starts with a small subset, then looks for more things to add -> instead, we'd have a mode that just accepts a list and works through it (maybe we need a different mechanism for processing a queue: a new flag or a new queue object; see the sketch after these notes)
  • testing this could be tricky
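
As a rough illustration of that last note (entirely hypothetical, using only Python's standard library rather than anything in dbt): a mode that bypasses the dependency-aware queue and fans a flat list of nodes out to a thread pool.

from concurrent.futures import ThreadPoolExecutor

def clone_node(node: str) -> str:
    # Placeholder: issue the per-node clone DDL against the warehouse.
    return f"cloned {node}"

def clone_all(nodes: list[str], threads: int) -> list[str]:
    # No ordering constraints: every node is submitted immediately, and the
    # only limit on concurrency is the thread count (and the warehouse).
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(clone_node, nodes))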

graciegoheen changed the title from "[CT-2723] Maximally parallelize dbt clone operations" to "[CT-2723] [spike+] Maximally parallelize dbt clone operations, a different mechanism for processing a queue" on Jan 2, 2024
dbeatty10 added the clone label on Feb 1, 2024
@ChenyuLInx (Contributor) commented

This might also apply to unit tests (in the test command).
