Refactor physical create_initial_plan to iteratively & concurrently construct plan from the bottom up #10023
Conversation
Keen to get some thoughts on this. Even if we decide not to go ahead with this, it was a fun exercise to try out 🙂
```rust
let planning_concurrency = session_state
    .config_options()
    .execution
    .planning_concurrency;
// Can never spawn more tasks than leaves in the tree, as these tasks must
// all converge down to the root node, which can only be processed by a
// single task.
let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());
```
I think this may be more accurate than what is currently present: the existing `create_initial_plan_multi` can be called multiple times, and `planning_concurrency` is only enforced within each call, so across multiple calls it can spawn more tasks than `planning_concurrency` configures. Maybe that is intended?
Either way, with this new code, it will actually limit how many tasks are building the tree for the entire initial planning process.
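To make the cap concrete, here is a minimal, self-contained illustration with invented numbers (the variable names mirror the quoted snippet, but the values are assumptions for the example only):

```rust
fn main() {
    // Hypothetical values: a configured planning_concurrency of 16, but a plan
    // with only 3 leaves. At most 3 tasks can ever make progress, because tasks
    // only merge as they climb towards the single root node.
    let planning_concurrency = 16usize;
    let flat_tree_leaf_indices = vec![0usize, 4, 7]; // invented leaf positions
    let max_concurrency = planning_concurrency.min(flat_tree_leaf_indices.len());
    assert_eq!(max_concurrency, 3);
    println!("spawning at most {max_concurrency} planning tasks");
}
```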
I don't think the original behavior is intended. Your change makes sense to me
```rust
/// Given a single LogicalPlan node, map it to its physical ExecutionPlan counterpart.
async fn map_logical_node_to_physical(
    &self,
    node: &LogicalPlan,
    session_state: &SessionState,
    // TODO: refactor to not use Vec? Wasted for leaves/1 child
    mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let exec_node: Arc<dyn ExecutionPlan> = match node {
        // Leaves (no children)
        LogicalPlan::TableScan(TableScan {
            source,
            projection,
            filters,
            fetch,
            ..
        }) => {
            let source = source_as_provider(source)?;
```
This is largely unchanged, aside from some minor changes to accommodate passing in the children, and for joins (see the next note). I can extract this into a separate PR to make things cleaner?
```rust
let (new_logical, physical_left, physical_right) = if has_expr_join_key {
    // TODO: Can we extract this transformation to somewhere before physical plan
    // creation?
    let (left_keys, right_keys): (Vec<_>, Vec<_>) =
        keys.iter().cloned().unzip();

    let (left, left_col_keys, left_projected) =
        wrap_projection_for_join_if_necessary(
            &left_keys,
            left.as_ref().clone(),
        )?;
    let (right, right_col_keys, right_projected) =
        wrap_projection_for_join_if_necessary(
            &right_keys,
            right.as_ref().clone(),
        )?;
    let column_on = (left_col_keys, right_col_keys);

    let left = Arc::new(left);
    let right = Arc::new(right);
    let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
        node,
        left.clone(),
        right.clone(),
        column_on,
    )?);

    Ok(Arc::new(UnionExec::new(physical_plans)))
}
LogicalPlan::Repartition(Repartition {
    input,
    partitioning_scheme,
}) => {
    let physical_input = self.create_initial_plan(input, session_state).await?;
    let input_dfschema = input.as_ref().schema();
    let physical_partitioning = match partitioning_scheme {
        LogicalPartitioning::RoundRobinBatch(n) => {
            Partitioning::RoundRobinBatch(*n)
        }
        LogicalPartitioning::Hash(expr, n) => {
            let runtime_expr = expr
                .iter()
                .map(|e| {
                    self.create_physical_expr(
                        e,
                        input_dfschema,
                        session_state,
                    )
                })
                .collect::<Result<Vec<_>>>()?;
            Partitioning::Hash(runtime_expr, *n)
        }
        LogicalPartitioning::DistributeBy(_) => {
            return not_impl_err!("Physical plan does not support DistributeBy partitioning");
        }
    // If inputs were projected then create ExecutionPlan for these new
    // LogicalPlan nodes.
    let physical_left = match (left_projected, left.as_ref()) {
        // If left_projected is true we are guaranteed that left is a Projection
        (
            true,
            LogicalPlan::Projection(Projection { input, expr, .. }),
        ) => self.create_project_physical_exec(
            session_state,
            physical_left,
            input,
            expr,
        )?,
        _ => physical_left,
```
This is kind of nasty, as I mentioned in #9573 (comment).
Maybe this can be split off too?
I agree it is nasty; splitting it off, or moving it to some other part of the code, sounds like a good idea to me.
Perhaps as a follow-on PR.
```rust
fn create_project_physical_exec(
    &self,
    session_state: &SessionState,
    input_exec: Arc<dyn ExecutionPlan>,
    input: &Arc<LogicalPlan>,
    expr: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
```
Just extracted this into a separate function, as it is also used by `Join` to create the physical projections that are added during this planning when the join has expression equijoin keys.
/benchmark
Benchmark results: comparing 75c399c (main) and cf594d6 (PR)
/benchmark
Benchmark results: comparing 5820507 (main) and e1b41a9 (PR)
This looks pretty awesome @Jefffrey -- thank you. I hope to review it later today but it will likely be tomorrow
Thank you @Jefffrey -- I went through this PR carefully and it is amazing -- both a joy to read and beautifully structured. Thank you for this contribution 🙏 . A very fine piece of engineering work indeed 🏆
I think this is a really cool design and it would be awesome to add a high-level overview (maybe as comments on `pub struct DefaultPhysicalPlanner`) explaining the process. The comments in this PR already do a great job of explaining the implementation; I was just thinking of something slightly higher level for people who aren't going to read the implementation -- like the fact that the plan is converted in parallel, based on the configuration, etc. This could be done as a follow-on PR or never.
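For illustration, such an overview might look roughly like the following; the wording below is a hypothetical sketch, not the comment that was actually added to the code:

```rust
// Hypothetical sketch of a high-level overview doc comment (not DataFusion's
// actual documentation):

/// Default physical planner: converts a `LogicalPlan` into an `ExecutionPlan`.
///
/// Planning proceeds in two broad phases:
/// 1. The logical tree is flattened into a list by an iterative depth-first
///    traversal, so arbitrarily deep plans do not overflow the stack.
/// 2. Worker tasks, bounded by the `planning_concurrency` configuration option
///    (and by the number of leaves), start at the leaves and build the physical
///    tree bottom-up in parallel; the last task to reach a multi-child node
///    constructs that node and keeps climbing towards the root.
pub struct DefaultPhysicalPlanner {
    // fields elided in this sketch
}
```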
I ran the `sql_planner` benchmarks and they show this PR seems to improve planning speed slightly. Very cool
cargo bench --bench sql_planner
++ critcmp main refactor_create_initial_plan
group main refactor_create_initial_plan
----- ---- ----------------------------
logical_aggregate_with_join 1.00 1184.2±11.71µs ? ?/sec 1.00 1187.6±17.96µs ? ?/sec
logical_plan_tpcds_all 1.01 155.4±0.74ms ? ?/sec 1.00 154.2±0.77ms ? ?/sec
logical_plan_tpch_all 1.01 16.7±0.19ms ? ?/sec 1.00 16.5±0.16ms ? ?/sec
logical_select_all_from_1000 1.05 19.4±0.10ms ? ?/sec 1.00 18.4±0.08ms ? ?/sec
logical_select_one_from_700 1.00 782.8±18.46µs ? ?/sec 1.00 786.5±6.83µs ? ?/sec
logical_trivial_join_high_numbered_columns 1.00 727.1±6.67µs ? ?/sec 1.01 731.6±6.56µs ? ?/sec
logical_trivial_join_low_numbered_columns 1.00 715.6±7.55µs ? ?/sec 1.01 719.8±22.84µs ? ?/sec
physical_plan_tpcds_all 1.02 1845.7±5.15ms ? ?/sec 1.00 1809.8±5.84ms ? ?/sec
physical_plan_tpch_all 1.02 119.7±0.76ms ? ?/sec 1.00 117.8±0.54ms ? ?/sec
physical_plan_tpch_q1 1.02 7.3±0.05ms ? ?/sec 1.00 7.1±0.05ms ? ?/sec
physical_plan_tpch_q10 1.02 5.5±0.03ms ? ?/sec 1.00 5.4±0.02ms ? ?/sec
physical_plan_tpch_q11 1.02 4.8±0.03ms ? ?/sec 1.00 4.8±0.11ms ? ?/sec
physical_plan_tpch_q12 1.02 3.9±0.02ms ? ?/sec 1.00 3.8±0.02ms ? ?/sec
physical_plan_tpch_q13 1.02 2.6±0.03ms ? ?/sec 1.00 2.6±0.02ms ? ?/sec
physical_plan_tpch_q14 1.02 3.3±0.02ms ? ?/sec 1.00 3.3±0.02ms ? ?/sec
physical_plan_tpch_q16 1.01 4.9±0.03ms ? ?/sec 1.00 4.8±0.02ms ? ?/sec
physical_plan_tpch_q17 1.01 4.6±0.03ms ? ?/sec 1.00 4.6±0.02ms ? ?/sec
physical_plan_tpch_q18 1.02 5.0±0.04ms ? ?/sec 1.00 4.9±0.04ms ? ?/sec
physical_plan_tpch_q19 1.01 9.4±0.05ms ? ?/sec 1.00 9.4±0.05ms ? ?/sec
physical_plan_tpch_q2 1.02 10.6±0.08ms ? ?/sec 1.00 10.4±0.06ms ? ?/sec
physical_plan_tpch_q20 1.01 6.1±0.04ms ? ?/sec 1.00 6.0±0.03ms ? ?/sec
physical_plan_tpch_q21 1.02 8.3±0.05ms ? ?/sec 1.00 8.1±0.05ms ? ?/sec
physical_plan_tpch_q22 1.03 4.4±0.02ms ? ?/sec 1.00 4.3±0.03ms ? ?/sec
physical_plan_tpch_q3 1.02 3.9±0.02ms ? ?/sec 1.00 3.8±0.02ms ? ?/sec
physical_plan_tpch_q4 1.03 2.9±0.02ms ? ?/sec 1.00 2.8±0.02ms ? ?/sec
physical_plan_tpch_q5 1.01 5.6±0.03ms ? ?/sec 1.00 5.5±0.03ms ? ?/sec
physical_plan_tpch_q6 1.02 1995.8±10.47µs ? ?/sec 1.00 1952.7±14.95µs ? ?/sec
physical_plan_tpch_q7 1.02 7.5±0.05ms ? ?/sec 1.00 7.4±0.05ms ? ?/sec
physical_plan_tpch_q8 1.01 9.5±0.07ms ? ?/sec 1.00 9.4±0.06ms ? ?/sec
physical_plan_tpch_q9 1.01 7.2±0.03ms ? ?/sec 1.00 7.1±0.05ms ? ?/sec
physical_select_all_from_1000 1.05 128.5±0.62ms ? ?/sec 1.00 122.4±0.46ms ? ?/sec
physical_select_one_from_700 1.00 4.0±0.02ms ? ?/sec 1.00 4.0±0.02ms ? ?/sec
cc @crepererum
Note to other reviewers: I found whitespace blind diff made the changes easier to review: https://github.com/apache/arrow-datafusion/pull/10023/files?w=1
```rust
}

/// Create a physical plan from a logical plan
fn create_initial_plan<'a>(
/// These tasks start at a leaf and traverse up the tree towards the root, building
```
this is very clever
```rust
let mut guard = children.lock().unwrap();
// Safe unwrap on option as only the last task reaching this
// node will take the contents (which happens after this line).
let children = guard.as_mut().unwrap();
```
Maybe we could return an internal error instead of a panic, in case the option is `None` for some reason.
```rust
// all children.
//
// This take is the only place the Option becomes None.
guard.take().unwrap()
```
Likewise, it would be sweet if this were an internal error rather than a panic, but I don't think it is necessary -- just a suggestion.
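A minimal sketch of what that could look like, with the element type simplified to strings and using `DataFusionError::Internal` directly (the actual follow-up may word this differently):

```rust
use std::sync::Mutex;

use datafusion_common::{DataFusionError, Result};

// Simplified stand-in for the per-node children slot; the real code stores
// Arc<dyn ExecutionPlan> values rather than strings.
fn take_children(children: &Mutex<Option<Vec<String>>>) -> Result<Vec<String>> {
    let mut guard = children
        .lock()
        .map_err(|_| DataFusionError::Internal("children mutex poisoned".to_string()))?;
    // Instead of `guard.take().unwrap()`, surface an internal error if the slot
    // was unexpectedly empty (i.e. already taken by another task).
    guard.take().ok_or_else(|| {
        DataFusionError::Internal(
            "children were already taken while building the physical plan".to_string(),
        )
    })
}

fn main() -> Result<()> {
    let slot = Mutex::new(Some(vec!["scan_a".to_string(), "scan_b".to_string()]));
    println!("{:?}", take_children(&slot)?);
    // A second take now reports an internal error instead of panicking.
    assert!(take_children(&slot).is_err());
    Ok(())
}
```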
```rust
}

/// Given a single LogicalPlan node, map it to its physical ExecutionPlan counterpart.
async fn map_logical_node_to_physical(
```
As I understand it, the main reason this structure reduces stack space is that this function requires a non-trivial stack frame, but now, instead of recursively calling itself (which results in many such frames on the stack), it is driven iteratively (basically pushing the results onto a `Vec`).
Or put another way, `map_logical_node_to_physical` never calls `map_logical_node_to_physical`.
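A toy illustration of that shape change; the types and names below are invented for the example, not DataFusion's real ones:

```rust
// Toy stand-ins for the logical and physical plan trees.
struct Logical {
    name: String,
    children: Vec<Logical>,
}
struct Physical {
    name: String,
    children: Vec<Physical>,
}

// Old shape: the converter calls itself once per tree level, so a deep plan
// means a deep stack.
fn to_physical_recursive(node: &Logical) -> Physical {
    let children = node.children.iter().map(to_physical_recursive).collect();
    Physical { name: node.name.clone(), children }
}

// New shape: children are produced beforehand (driven by the flattened,
// iterative traversal) and passed in, so this function never re-enters itself
// and stack depth stays constant regardless of plan depth.
fn map_node(node: &Logical, children: Vec<Physical>) -> Physical {
    Physical { name: node.name.clone(), children }
}

fn main() {
    let scan = Logical { name: "scan".into(), children: vec![] };
    let plan = Logical { name: "filter".into(), children: vec![scan] };
    // Both shapes produce the same tree; only the stack behaviour differs.
    let a = to_physical_recursive(&plan);
    let b = map_node(&plan, vec![map_node(&plan.children[0], vec![])]);
    assert_eq!(a.name, b.name);
    assert_eq!(a.children.len(), b.children.len());
}
```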
```rust
}

// 1 Child
LogicalPlan::Copy(CopyTo {
```
As some follow-on PR, we could refactor this logic out of the one giant match statement into functions like:

```rust
match plan {
    ...
    LogicalPlan::Copy(copy_to) => copy_to_physical(copy_to),
    ...
}
```

But after this PR, that refactoring seems like it would mostly improve readability rather than stack usage.
Thanks for the review. I will definitely add that higher level comment and clean up some of the unwraps for sure, though I am travelling until Monday 👍
I will also review this after you finish addressing @alamb's review. Thanks for working on the issue. Can you also enable
I've cleaned up the unwraps and added the high-level doc.
Uncommented; seems like it succeeds now (at least locally) 👍
- Use `tokio::Mutex` in async environment
- Remove `Option` from enum, since it is only used for taking.
I've just sent a small commit to lighten your load, considering the effort you've already put in.
Your pull request appears flawless. Much appreciated for all the hard work.
c6eaf74 looks good to me (it makes sense to avoid the use of
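For context, a hedged sketch of the pattern that commit moves towards (simplified element type, and assuming the `tokio` crate with its sync and runtime features): holding a `std::sync::Mutex` guard across an `.await` is problematic in async code, whereas `tokio::sync::Mutex` is designed for it.

```rust
use std::sync::Arc;

use tokio::sync::Mutex;

// Simplified stand-in for the shared per-node children buffer; the real code
// stores Arc<dyn ExecutionPlan> values rather than strings.
type ChildrenSlot = Arc<Mutex<Vec<String>>>;

async fn append_child(slot: &ChildrenSlot, child: String) {
    // Locking awaits instead of blocking the executor thread, and the guard may
    // safely live across later await points if that ever becomes necessary.
    let mut children = slot.lock().await;
    children.push(child);
}

#[tokio::main]
async fn main() {
    let slot: ChildrenSlot = Arc::new(Mutex::new(Vec::new()));
    append_child(&slot, "child plan".to_string()).await;
    let contents = slot.lock().await;
    println!("{:?}", *contents);
}
```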
🚀
Thanks again @Jefffrey -- this is epic. Also thanks @metesynnada for the review and improvement
Which issue does this PR close?
Closes #9573
Rationale for this change
Rather than recursively constructing the initial physical plan from the top down, which can lead to stack overflow errors, iteratively construct the plan from the bottom up, which is also done concurrently.
These were my previous considerations that led to this design:

- `ExecutionPlan`s need to know their children at construction time, unless we insert dummy children (like `EmptyRelation`, which still might cause other issues) or we introduce an intermediate representation between `LogicalPlan` and `ExecutionPlan` to allow incomplete nodes that can populate their children after construction time (seems like more work)
- `Vec`: since we decided to construct bottom up, children need a way to know their parents, and a `Vec` is easier than trying to add `Arc`s to parents

What changes are included in this PR?
Split up `create_initial_plan` to do the mapping from `LogicalPlan` to `Arc<dyn ExecutionPlan>` in a separate function, for organization.

In `create_initial_plan` we first DFS the tree to get a flat `Vec` representation, which stores `&LogicalPlan` so the tree does not need to be duplicated (this traversal is iterative).

With the flat tree, we can spawn async tasks from the leaves, which attempt to build the individual branches of the tree from the bottom up, towards the root.

When these tasks encounter a node with 2 or more children, which represents a collision point with other tasks, they append their current tree branch to this parent node (whose already-built children branches sit behind a `Mutex<Vec<_>>` for concurrent safety) and then check if there are enough children to build the node. If not, the task terminates.

If there are enough children, the current task is the last one to reach the parent, so it constructs the parent and then continues to traverse up towards the root.

This continues until the number of tasks reduces to 1 and the last task emits the root of the tree. (A simplified sketch of this process is shown below.)
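A minimal, self-contained sketch of this strategy, using ordinary threads instead of Tokio tasks and invented toy types (this is not DataFusion's actual code, and it ignores details such as preserving child ordering and capping the number of workers by `planning_concurrency`):

```rust
use std::sync::Mutex;

// Toy stand-ins for LogicalPlan / ExecutionPlan; invented for this sketch.
#[derive(Debug)]
struct Logical { name: &'static str, children: Vec<Logical> }
#[derive(Debug)]
struct Physical { name: &'static str, children: Vec<Physical> }

// Flattened node: a reference into the logical tree, the index of its parent,
// and a slot holding (missing child count, finished child plans).
struct FlatNode<'a> {
    parent: Option<usize>,
    logical: &'a Logical,
    pending: Mutex<(usize, Vec<Physical>)>,
}

// Iterative DFS: flatten the tree and record the leaf indices. No recursion,
// so plan depth cannot overflow the stack.
fn flatten(root: &Logical) -> (Vec<FlatNode<'_>>, Vec<usize>) {
    let (mut nodes, mut leaves, mut stack) = (Vec::new(), Vec::new(), vec![(root, None)]);
    while let Some((logical, parent)) = stack.pop() {
        let idx = nodes.len();
        nodes.push(FlatNode {
            parent,
            logical,
            pending: Mutex::new((logical.children.len(), Vec::new())),
        });
        if logical.children.is_empty() {
            leaves.push(idx);
        }
        for child in &logical.children {
            stack.push((child, Some(idx)));
        }
    }
    (nodes, leaves)
}

// Each worker starts at a leaf and climbs towards the root. The last worker to
// deliver a child to a parent builds that parent and keeps climbing; the others
// stop. (Child ordering is ignored here for brevity.)
fn climb(nodes: &[FlatNode<'_>], mut idx: usize, mut plan: Physical) -> Option<Physical> {
    loop {
        let parent = match nodes[idx].parent {
            None => return Some(plan), // reached the root
            Some(p) => p,
        };
        let mut guard = nodes[parent].pending.lock().unwrap();
        guard.1.push(plan);
        guard.0 -= 1;
        if guard.0 > 0 {
            return None; // another worker will finish this parent later
        }
        let children = std::mem::take(&mut guard.1);
        drop(guard);
        plan = Physical { name: nodes[parent].logical.name, children };
        idx = parent;
    }
}

fn main() {
    let scan = |name| Logical { name, children: vec![] };
    let root = Logical { name: "join", children: vec![scan("scan_a"), scan("scan_b")] };
    let (nodes, leaves) = flatten(&root);
    let nodes = &nodes; // shared reference handed to every worker
    // One worker per leaf; the real code uses Tokio tasks and additionally caps
    // the number of workers by the planning_concurrency setting.
    let physical = std::thread::scope(|s| {
        let handles: Vec<_> = leaves
            .iter()
            .map(|&leaf| {
                let leaf_plan = Physical { name: nodes[leaf].logical.name, children: vec![] };
                s.spawn(move || climb(nodes, leaf, leaf_plan))
            })
            .collect();
        handles.into_iter().find_map(|h| h.join().unwrap())
    });
    println!("{physical:?}");
}
```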
Are these changes tested?
Are there any user-facing changes?
No