
[FEA] Move some deserialization code out of the scope of the GPU semaphore to increase CPU concurrency #679

Closed
JustPlay opened this issue Sep 8, 2020 · 22 comments
Labels
performance A performance related task/issue

Comments

JustPlay commented Sep 8, 2020

Is your feature request related to a problem? Please describe.
Move some deserialization code out of the scope of the GPU semaphore to increase CPU concurrency.

Describe the solution you'd like

val tableInfo = JCudfSerialization.readTableFrom(dIn)

JCudfSerialization.readTableFrom() contains both high-overhead CPU code and GPU code. The GPU code (including device memory allocation and copies) should be executed within the scope of the GPU semaphore, but the CPU code does not need to be.

https://github.com/rapidsai/cudf/blob/19237a09fe9b80706ef463580c6a646f6f9ec2ae/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java#L1505
I think this function does the following:

  1. deserialization (not so sure)
  2. allocates host memory (pinned memory)
  3. calls the overloaded function to copy data from host to GPU

Steps 1 and 2 do not need the GPU semaphore, so it would be better to move them out of its scope.

https://github.com/rapidsai/cudf/blob/19237a09fe9b80706ef463580c6a646f6f9ec2ae/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java#L1522
Only this line of code needs the protection of the GPU semaphore.
All of the code before it does not touch device memory, only the host buffer (pinned memory), so I think that code can be executed without holding the GPU semaphore (just like the Scan operator).

I think we should move some code from cuDF into the RAPIDS plugin.
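To make the proposal concrete, here is a minimal plugin-side sketch of the split, assuming the cudf Java API can be used (or extended) to read the header and the serialized bytes into host memory separately from the host-to-device step. The two-argument readTableFrom(header, hostBuffer) overload is the one referenced later in this thread; the header constructor, getDataLen, readTableIntoBuffer, and the GpuSemaphore package location are assumptions for illustration only.

```scala
import java.io.DataInputStream
import org.apache.spark.TaskContext
import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization}
import com.nvidia.spark.rapids.GpuSemaphore // plugin class; package name may differ

def readTableWithLateSemaphore(dIn: DataInputStream) = {
  // CPU-only work: parse the header and copy the serialized bytes into
  // (pinned) host memory. None of this touches the GPU, so the semaphore
  // is not needed yet.
  val header = new JCudfSerialization.SerializedTableHeader(dIn)    // assumed ctor
  val hostBuffer = HostMemoryBuffer.allocate(header.getDataLen)     // assumed accessor
  try {
    JCudfSerialization.readTableIntoBuffer(dIn, header, hostBuffer) // assumed helper
    // Only the host-to-device step below needs the GPU semaphore.
    GpuSemaphore.acquireIfNecessary(TaskContext.get())
    JCudfSerialization.readTableFrom(header, hostBuffer)            // device alloc + copy
  } finally {
    hostBuffer.close()
  }
}
```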


@JustPlay JustPlay added ? - Needs Triage Need team to review and classify feature request New feature or request labels Sep 8, 2020
JustPlay (Author) commented Sep 8, 2020

this is the related issue: #622

JustPlay (Author) commented Sep 8, 2020

@jlowe

revans2 (Collaborator) commented Sep 8, 2020

@JustPlay
I agree that we could move the semaphore in this case. Any time we are doing I/O we should not hold the semaphore. Because cudf itself has no knowledge of the semaphore we would need to add in some kind of a callback function to grab it. I just did something similar for the arrow IPC format.

rapidsai/cudf#6143

We should be able to move those APIs to make them more generic so we can reuse them in other places.

@jlowe I know you don't like callbacks because they make the code hard to reason about and I agree, but I was curious what you thought about this situation?

JustPlay (Author) commented Sep 8, 2020

I agree that we could move the semaphore in this case. Any time we are doing I/O we should not hold the semaphore. Because cudf itself has no knowledge of the semaphore we would need to add in some kind of a callback function to grab it. I just did something similar for the arrow IPC format

why not just copy the related code snippet(s) into rapids-plugin?

revans2 (Collaborator) commented Sep 8, 2020

why not just copy the related code snippet(s) into rapids-plugin?

We could in the short term, but it is a question of separation of concerns, and the goals of each project. One of the reasons we put the serialization code in cudf was so that users of the cudf library did not have to know the exact memory layout. We have changed that somewhat since we started to implement the UCX shuffle, but I think it generally still holds true that a user of cudf should not need to know the memory layout to get decent performance.

cuDF is trying to be a generic backend, and if we have run into an issue with Spark that is not some odd corner case of how Spark handles a specific SQL command, then others would likely want that same functionality. In this case I can totally see that other users might also want to know when data is completely off of the GPU, or when external I/O has completed and GPU processing is ready to start.

jlowe (Member) commented Sep 8, 2020

I know you don't like callbacks because they make the code hard to reason about

I don't mind callbacks within the JVM so much, as those can be traced without too much effort. What becomes really hard to trace are callbacks from JNI into the JVM, as the IDE won't help you there.

If we keep the serialization logic in cudf, we could solve this in a couple of ways:

  • callbacks from cudf back into the application just as it's about to place data on the GPU
  • split the serialization into two separate cudf calls that the application makes, the first takes the input stream and returns an opaque object that must be passed to the second call to continue the deserialization. The application can then do any "just before GPU" logic between the two calls.
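Neither shape exists in cudf today; the definitions below are stand-ins I am defining only to illustrate the two options, with hypothetical names throughout.

```scala
// Option 1: a callback fired just before data is placed on the GPU.
// (Hypothetical hook type; if added, the real API would live in cudf.)
trait BeforeDeviceCopy {
  def onBeforeDeviceCopy(): Unit // e.g. the plugin would grab the GPU semaphore here
}

// Option 2: split deserialization into two explicit calls with an opaque
// host-side handle in between, so the caller can run "just before GPU" logic.
trait HostSerializedTable extends AutoCloseable // opaque: header + host buffer

trait SerializationApi {
  def readIntoHostMemory(in: java.io.DataInputStream): HostSerializedTable // no semaphore needed
  def copyToDevice(host: HostSerializedTable): ai.rapids.cudf.Table        // needs the semaphore
}
```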

@sameerz sameerz added performance A performance related task/issue and removed ? - Needs Triage Need team to review and classify labels Sep 8, 2020
JustPlay (Author) commented Sep 9, 2020


[two screenshots of query DAGs containing GpuShuffledHashJoin]

For DAGs like the two above, the GpuShuffledHashJoin node takes the GPU semaphore when building its hash map, and the task holds the semaphore for the whole lifetime of GpuShuffledHashJoin. In this case the other table to be joined can only be loaded (shuffle read, deserialize, decompress) by the task(s) holding the semaphore, even if we have moved some deserialization code out of the scope of the GPU semaphore. So no additional CPU concurrency can be used to feed the GPU.

I want to know whether this is a bottleneck, and if it is, how can we optimize it?

@jlowe @sameerz @revans2

jlowe (Member) commented Sep 9, 2020

The main point of the GPU semaphore is to limit the amount of GPU memory consumed by limiting concurrent tasks on the GPU. The general rule is: if a task has unspillable buffers on the GPU then it must be holding the GPU semaphore. Building the hash map necessarily means the task has buffers on the GPU, therefore it holds the semaphore. Without it, a lot of tasks could all build large hash tables in GPU memory such that no single task can load a batch of the stream table and make progress without triggering an OOM.

Our plan to fix this involves making the buffers spillable while the task is collecting columnar batches. Once the buffers are spillable, the task can release the semaphore while waiting for new batches to arrive. If GPU memory pressure from other tasks gets too high then these buffers can be spilled out of device memory to avoid the OOM becoming fatal to the application.
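A minimal sketch of that plan, assuming the collected data has already been registered with the spillable framework. The release/acquire method names on GpuSemaphore are assumed, and makeCollectedSpillable/waitForNextBatch are hypothetical placeholders for the registration and shuffle-read steps.

```scala
import org.apache.spark.TaskContext
import com.nvidia.spark.rapids.GpuSemaphore // plugin class; method names assumed

def nextBatchWithoutHoldingGpu[T](
    makeCollectedSpillable: () => Unit, // hypothetical: register collected data as spillable
    waitForNextBatch: () => T): T = {   // hypothetical: blocking host-side shuffle read
  makeCollectedSpillable()                           // nothing unspillable left on the GPU
  GpuSemaphore.releaseIfNecessary(TaskContext.get()) // let other tasks onto the GPU
  val batch = waitForNextBatch()                     // I/O happens without the semaphore
  GpuSemaphore.acquireIfNecessary(TaskContext.get()) // re-acquire before GPU processing
  batch
}
```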

revans2 (Collaborator) commented Sep 9, 2020

The issue is about memory management. The GpuSemaphore is in place to limit the number of tasks that can be on the GPU at any point in time so the processing cannot use too much GPU memory, which will kill the job. If we allow a task to release the semaphore while holding on to GPU memory then the GpuSemaphore is not doing its job.

We agree that in the ideal case I/O should not happen while the semaphore is held, but there is not a simple solution to this. We can fix it for the deserialization code, like was requested, but it will only help for the very first batch being read. We need a more holistic approach to this to make it work properly and quickly.

We are still trying to think through what the ideal solution would look like, but with the 0.2.0 release close by, we have been a bit distracted trying to get everything ready for that.

In general I see a few things we want.

  1. All data that is not actively being processed needs to be stored in the RapidsBufferCatalog.
     • The code right now is specific to Shuffle, but I am in the process of making it more generic for 0.3.0. There still needs to be a bunch of micro-benchmarks to see what level of performance impact this might have for code that commonly will not spill. This is here to act as a backup in case we get something wrong when releasing the semaphore early.
  2. Try to concentrate I/O. There really are two approaches we could take here, and a final solution is likely to be a combination of both of them.
     • We could have the shuffle layer be aware of the desired batch size and wait for the desired amount of data to be available on the host side before sending any of the data on. This should hopefully make it so that GpuCoalesceBatches does not have to wait for data. The problem is that it shifts memory pressure to the host side and/or may result in more memory pressure in general when using UCX shuffle. So to make this work we would need to figure out a way to insert CPU-side data into our RapidsBufferCatalog as well, not just GPU-side data.
     • We could have a way for shuffle to signal that there is no more data available right now. This would let the GPU start processing data early, even if it is less efficient at it, but would not solve the issue when all of the data for a given partition is required for an operation (sort for example). The other issue with this is that Spark only provides an iterator interface for us to use, so we would either have to do something out of band, or have special metadata in the batch to indicate that this is the end of the non-blocking I/O, which is not super simple, because Spark does not let us extend the ColumnarBatch class.
  3. Implement out-of-core processing. This would let us move away from requiring all of the data to be in memory. The problem is that this would come with a trade-off of spilling, which is likely to slow things down even more.

JustPlay (Author) commented

  1. RapidsBufferCatalog

Can you give a more detailed explanation of RapidsBufferCatalog?
@revans2

jlowe (Member) commented Sep 10, 2020

Can you give a more detailed explanation of RapidsBufferCatalog

RapidsBufferCatalog is part of the spillable buffer framework in the plugin. Columnar batches stored as a contiguous buffer can be registered with the spillable framework (the entry point for buffers into the system is via RapidsDeviceMemoryStore, as all buffers produced by the GPU start off in device memory). Once registered into the spillable buffer system, buffers can automatically spill out of GPU memory into host memory (and ultimately to disk if necessary) while the buffers are left unreferenced.

When the buffer is needed later, the buffer's ID can be used to look up and acquire (lock) the buffer via RapidsBufferCatalog. If the buffer is not in device memory at the time, it will automatically be fetched back into device memory, potentially spilling other buffers to make room if necessary.

If all of the GPU memory being held by a task is in the form of unlocked, spillable buffers then it should be able to let go of the GPU semaphore while performing other operations, since a new task arriving will at worst force the old task's buffers out of the device store. It will not lead to an OOM that wouldn't also happen with the old task.
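The lifecycle described above looks roughly like the sketch below. The trait is a stand-in defined only for this illustration; the real plugin classes (RapidsBufferCatalog, RapidsDeviceMemoryStore, RapidsBuffer) have their own signatures.

```scala
// Stand-in for the spillable-buffer catalog, defined only for illustration.
trait SpillableCatalog {
  // Buffers enter via the device store; while left unreferenced they may
  // spill to host memory and ultimately to disk.
  def registerFromDevice(id: Long, contiguousBuffer: AnyRef): Unit
  // Looks the buffer up by ID and locks it, fetching it back into device
  // memory (possibly spilling other buffers) if it had spilled.
  def acquire(id: Long): AutoCloseable
}

def useBufferLater(catalog: SpillableCatalog, id: Long): Unit = {
  val locked = catalog.acquire(id) // now resident in device memory and unspillable
  try {
    // ... process the batch while it is locked ...
  } finally {
    locked.close()                 // unlocked: eligible to spill again
  }
}
```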

JustPlay (Author) commented Sep 11, 2020

@revans2 @jlowe
We have join time and build time; why not add a stream time to count the time used to load (and pre-process) the streamed table (or batch)?

this is the pull request: #756

JustPlay (Author) commented Sep 11, 2020


We can fix it for the deserialization code, like was requested, but it will only help for the very first batch being read

I am a little confused. Will moving the deserialization code out of the GPU semaphore only help the very first batch read in the build phase, or will it help all of the batch reads in the whole build phase? @revans2 @jlowe Thanks.

JustPlay (Author) commented

The GPU semaphore limits task concurrency; even if the number of CPU cores is bigger than the GPU concurrency, tasks cannot run once some of them have taken the semaphore (and before they release it). So shuffle data reading (disk I/O, network, CPU decompression) cannot overlap with GPU processing, and often the CPU/network/disk cannot feed enough data to the GPU.

Without spilling, the GPU semaphore cannot be released. So can we make some of the CPU part (e.g. data reading) asynchronous?

For example, we could add a background asynchronous CPU thread pool to read data for the Spark tasks: the async threads read data from disk/network into memory, so the Spark task can take shuffled data directly from memory. If the async threads read data quickly enough, the GPU will always have enough data to process, and the GPU semaphore limit will not be a problem.
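A minimal, generic sketch of that prefetch idea (revans2 notes below that Spark already has its own fetch threads, so this is purely illustrative, not how the plugin works): a bounded queue lets a background thread pull shuffle data into host memory while the task thread consumes batches that are already in memory. The class name is hypothetical.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors}

/** Reads `source` on a background thread so the consumer finds data already in memory. */
class PrefetchIterator[T](source: Iterator[T], capacity: Int = 4) extends Iterator[T] {
  private val queue = new ArrayBlockingQueue[Option[T]](capacity) // None marks end of stream
  private val pool = Executors.newSingleThreadExecutor()
  pool.submit(new Runnable {
    override def run(): Unit = {
      try source.foreach(item => queue.put(Some(item)))
      finally { queue.put(None); pool.shutdown() }
    }
  })
  private var buffered: Option[T] = queue.take() // blocks until the first item is read
  override def hasNext: Boolean = buffered.isDefined
  override def next(): T = {
    val item = buffered.get
    buffered = queue.take()
    item
  }
}

// Usage sketch: wrap the shuffle-read iterator so fetch/decompress/deserialize
// can run ahead of the GPU-side consumer.
// val prefetched = new PrefetchIterator(shuffleReadIterator)
```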

I am not familiar with the CPU-side shuffle fetch, nor with the code path of the shuffle fetch in rapids, so I need some help.

@revans2 @jlowe

revans2 (Collaborator) commented Sep 21, 2020

@JustPlay

I'm not sure a thread pool is the solution in this case. Spark already has a thread pool for these types of situations, and has a bunch of features to avoid doing a DDOS during the shuffle. Instead, what we need to do is have a callback in JCudfSerialization between reading the data into host memory and putting that data on the GPU.

In this case I think it is in between these two lines.

https://github.com/rapidsai/cudf/blob/c39e9168c6d08236f36e5c3203b3ba8d145e86ed/java/src/main/java/ai/rapids/cudf/JCudfSerialization.java#L1521-L1522

return readTableFrom(header, hostBuffer);

will copy the data from the host buffer to the device and then turn it into a table.

After that is done then you would change the shuffle code to grab the semaphore as a part of the callback instead.

val tableInfo = JCudfSerialization.readTableFrom(dIn)

This only fixes the reading side of the issue. Very similar changes could be made for the write side.

JustPlay (Author) commented

@revans2

In the GpuShuffledHashJoin case, once a task has taken the semaphore, I think it should hold it until the task has finished processing all of its input data, because the hash map should stay in GPU memory. In this case, only the tasks holding the semaphore can do a shuffle fetch from upstream, so only 2 or 3 tasks can read data from disk/network, and 2 or 3 tasks cannot feed the GPU enough. I do not understand whether the callback will help. #679 (comment)

JustPlay (Author) commented Sep 21, 2020

@revans2 In RapidsShuffleManager, I see there is a producer-consumer model; is this an async thread pool for fetching shuffled data?

JustPlay (Author) commented

This only fixes the reading side of the issue. Very similar changes could be made for the write side.

what do you mean by reading side and writing side? @revans2

revans2 (Collaborator) commented Sep 21, 2020

@JustPlay there is a lot going on here with shuffle. The small change that I explained here is a simple thing that we can do right away to improve performance when spark shuffle is being used. You are right it will not fix the issue totally, especially in the case of a join.

But like I said here that is going to take an in-depth design to work out how to fix it. If you want to try and do that design I would be happy to review it, but as it stands right now I have other things on my plate that prevent me from doing it.

what do you mean by reading side and writing side? @revans2

In a shuffle there are two phases. The output of one task is written and becomes the input to another task that reads it. The GPU semaphore is involved in both cases, although the reading side tends to be more of a problem because it is generally remote.

JustPlay (Author) commented

why not just copy the related code snippet(s) into rapids-plugin?

We have modified rapids and tested it; the performance improvement is negligible.

Just like what you mentioned:

We agree that in the ideal case I/O should not happen while the semaphore is held, but there is not a simple solution to this. We can fix it for the deserialization code, like was requested, but it will only help for the very first batch being read. We need a more holistic approach to this to make it work properly and quickly.

JustPlay (Author) commented Sep 21, 2020

In general I see a few things we want: (1) all data that is not actively being processed needs to be stored in the RapidsBufferCatalog, (2) try to concentrate I/O, and (3) implement out-of-core processing.

We will take a deeper look at it, and hope we have the ability to try to implement them. Thanks. @revans2

jlowe (Member) commented Jan 15, 2022

The original ask in the description has been implemented in #4396, and we're working on an additional optimization to avoid holding the semaphore during shuffle in some join cases via #4539.

@jlowe jlowe closed this as completed Jan 15, 2022
@sameerz sameerz removed the feature request New feature or request label Jan 16, 2022