-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CBO] JoinSelection Rule, select HashJoin Partition Mode based on the Join Type and available statistics, option for SortMergeJoin #4219
Conversation
… available statistics
@alamb @Dandandan @yahoNanJing @jackwener |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a super cool PR @mingmwang! I am wondering whether we should keep everything in JoinSelection
or split it up (or maybe rename it to JoinSideSelection
to make it precise that this is a local join side optimizer)?
// TODO stats: it is not possible in general to know the output size of joins | ||
// There are some special cases though, for example: | ||
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` | ||
estimate_join_statistics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really glad that this can also be used in other places 💯
// Sort-Merge join support currently is experimental | ||
if join_filter.is_some() { | ||
// TODO SortMergeJoinExec need to support join filter | ||
Err(DataFusionError::Plan("SortMergeJoinExec does not support join_filter now.".to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Err(DataFusionError::Plan("SortMergeJoinExec does not support join_filter now.".to_string())) | |
Err(DataFusionError::NotImplemented("SortMergeJoinExec does not support join_filter now.".to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do.
if let Some(size) = plan.statistics().total_byte_size { | ||
size < collection_size_threshold | ||
} else if let Some(row_count) = plan.statistics().num_rows { | ||
row_count < collection_size_threshold |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: are we treating both bytes and rows equally here? Seems like collection_size_threshold
has a unit of bytes
compared to the row_count
which specifies number of rows. I guess we can normalize it a bit if we want to pursue this (e.g. collection_size_threshold / SOME_MAGIC_CONSTANT
) but otherwise it might be a bit off from a desirable scenerio.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My original thinking is making the threshold configuration represent both 1M in bytes or 1M number of rows.
Or maybe we can keep two different configuration items explicitly and document them clearly..
} | ||
} | ||
|
||
fn try_collect_left( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to describe the logic here for each of the different scenarios (i was a bit lost till the end to figure out each state and how it should behave)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will add more comments.
I will try and review this carefully tomorrow |
There is some bug with HashJoin CollectLeft: |
) { | ||
Ok(Arc::new(new_join)) | ||
} else { | ||
// TODO avoid adding ProjectionExec again and again, only adding Final Projection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very careful👍! I also meet this problem in Doris.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this was DataFusion specific problem. Because for HashJoin, DataFusion always choose to build the left side. If DataFusion can also support build right side, then there is no need to swap the Join, and can avoid this problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some databases choose to build one side, TiDB
also is.
As I wrote elsewhere, I plan to review this and other join related PRs tomorrow. I apologize for the delays. The join work is really neat, but it is not a high priority at the moment in IOx so I have had to prioritize other work higher and do join related I appreciate the help that @jackwener @mingmwang are giving each other in the review process. 🙏 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks really good @mingmwang -- thank you .
I recommend moving the settings into ConfigOptions to make them more discoverable and documented. I really like the idea of a feature that is off by default as we work out the details.
The only other thing that would be nice to see for this feature is some sort of overall integration test (e.g. that shows a plan with join reordering happening as well as a SortMergeJoin). That is present in the unit tests but not the overall integration tests
Also, is the SortMergeJoin exercised anywhere in sql level tests?
Anyone else I found https://github.com/apache/arrow-datafusion/pull/4219/files?w=1 easier to review for diffs
@@ -1228,6 +1228,11 @@ pub struct SessionConfig { | |||
pub collect_statistics: bool, | |||
/// Should DataFusion optimizer run a top down process to reorder the join keys | |||
pub top_down_join_key_reordering: bool, | |||
/// Should DataFusion optimizer prefer HashJoin over SortMergeJoin. | |||
/// HashJoin can work more efficently than SortMergeJoin but consumes more memory. | |||
pub prefer_hash_join: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would you think about moving these new settings into ConfigOptions (where they are visible via SHOW
and automatically documented)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will move the new added settings to ConfigOptions.
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Utilizing exact statistics from sources to avoid scanning data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment seems incorrect for this module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was coming from the original hash_build_probe_order.rs
. I will fix it in this PR.
use crate::error::Result; | ||
use crate::physical_plan::rewrite::TreeNodeRewritable; | ||
|
||
/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this design with PartitionMode::Auto
} | ||
|
||
// TODO we need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller. | ||
// TODO In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is is 8 times. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the prestosql approach makes sense to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I am not very sure about this, because our HashJoin implementation is quite different from PrestoSQL's. I think we need to do more benchmark on this.
collection_size_threshold: usize, | ||
) -> bool { | ||
// Currently we do not trust the 0 value from stats, due to stats collection might have bug | ||
// TODO check the logic in datasource::get_statistics_with_limit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this TODO worth tracking with a ticket?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match filter { | ||
Some(filter) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of match filter
I think you can use map
like:
filter.map(|filter| {
let column_indicies = ...
Not critical, I just figured I woudl point it out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will change it.
PartitionMode::Auto => { | ||
try_collect_left(hash_join, Some(collect_left_threshold)) | ||
.unwrap() | ||
.or_else(|| Some(partitioned_hash_join(hash_join).unwrap())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this code (and below) use unwrap
? I think they should return an error rather than panic
ing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have another PR to address this. The interface of the Rewrite/transform Closures need to be changed.
@@ -29,6 +29,9 @@ pub enum PartitionMode { | |||
Partitioned, | |||
/// Left side will collected into one partition | |||
CollectLeft, | |||
/// When set to Auto, DataFusion optimizer will decide which PartitionMode mode(Partitioned/CollectLeft) is optimal based on statistics. | |||
/// It will also consider swapping the left and right inputs for the Join |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The optimizer also will swap the inputs for Partitioned and CollectLeft mode too, right? As written, this comment could be confusing and imply that inputs will only be swapped if the mode is set to Auto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Especially when the mode is CollectLeft, some join types is unable to run CollectLeft mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why in this PR I also modify the existing UTs in https://github.com/apache/arrow-datafusion/blob/d5d2de3362649db85ad54161ee28d9374ed3437c/datafusion/core/tests/sql/joins.rs
to make sure both the CollectLeft/Partitioned mode have enough test coverage.
@@ -1228,6 +1228,11 @@ pub struct SessionConfig { | |||
pub collect_statistics: bool, | |||
/// Should DataFusion optimizer run a top down process to reorder the join keys | |||
pub top_down_join_key_reordering: bool, | |||
/// Should DataFusion optimizer prefer HashJoin over SortMergeJoin. | |||
/// HashJoin can work more efficently than SortMergeJoin but consumes more memory. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// HashJoin can work more efficently than SortMergeJoin but consumes more memory. | |
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory. Defaults to true |
/// HashJoin can work more efficently than SortMergeJoin but consumes more memory. | ||
pub prefer_hash_join: bool, | ||
/// The maximum estimated size in bytes for the left input a hash join will be collected into one partition | ||
pub hash_join_collect_left_threshold: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend calling this seetting something that doesn't have 'left' and 'right' as that can get confusing.
How about hash_join_single_partition_threshold
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
@alamb Would you mind to take a look again ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it looks great -- thank you @mingmwang
} | ||
|
||
#[tokio::test] | ||
async fn sort_merge_join_on_date32() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
I will plan to merge this PR tomorrow unless anyone else would like more time to review |
I wasn't able to go through the last revision in detail but overall this looks great 💯 A minor question of mine is still standing though (maybe let's do a follow up ticket on it): #4219 (comment) ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice Job👍.
I try to checkout this branch and git pull.
Look like we need fix conflict.
} | ||
} | ||
|
||
fn swap_join_type(join_type: JoinType) -> JoinType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can put it into JoinType
) { | ||
Ok(Arc::new(new_join)) | ||
} else { | ||
// TODO avoid adding ProjectionExec again and again, only adding Final Projection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some databases choose to build one side, TiDB
also is.
I am not sure what conflict you ran into @jackwener. I checked this branch out locally and merged master to this branch and reran all the tests and they passed. |
Benchmark runs are scheduled for baseline = 22fdbcf and contender = 561be4f. 561be4f is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
It should be my mistake.🥲 |
Which issue does this PR close?
Closes #4139
closes #4230.
Rationale for this change
Choose join orderings based on cost based optimizer
What changes are included in this PR?
The PR covers below changes,
HashBuildProbeOrder
toJoinSelection
.PartitionMode
mode(Partitioned
/CollectLeft
) is optimal forHashJoin
prefer_hash_join
to preferHashJoin
orSortMergeJoin
, document SortMergeJoin support currently is experimental.Are these changes tested?
Are there any user-facing changes?