Partial aggregation easily causes OOM #7511

Closed
zhztheplayer opened this issue Nov 10, 2023 · 17 comments
Labels: enhancement (New feature or request)

Comments

@zhztheplayer
Contributor

Description

Partial aggregation currently doesn't support reclaiming/spilling, so users have to control its memory usage manually through several partial-aggregation memory-limit settings.

However, these limits are hard to determine in production use cases, since the plan may contain many nodes that all consume memory at the same time. As a result, the only way to avoid OOM is to disable partial aggregation (e.g., by always abandoning it), but that puts the final aggregation under huge pressure.

In the case of Velox + Spark: vanilla Spark does support spill-to-disk regardless of whether an aggregation is partial or final, so supporting spilling could be an option for Velox's partial aggregation as well.

I was working on a patch to make Velox flush aggregation intermediate results on OOM, but that approach doesn't seem to cover some corner cases, so we need a complete solution for the issue.
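For reference, here is a minimal sketch of the kind of per-query settings involved. The key names follow the Velox memory-management configuration docs (linked later in this thread); treat the exact keys and values as assumptions to verify, not recommendations.

```cpp
#include <string>
#include <unordered_map>

// Sketch only: the partial-aggregation knobs a user currently has to tune by
// hand. Key names follow the Velox config docs; the values are illustrative.
std::unordered_map<std::string, std::string> partialAggSettings() {
  return {
      // Flush the partial aggregation once its hash table exceeds this size.
      {"max_partial_aggregation_memory", "16777216"},  // 16 MiB
      // Abandon partial aggregation early when it isn't reducing cardinality.
      {"abandon_partial_aggregation_min_rows", "100000"},
      {"abandon_partial_aggregation_min_pct", "80"},
  };
}
```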

zhztheplayer added the enhancement label on Nov 10, 2023
@zhztheplayer
Contributor Author

@xiaoxmeng @mbasmanova Do you have any comment on this issue? Thanks!

@mbasmanova
Contributor

@zhztheplayer Our thinking is that partial agg doesn't require spilling since it can flush on memory pressure. We also expect that partial agg doesn't consume much memory before a flush, only 16 MB [1]. When a Task runs multi-threaded, each thread may consume up to 16 MB, but I believe Spark runs only one thread.

Still, it sounds like you are seeing something different. You mentioned that there can be many partial aggregations within a single task. This is true, but I wonder how many you are seeing? Would you share a query plan, the OOM error message, and the amount of memory used by partial aggs at OOM time?

[1] https://facebookincubator.github.io/velox/configs.html#memory-management

@zhztheplayer
Contributor Author

zhztheplayer commented Nov 14, 2023

@mbasmanova Thanks for your comments.

Our thinking is that partial agg doesn't require spilling since it can flush on memory pressure.

Yes, this was also my understanding of the design when I was working with it. But partial-agg spilling could still be valuable in the case of N mappers + M reducers with N >> M: we should leverage as much of the mappers' resources as possible, since only a few reducers are available.

A simple example, using the following SQL query (on TPC-DS partitioned data, SF 1000):

select cast(rand() * 2000000 as integer) as key, count(1) as cnt from catalog_sales group by key order by cnt desc limit 100

In the case of N = 72, M = 1, compare the query performance among 3 profiles:

[image: query performance comparison of the three profiles]

Details about the profiles:

  1. PARTIAL_MODE SPILL: Spill when partial aggregation runs out of memory (I used this patch, which appears to work, to enable spilling for partial aggregation)
  2. PARTIAL_MODE FLUSH: Flush intermediate data within memory limit = ~20 MiB per task
  3. PARTIAL_MODE ABANDON: Abandon partial aggregation immediately

Thus in this case, mode SPILL is far faster than FLUSH and ABANDON.

Reducer shuffle read size of partial mode SPILL:
[image: reducer shuffle read size, SPILL mode]

Reducer shuffle read size of partial mode FLUSH:
[image: reducer shuffle read size, FLUSH mode]

Reducer shuffle read size of partial mode ABANDON:
[image: reducer shuffle read size, ABANDON mode]

You mentioned that there can be many partial aggregations within a single task. This is true, but I wonder how many you are seeing? Would you share a query plan, the OOM error message, and the amount of memory used by partial aggs at OOM time?

Consecutive partial aggregations are not common in our workloads. However, we did encounter a case with several consecutive joins plus one partial aggregation. Whether it OOMs depends on our settings: as said, we can just abandon the partial aggregation or adopt more frequent partial flushes by setting lower memory limits. But that approach doesn't make sense in the N + M case I explained above.

@zhztheplayer
Contributor Author

I've opened a draft PR, #7558, where you can directly see the code changes I used.

@mbasmanova
Contributor

@zhztheplayer Thank you for explaining. In this case, with a single reducer, I can see how spilling partial agg is beneficial. Spilling writes to disk (as shuffle does), but unspilling then processes the data on N nodes instead of 1 (as shuffle would). Since cardinality reduces significantly after partial agg with spilling, the query runs faster with spilling.

At first I was wondering why there is only 1 reducer, but then I noticed that there is a LIMIT clause in the query, and Spark has a known inefficiency when it comes to running LIMIT queries: it doesn't plan partial and final limits and requires all the data to be collected on a single node to apply the LIMIT. I wonder if newer versions of Spark have addressed this problem already. Do you know?

I also have another question. How much memory do you specify for the query and for partial aggregation to trigger spilling? With the default settings (16 MB for partial agg, a few GB per query), spilling won't be triggered because partial agg will be flushing.

My assumption is that allowing spilling for partial agg, as in #7558, would have no effect unless one also disables flushing by increasing the memory limit for partial agg from the default 16 MB to a few GB (or close to the query memory limit). If that's the case, then it means that each query needs to be configured separately, which is not scalable.

@zhztheplayer
Contributor Author

At first I was wondering why there is only 1 reducer, but then I noticed that there is a LIMIT clause in the query, and Spark has a known inefficiency when it comes to running LIMIT queries: it doesn't plan partial and final limits and requires all the data to be collected on a single node to apply the LIMIT. I wonder if newer versions of Spark have addressed this problem already. Do you know?

Aha, the single partition isn't related to any Spark limitation; it's because I manually set Spark's shuffle partition number to 1. Spark's scan stage (the first stage, which reads data from sources) doesn't always respect the shuffle partition setting, so it's normal to have more scan tasks and fewer reducer tasks in one query.

I also have another question. How much memory do you specify for the query and for partial aggregation to trigger spilling? With the default settings (16 MB for partial agg, a few GB per query), spilling won't be triggered because partial agg will be flushing.

I manually set it to a very large number, something like int.max (in this case 1024 * total host memory), to make sure that partial flushing never gets triggered.
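For concreteness, a sketch of what that override looks like (the config key is the documented Velox one; the helper name and the 1024x multiplier are just illustrative):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Sketch of the override used for the SPILL profile: push the flush threshold
// far beyond anything reachable so the partial aggregation never flushes and
// has to rely on spilling instead. `totalHostMemoryBytes` is a placeholder
// for however the host memory size is obtained.
std::unordered_map<std::string, std::string> spillOnlyOverride(
    uint64_t totalHostMemoryBytes) {
  return {
      {"max_partial_aggregation_memory",
       std::to_string(totalHostMemoryBytes * 1024)},
  };
}
```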

If that's the case, then it means that each query needs to be configured separately, which is not scalable.

IIUC, the UT case in #7511 successfully spilled data and didn't require setting the memory limit, because the memory pool capacity is 10 MB, which is smaller than 16 MB. Anyway, the UT should be revived later on (moved to SharedArbitratorTest, probably).

BTW, if setting a very large memory limit doesn't seem like a good approach, maybe we can use an independent config option to disable partial flush/abandon, if we decide to add spilling support to partial aggregation?

@mbasmanova
Contributor

@zhztheplayer We need to consider all possible queries with streaming (Presto) and durable (Spark) shuffles and figure out what works for most of these queries. We want to avoid configuring individual partial aggregations in individual queries as it doesn't scale well.

Partial aggregations are helpful if they meaningfully reduce cardinality. If partial aggregation doesn't reduce cardinality, then it is best to disable it. We already have logic that auto-disables partial aggregation if it doesn't reduce cardinality. I assume this doesn't apply here, as partial aggregation does reduce cardinality. BTW, what's the cardinality reduction in this case?

If partial aggregation does reduce cardinality, then it is beneficial to keep it. In that case the question is whether it is helpful to spill or to flush the partial aggregation when it runs out of memory. Intuitively it feels that flushing should work just fine. When is this not the case? I guess this would not be the case when the parallelism of partial aggregation is much higher than the parallelism of final aggregation. In the use case discussed above, final aggregation runs single-threaded while partial aggregation runs using N threads (what is N here? 100? 1000?). Is this a common scenario in Spark? What's a common ratio between the number of partial agg tasks and final agg tasks?

If we do find that for Spark spilling is generally better than flushing, then we can introduce a configuration property that tells Velox whether to spill or flush in partial aggregation and update Gluten to enable spilling for partial aggregation by default.

Here are existing configs that affect partial aggregation: https://facebookincubator.github.io/velox/configs.html#memory-management

CC: @xiaoxmeng

@zhztheplayer
Contributor Author

zhztheplayer commented Nov 28, 2023

@mbasmanova

what's the cardinality reduction in this case?

In this case the cardinality was reduced by the same ratio as the shuffle size: (5.5 GiB - 942 MiB) / 5.5 GiB * 100% ≈ 83%.

Intuitively it feels that flushing should work just fine. When is this not the case?

Taking the above case as an example: if the host has limited memory, flushing would not reduce cardinality much. With a ~20 MiB flushing limit and roughly 2,000,000 distinct keys, each flushed hash table holds only a small fraction of the keys, so most input rows are emitted more or less as-is. The reduction is nearly 0%: (5.5 GiB - 5.5 GiB) / 5.5 GiB * 100% = 0%, which means flushing doesn't reduce cardinality in this case, so spilling is eventually required.

I also have another question. How much memory do you specify for the query and for partial aggregation to trigger spilling?

I've set it to 20 MiB. Say we have 5.5 GiB of data that could be reduced to 550 MiB by aggregation; then in the worst case we would need to set the partial-aggregation flushing memory limit to something larger than 550 MiB before it could start to reduce cardinality.

Is this a common scenario in Spark? What's a common ratio between the number of partial agg tasks and final agg tasks?

In our daily testing it's common for N to be several times larger than M, because N is usually not directly settable and depends on the source data's partition count. We use source data with a large partition count, so N >> M happens frequently.

If we do find that for Spark spilling is generally better than flushing, then we can introduce a configuration property that tells Velox whether to spill or flush in partial aggregation and update Gluten to enable spilling for partial aggregation by default.

Why couldn't flushing and spilling work together? Say the memory pool capacity is 100 MiB and the flushing limit is set to 10 MiB. When the pool's free capacity drops below 10 MiB (say, other operators have allocated 95 MiB and only 5 MiB is left for partial aggregation), then partial aggregation could spill its 5 MiB of data out, process another 5 MiB, and then flush the 10 MiB together because it has reached the 10 MiB flushing limit. The challenge here is that (correct me if I'm wrong) the spilled 5 MiB is not counted toward partial aggregation's memory usage.
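To make the idea concrete, here is a rough sketch in C++ form. None of these names are real Velox APIs; it only shows the ordering of the two pressure-relief mechanisms, and the accounting of spilled bytes is exactly the open question above.

```cpp
#include <cstdint>

// Hypothetical sketch of a partial aggregation that both spills and flushes.
// Not the Velox Aggregation operator; names and structure are illustrative.
struct PartialAggSketch {
  uint64_t flushLimitBytes;   // e.g. 10 MiB flushing limit
  uint64_t usedBytes = 0;     // bytes currently held in the hash table
  uint64_t spilledBytes = 0;  // bytes written to disk, not yet flushed

  // Called when the pool refuses an allocation: spill what we hold so the
  // pending allocation can proceed. Open question: should `spilledBytes`
  // still count toward the flush threshold below?
  void onMemoryPressure() {
    spilledBytes += usedBytes;
    usedBytes = 0;
  }

  // Called after accumulating an input batch of `batchBytes`: flush once the
  // in-memory plus spilled intermediate data reaches the flushing limit,
  // merging the spilled runs back in on the way out.
  bool shouldFlush(uint64_t batchBytes) {
    usedBytes += batchBytes;
    return usedBytes + spilledBytes >= flushLimitBytes;
  }
};
```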

By the way, I noticed that in Presto you use a fixed 16 MiB partial-aggregation memory limit; is this a trade-off between cardinality reduction and OOM risk? It looks too small for big-data scenarios to me. I am curious how much better it is than just disabling partial aggregation in your real workloads.

@mbasmanova
Contributor

if the host has limited memory, flushing would not reduce cardinality much.

This is true, but it also means we cannot assess whether partial agg will reduce cardinality even with spilling, and we may hit another edge case where it doesn't. In that case, we'll incur the cost of spilling for partial agg and still have to shuffle all the data.

@mbasmanova
Contributor

Why couldn't flushing and spilling work together? Say the memory pool capacity is 100 MiB and the flushing limit is set to 10 MiB. When the pool's free capacity drops below 10 MiB (say, other operators have allocated 95 MiB and only 5 MiB is left for partial aggregation),

In this case, memory arbitration should ask other operators to spill first. A partial agg that uses only 5% of the memory should not be the one to spill. Am I missing something?

then partial aggregation could spill its 5 MiB of data out, process another 5 MiB, and then flush the 10 MiB together because it has reached the 10 MiB flushing limit. The challenge here is that (correct me if I'm wrong) the spilled 5 MiB is not counted toward partial aggregation's memory usage.

This is an interesting idea. You are correct that spilled data doesn't count toward memory usage, because it no longer uses memory.

I noticed that in Presto you use a fixed 16 MiB partial-aggregation memory limit; is this a trade-off between cardinality reduction and OOM risk? It looks too small for big-data scenarios to me. I am curious how much better it is than just disabling partial aggregation in your real workloads.

This is a good question. I'll need to do some research to find an answer.

@zhztheplayer
Contributor Author

In this case, memory arbitration should ask other operators to spill first. A partial agg that uses only 5% of the memory should not be the one to spill. Am I missing something?

Umm, I just took this simple case as an example. In Gluten we have been setting the memory limit to 0.5 * task available memory by default, which means that if other operators use more than half of the total memory, partial aggregation can OOM easily. But technically speaking, even in the 95 MiB + 5 MiB case, the 95 MiB allocation could still come from a lot of small consumers, e.g. 95 consecutive joins that each allocate 1 MiB... So there can still be some chance of OOM anyway.

@mbasmanova
Contributor

could still come from a lot of small consumers, e.g. 95 consecutive joins that each allocate 1 MiB... So there can still be some chance of OOM anyway.

While this is possible, it is quite unlikely. If this is the only case that would benefit from the new logic, then it feels like the complexity and maintenance cost of the new logic do not justify the benefit. Am I missing something? It would be nice to come up with more realistic use cases that may require spilling for partial aggregation and then figure out the best way to support them.

@zhztheplayer
Contributor Author

could still come from a lot of small consumers, e.g. 95 consecutive joins that each allocate 1 MiB... So there can still be some chance of OOM anyway.

While this is possible, it is quite unlikely. If this is the only case that would benefit from the new logic, then it feels like the complexity and maintenance cost of the new logic do not justify the benefit. Am I missing something? It would be nice to come up with more realistic use cases that may require spilling for partial aggregation and then figure out the best way to support them.

I understand your concern. The realistic use case is actually the one I mentioned: we set 0.5 * task available memory as the partial-aggregation memory limit (the setting that in Presto you set to 16 MiB by default, I think).

The code is here:
https://github.com/oap-project/gluten/blob/d2980b73afbf28800727172200e36a84c4c83536/cpp/velox/compute/WholeStageResultIterator.cc#L341-L342

The issue we faced is that if we set the factor to 0.5 or higher, the risk of OOM is high. If we set the factor to a low value, say 0.05, there is nearly no cardinality reduction and partial aggregation is usually skipped.
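To make the trade-off concrete, the computation at the linked lines is roughly of this shape (a simplified sketch; the actual Gluten code and config plumbing may differ):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

// Simplified sketch of deriving the Velox partial-aggregation limit from the
// Spark task's available memory. `ratio` is the tunable factor discussed
// above (0.5 in our default setup); the key is the Velox config name.
std::unordered_map<std::string, std::string> partialAggLimitConfig(
    uint64_t taskAvailableMemoryBytes, double ratio) {
  const auto limit =
      static_cast<uint64_t>(taskAvailableMemoryBytes * ratio);
  return {{"max_partial_aggregation_memory", std::to_string(limit)}};
}
```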

Preventing OOM is only one of the benefits we can get from partial-aggregation spilling. As I explained in previous comments, it can considerably reduce cardinality and therefore reduce shuffle size in Spark.

@zhztheplayer
Contributor Author

zhztheplayer commented Dec 1, 2023

It would be nice to come up with more realistic use cases that may require spilling for partial aggregation and then figure out the best way to support them.

Hi @mbasmanova, I feel that our concern about adding spilling support to partial aggregation is based on the assumption that a 16 MiB memory limit is enough for users. Suppose a higher memory limit is needed, e.g. 128 MiB, 1 GiB, etc. Then spilling is reasonable (even the only option), since we don't want OOM. Is that correct?

If yes, the problem then becomes why we need such a large memory limit. So I recommend running some tests against prestodb to see whether the 16 MiB memory limit actually helps reduce data size. In my case you can see that 20 MiB didn't reduce the data size.

What I would assume is that a small aggregation buffer like 16 MiB can still help a lot when aggregating pre-ordered (or nearly pre-ordered) keys, for example aggregating on a time key in time-series data in streaming SQL or a time-series database. But that is only one of many user scenarios.

@mbasmanova
Contributor

@zhztheplayer Hi Hongze, thank you for providing further context.

Suppose a higher memory limit is needed, e.g. 128 MiB, 1 GiB, etc. Then spilling is reasonable (even the only option), since we don't want OOM. Is that correct?

Yes, spilling might be a reasonable option, but it is not the only one. Flushing is another viable option. I understand that it doesn't work well when there is a very small number of reducer tasks (in the original example there is only 1 reducer task), but I don't quite understand yet why this is a common scenario. Even in that scenario, I assume total query CPU time is higher with spilling, while wall time is lower. Is this the case?

Spilling is expensive as it requires sorting data, copying data from row-wise representation into columnar, writing data to disk, then reading data back, merge-sorting, converting to row-wise representation. And after doing all this for partial aggregation we still need to shuffle the partial results (copying data again) and aggregate them again. This might be justified if spilling allows us to achieve a higher cardinality reduction than flushing.

For example, we might have the unfortunate case of receiving many rows with unique keys before we start seeing repeated values. In that case, though, we'd detect the partial aggregation as non-cardinality-reducing and disable it. Hence, spilling wouldn't have a chance to kick in.

Are you on Slack? It might be easier to discuss there. Otherwise, maybe create a Google Doc and let's continue the discussion there. It would be helpful to come up with a few common use cases that require spilling and explain, for each, why flushing doesn't work and how you plan to avoid having partial agg disabled before spilling has a chance to kick in. It would also be nice to discuss how you plan to avoid triggering spilling in the edge case of mostly unique keys.

@zhztheplayer
Contributor Author

zhztheplayer commented Dec 4, 2023

Thanks @mbasmanova for the detailed explanation.

Yes, spilling might be a reasonable option, but it is not the only one. Flushing is another viable option.

IIUC, another option is to flush data via a reclaim invocation. Does that make sense to you? I actually tried that approach earlier but failed to get a quick implementation out. The issue is that a reclaim call is expected to free some memory immediately, while "flushing" is more of an action that takes effect during getOutput calls. So I guess it might require some architecture-level changes to implement.
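To illustrate the mismatch, here is a deliberately simplified sketch (hypothetical names, not the actual Velox Operator or reclaimer interfaces):

```cpp
#include <cstdint>

// Hypothetical operator skeleton showing why reclaim-triggered flushing is
// awkward: reclaim() can only *request* a flush, while the memory is actually
// released later, when the driver calls getOutput().
class PartialAggOperatorSketch {
 public:
  // Memory-arbitration path: we cannot free the hash table here because its
  // contents still have to be emitted downstream.
  uint64_t reclaim(uint64_t /*targetBytes*/) {
    flushRequested_ = true;
    return 0;  // Nothing is freed immediately -- the mismatch in question.
  }

  // Driver path: the flush actually happens on the next output call.
  bool flushPendingOnGetOutput() const {
    return flushRequested_;
  }

 private:
  bool flushRequested_ = false;
};
```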

On the other hand, there are some cases where flushing can't replace spilling for partial aggregation. One is the N >> M case I mentioned; we can also imagine others, such as running a query on a cluster with a slow network, limited memory, and plenty of disk space. A partial aggregation with full spilling support would effectively reduce the network-exchange data size, whereas relying on flushing only, with limited memory, would cause frequent flushes and add pressure to the network.

Spilling is expensive as it requires sorting data, copying data from row-wise representation into columnar, writing data to disk, then reading data back, merge-sorting, converting to row-wise representation.

Yes, spilling is indeed expensive, but if spilling is done in the partial aggregation stage and the data size is successfully reduced, then the final aggregation has a lower chance of triggering spilling. Am I understanding correctly?

For example, we might have the unfortunate case of receiving many rows with unique keys before we start seeing repeated values. In that case, though, we'd detect the partial aggregation as non-cardinality-reducing and disable it. Hence, spilling wouldn't have a chance to kick in.

Yes, Velox can skip partial aggregation according to its runtime statistics; that is indeed useful for us and we'd keep the feature on. What I meant by spilling was to handle the case where partial aggregation cannot be skipped, i.e. where the data size can be reduced a lot by aggregation.

Are you on Slack? It might be easier to discuss there.

Yes, I just sent you a message. Thanks!

Otherwise, maybe create a Google Doc and let's continue the discussion there.

All right. I'll try creating one, and hopefully we can come up with a clearer design in it.

@zhztheplayer
Contributor Author

Closing as not planned. See background here. Thanks everyone.

zhztheplayer closed this as not planned on Dec 21, 2023