Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Limited Joins (Externalized / Spill) #1599

Open
2 of 5 tasks
Tracked by #587
alamb opened this issue Jan 17, 2022 · 11 comments
Open
2 of 5 tasks
Tracked by #587

Memory Limited Joins (Externalized / Spill) #1599

alamb opened this issue Jan 17, 2022 · 11 comments

Comments

@alamb
Copy link
Contributor

alamb commented Jan 17, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Support Joining "arbitrarily" large inputs (e.g. when one or both of the inputs don't fit in the available RAM)

This ticket concerns the memory used the JoinExec operator -- it doesn't cover other potential targets (e.g. externalized sort or grouping). That will be covered by other tasks tracked by #587

Describe the solution you'd like

  1. Allow DataFusion users to specify a RAM budget (aka via the config introduced in Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation #1526) and have their queries complete running without the join by exceeding the budget allocated to it via the memory manager.

There are many potential ways to Limit the memory used while joining. The classic way is "sort-merge-join" where the input data on both sides is sorted according to the equality predicates (using externalized sort, such as described in #1568 ) and then the two join inputs are streamed through and the output computed, depending on the type of Join required (INNER, LEFT, RIGHT, SEMI, etc)

I personally think the following would be the ideal behavior for DataFusion Joins:

  1. A single Join operator that behaved like like the following:
  2. Hashed one input or the other, if the memory limit was not exhausted, behave like the existing JoinExec
  3. If memory was exhausted, switch to a merge join strategy: sort the one or both sides that didn't fit in memory using externalized sort on the equality predicates, then stream them back through the Join

The rationale for a runtime switch is that then the optimizer (which always has limited information) can't make the "wrong" choice related to join order

In case anyone wants some "light reading" this stuff is nicely described by Goetz Graffe in "Query evaluation techniques for large databases": https://scholar.google.com/citations?view_op=view_citation&hl=en&user=pdDeRScAAAAJ&citation_for_view=pdDeRScAAAAJ:u5HHmVD_uO8C

Online link: http://infolab.stanford.edu/~hyunjung/cs346/graefe.pdf

Describe alternatives you've considered
Have the Optimizer (aka the HashBuildProbeOrder ) pick both the order and the algorithm to use based on statistics or heuristics

Context
This is follow on work from the great PR from @yjshen in #1526 and part of the story of limiting memory used by DataFusion #587

Task Tracking

@alamb
Copy link
Contributor Author

alamb commented Jan 17, 2022

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152
Sort-based grouping and aggregation
Thanh Do, Goetz Graefe

@xudong963
Copy link
Member

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Maybe you can share it at the next meeting 😄

@Ted-Jiang
Copy link
Member

I would love to implement this algorithm in DataFusion:
https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Maybe you can share it at the next meeting 😄

+1 well look forward 😊

@yjshen
Copy link
Member

yjshen commented Jan 18, 2022

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Haha, that's great! I have talked with @houqp about this paper before.

@alamb
Copy link
Contributor Author

alamb commented Feb 7, 2022

Related #141

@yjshen
Copy link
Member

yjshen commented Apr 15, 2022

I would propose we do this in several steps:

- [ ] Provide a classic SortMergeJoin implementation that is less memory bound itself (but move the need of memory management to the sort operator, which we already have memory controlled).
- [ ] Follow-up choice 1: Consolidate HashJoin and SortMergeJoin, providing a unified JoinExec, and do adaptive execution as @alamb suggested above.
- [ ] Follow-up choice 2: incorporate a cost-based join optimizer to choose the most suitable physical plan: sort-based or hash-based.

Moved to issue descriptions above. 👆

@alamb
Copy link
Contributor Author

alamb commented Apr 15, 2022

@yjshen sounds like a great plan to me -- and I see that @richox has an implementation for SortMergeJoin already :)

@korowa
Copy link
Contributor

korowa commented Mar 21, 2023

Looks like now that we are able to fail query in case of breaching memory limit, it's the right time to start working on spills.

Taking into account what has been written above, I guess, next step could be to implement spilling for MergeJoin -- if our final intention to have runtime HJ -> MJ conversion it would be nice to have some guarantees that MJ won't fail for the same reason. I believe MJ spilling logic could be pretty straightforward without any pitfalls -- the naive approach would be to spill buffered-side data in .ipc batch by batch, more complex, and, probably, more effective way to think about would be spilling concatenation of all batches that fit in memory.

After that we could follow-up with what is mentioned in issue description -- HJ -> MJ conversion (I believe #2628 worth to be mentioned here, to unlock ability for more hash joins to be converted), and spilling mechanisms for other join implementations.

If this plan is fine, I'd like to take a stab at MJ spilling.

@alamb
Copy link
Contributor Author

alamb commented Mar 21, 2023

Looks like now that we are able to fail query in case of breaching memory limit, it's the right time to start working on spills.

I agree

I believe MJ spilling logic could be pretty straightforward without any pitfalls -- the naive approach would be to spill buffered-side data in .ipc batch by batch, more complex, and, probably, more effective way to think about would be spilling concatenation of all batches that fit in memory.

Unless there is a very compelling reason to have a separate implementation, I think we should leverage (reuse) the existing ExteraSorter used in spilling sort:

https://github.com/apache/arrow-datafusion/blob/30dba587f4749327605a2eecb7ae9c0c41769c58/datafusion/core/src/physical_plan/sorts/sort.rs#L73-L85

After that we could follow-up with what is mentioned in issue description
👍

@korowa
Copy link
Contributor

korowa commented Mar 22, 2023

I think we should leverage (reuse) the existing ExteraSorter used in spilling sort

After some reading, it looks like that for MergeJoin case it makes sense to split spilling part of ExternalSorter (responsible for tracking spillable batch buffer) and sorting one -- we already have sorted streams in MJ, so spillable buffer is all that needed -- I believe this implementation could be acceptable and first I'll try to stick to this way.

And, further, ExternalSorter also seems to be a perfect fit for HashJoin -- this is the case where it can be reused as it is for resorting both build and probe sides of HJ.

@alamb
Copy link
Contributor Author

alamb commented Mar 23, 2023

After some reading, it looks like that for MergeJoin case it makes sense to split spilling part of ExternalSorter (responsible for tracking spillable batch buffer) and sorting one -- we already have sorted streams in MJ, so spillable buffer is all that needed -- I believe this implementation could be acceptable and first I'll try to stick to this way.

Thank you -- reusing the ExternalSorter will allow whatever spilling logic we develop to benefit from additional improvements in sort. For example, @jaylmiller has #5292 which makes substantial changes to ExternalSorter and hopefully it will be faster, but has some performance regressions that we haven't worked out yet,

It will be great if that work can directly benefit the spilling operators as well

And, further, ExternalSorter also seems to be a perfect fit for HashJoin -- this is the case where it can be reused as it is for resorting both build and probe sides of HJ.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants