
[FEA] dynamic sizing of input data #4164

Open
revans2 opened this issue Nov 19, 2021 · 3 comments
Labels
feature request New feature or request

Comments

@revans2
Collaborator

revans2 commented Nov 19, 2021

This is something that might be interesting to push back to Spark too. When we are trying to figure out the splits for reading, can we look at the read schema and the file schema to get an idea of how much data is going to be thrown out during the read, so we can size the splits better? I can see a few options for this.

One option is to look at a small amount of data (1 or 2 files at most) to try to determine the file schema. The other option is to launch an actual job whose only purpose is to read the file schema, so we know exactly what is happening. Option 1 is nice because it can be fast and probably gives us a decent estimate of what things will look like. Option 2 fits more with things like Delta Lake, which reads and caches metadata before running a much larger query. We might even be able to switch between the two based on the number of files, or the number of directories, involved in a single query.

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify performance A performance related task/issue labels Nov 19, 2021
@jlowe
Member

jlowe commented Nov 19, 2021

> we look at a small amount of data (1 or 2 files at most) to try and determine the file schema.

The file schema is already known, fetched by Spark as part of planning and validating the query, and available via the dataSchema field of the HadoopFsRelation. Do you mean something more sophisticated like parsing the footer of the files and looking at relative column data sizes or something more simplistic like a blind guess as to the relative data sizes of the columns based on their known Spark data types?
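The "blind guess" flavor of this could look roughly like the sketch below. This is only my illustration under stated assumptions, not code from the plugin: the per-type byte counts are arbitrary guesses (variable-width types get a flat 32 bytes), and `approxTypeSize`/`estimatedReadFraction` are hypothetical names.

```scala
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types._

// Rough per-value size guess keyed off the Spark data type.
// The numbers here are assumptions, not measurements.
def approxTypeSize(dt: DataType): Long = dt match {
  case BooleanType | ByteType                 => 1L
  case ShortType                              => 2L
  case IntegerType | FloatType | DateType     => 4L
  case LongType | DoubleType | TimestampType  => 8L
  case _: DecimalType                         => 16L
  case StringType | BinaryType                => 32L // blind guess for variable-width data
  case st: StructType                         => st.fields.map(f => approxTypeSize(f.dataType)).sum
  case ArrayType(et, _)                       => 4L * approxTypeSize(et)
  case MapType(kt, vt, _)                     => 4L * (approxTypeSize(kt) + approxTypeSize(vt))
  case _                                      => 8L
}

// Fraction of a file's bytes we expect to actually keep, comparing the
// read schema against the full file schema from HadoopFsRelation.dataSchema.
def estimatedReadFraction(relation: HadoopFsRelation, readSchema: StructType): Double = {
  val fileBytes = relation.dataSchema.fields.map(f => approxTypeSize(f.dataType)).sum.toDouble
  val readBytes = readSchema.fields.map(f => approxTypeSize(f.dataType)).sum.toDouble
  if (fileBytes <= 0.0) 1.0 else math.min(1.0, readBytes / fileBytes)
}
```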

@revans2
Collaborator Author

revans2 commented Nov 19, 2021

I would start off with the simplest possible approach and see how far that gets us. I would only make it more complicated if we run into real-world situations where it is too far off for us to get the benefit we want.

The simplest thing I can think of is to keep targeting maxPartitionBytes, but look at the read schema vs the file schema, SWAG how much of each batch we are going to be able to skip, and size the blocks we read accordingly. Alternatively we could try to target the GPU target batch size instead, but then we have to try to understand the compression ratio of the columns we are going to read in. That feels harder to do.
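As a rough picture of that SWAG, the split size could be scaled by the inverse of the estimated read fraction. A minimal sketch, assuming the hypothetical `estimatedReadFraction` helper above; `adjustedMaxPartitionBytes` is likewise a name made up for illustration, not anything the plugin actually has:

```scala
// Grow the target split size by the inverse of the estimated read fraction so
// each task still ends up with roughly maxPartitionBytes of data it keeps.
def adjustedMaxPartitionBytes(maxPartitionBytes: Long, readFraction: Double): Long = {
  val safeFraction = math.max(readFraction, 0.01) // guard against dividing by ~0
  (maxPartitionBytes / safeFraction).toLong
}

// Example: reading 2 of 10 similarly sized columns (fraction ~0.2) with a
// 128 MiB target would produce splits of roughly 640 MiB of raw file data.
```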

@sameerz sameerz removed the ? - Needs Triage Need team to review and classify label Nov 23, 2021
@revans2
Collaborator Author

revans2 commented Dec 20, 2021

I have been working on this for a while and there is no simple way to do what we want with only the information we have ahead of time. I tried a few heuristics based on information that we currently have access to through Spark when creating the splits. I mainly looked at the number and types of columns to try to increase the maximum batch size config per input, so the batches would get larger and the sub-linear scaling of the GPU would work in our favor. This worked well for a few extreme cases (NDS queries 9, 7, and 88) but not as well in the general case.

There are a number of problems with the initial approach that I took.

  1. Predicate push down, compression ratios, and row groups. I was able to get the size of the batches to increase in the general case, but it was far from consistent. The maximum batch that was processed grew about fourfold, to the point that it was larger than the "maximum" batch size. It also spread the sizes out a lot more, which made getting predictable computation much more difficult.
  2. GPU decoding is volatile. For smaller batch sizes (up to about 200 MiB) there is a clear trend that more data is better. Past that point I don't think I have enough data to come to any real conclusion. There is a lot of volatility from one bucket to another, and huge volatility within buckets. The size has a clear impact on computation time, but predicting what that computation time will be is not as clear.
  3. Increased host memory pressure. Before we send the data to the GPU we buffer it on the CPU. If we increase the size of the batches, we hold more data on the CPU at any point in time, which increases memory pressure, especially on pinned memory.
  4. Buffer time. The time to decode parquet is not all about the GPU decoding time. We also have to read that data in and cache it in host memory. In general we want to overlap buffering data with computation on the GPU, but we still have to pay the initial cost of downloading the first batch of data before we can put anything on the GPU. If we increase the average size of a batch, we also increase the time it takes to download the data before we can put it on the GPU.
  5. Side impacts. I also saw a number of other impacts caused by changing the number of tasks and the size of the batches. We saw increased contention for the GPU, which slowed down other processing too. In many cases we also saw fewer tasks later in the processing. I am not totally sure why AQE was changing things like this, but generally fewer upstream tasks resulted in fewer downstream tasks, even if the total amount of data stayed the same. We also saw some impact to shuffle and compression on the GPU.

Despite all of this I still have hope that this can be useful and we should look into it more. Even this imperfect code saved over 3,400 seconds of GPU compute time on parquet decode. That is about 27.8% of the total compute time used to parse the parquet data on the GPU, and about 1.7% of the total run time of all of the queries, assuming the computation could be spread evenly among all of the tasks. So there is potential here to make a decent improvement, but it has not worked out perfectly.

We are looking at ways to improve the decoding performance of the parquet kernels themselves. In addition, we might want to look at more of a control-system approach instead. We lack information up front, and it looks to be expensive to try to get that information early on. It might be better to dynamically adjust the sizes of the batches we buffer/send to the GPU based on the throughput rates we are able to achieve while buffering, plus guesses about how much data the GPU can process efficiently. But first we need to do a fair amount of benchmarking to understand what really impacts the performance of the buffering and of the decode. We would also need to look at how AQE will impact downstream processing.
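One way to picture the control-system idea is a simple feedback loop over observed decode throughput: grow the batch size while throughput keeps improving, back off when it stops. A minimal sketch under my own assumptions; `BatchSizeController` and its tuning constants are hypothetical, not a proposed implementation.

```scala
// Nudge the target batch size based on the decode throughput of the last batch.
class BatchSizeController(initialBytes: Long, minBytes: Long, maxBytes: Long) {
  private var targetBytes: Long = initialBytes
  private var lastThroughput: Double = 0.0 // bytes decoded per second

  /** Call after each batch with its size and the measured decode time. */
  def update(batchBytes: Long, decodeSeconds: Double): Unit = {
    val throughput = batchBytes / math.max(decodeSeconds, 1e-6)
    targetBytes =
      if (throughput >= lastThroughput) math.min((targetBytes * 1.25).toLong, maxBytes)
      else math.max((targetBytes * 0.8).toLong, minBytes)
    lastThroughput = throughput
  }

  /** Size to aim for when buffering the next batch for the GPU. */
  def currentTargetBytes: Long = targetBytes
}
```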

@revans2 revans2 mentioned this issue Apr 8, 2022
@mattahrens mattahrens removed the performance A performance related task/issue label May 17, 2022