-
Notifications
You must be signed in to change notification settings - Fork 237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change shuffle metadata messages to use UCX Active Messages #2409
Change shuffle metadata messages to use UCX Active Messages #2409
Conversation
If anyone would like me to walk them throw the code, I am happy to do so. |
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
f5ad142
to
dcc173b
Compare
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@abellina this is the first pass through UCX.scala still have the other files to go.
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/BufferSendState.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
tx.completeWithSuccess(requestType, Option(am.header), Option(buff)) | ||
} | ||
|
||
override def onHostMessageReceived(size: Long): RefCountedDirectByteBuffer = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider initially handle both metadata and data buffers with AM. Maybe split to some public API, that would handle both cases. Even left onDeviceMessageReceived
unimplemented, but to see the whole picture. Trying to falling my or with AM: #2416 - want it to work with both GPU and host memory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should RNDV for GPU buffers work in 1.10 for Active Messages? Just checking, I have done mostly host buffers.
I would really like to add GPU support as an different PR, need to discuss with others as well on the timing.
@jlowe I think I have addressed your comments. Could you take another look? |
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
build |
) * Change metadata messages to use Active Messages Signed-off-by: Alessandro Bellina <abellina@nvidia.com> * Code review comments * Comments: copyright, visibility, tests, conf check * Comments: spacing * Comments: private final, no side effects in BSS * Comments: simplify UCXActiveMessage * Comments: UCX.scala better comments, interface changes * Comments: UCX.scala putIfAbsent * Comments: small cleanup in UCX.scala * Move some static tag-handling functions out of the UCXConnection instances. To help testing * Fix a couple of bugs I introduced during refactorings in this PR * Comments: Refactor Active Message registrations a bit * Update comment * UCXConnectionSuite * Comments: shift-then-mask while extracting executorId * Add a few test cases touching the active message header generation * Comments: Make request am handler a constructor argument * Fix RequestActiveMessageRegistration
) * Change metadata messages to use Active Messages Signed-off-by: Alessandro Bellina <abellina@nvidia.com> * Code review comments * Comments: copyright, visibility, tests, conf check * Comments: spacing * Comments: private final, no side effects in BSS * Comments: simplify UCXActiveMessage * Comments: UCX.scala better comments, interface changes * Comments: UCX.scala putIfAbsent * Comments: small cleanup in UCX.scala * Move some static tag-handling functions out of the UCXConnection instances. To help testing * Fix a couple of bugs I introduced during refactorings in this PR * Comments: Refactor Active Message registrations a bit * Update comment * UCXConnectionSuite * Comments: shift-then-mask while extracting executorId * Add a few test cases touching the active message header generation * Comments: Make request am handler a constructor argument * Fix RequestActiveMessageRegistration
) * Change metadata messages to use Active Messages Signed-off-by: Alessandro Bellina <abellina@nvidia.com> * Code review comments * Comments: copyright, visibility, tests, conf check * Comments: spacing * Comments: private final, no side effects in BSS * Comments: simplify UCXActiveMessage * Comments: UCX.scala better comments, interface changes * Comments: UCX.scala putIfAbsent * Comments: small cleanup in UCX.scala * Move some static tag-handling functions out of the UCXConnection instances. To help testing * Fix a couple of bugs I introduced during refactorings in this PR * Comments: Refactor Active Message registrations a bit * Update comment * UCXConnectionSuite * Comments: shift-then-mask while extracting executorId * Add a few test cases touching the active message header generation * Comments: Make request am handler a constructor argument * Fix RequestActiveMessageRegistration
Closes #1588
Closes #980
Please set whitespace off when reviewing the bulk of this, it makes it easier to understand. Also there are changes here to the flatbuffer java files, and those are generated. I did do some minor
withResources
cleanup, can split that up into another PR.This PR changes the metadata/transfer request messaging from UCX tags to UCX Active Messages. The advantage of Active Messages is that error handling in UCX is going this way, and when things go wrong Active Messages will clean up resources that UCX is using (like fragments from their pools for pipelined protocols). The other big one is that it removes the need to worry about the max metadata size. I haven't refactored the metadata pool yet to make it remove this setting alltogether, can do as a follow up.
It was tested with UCX 1.10.x, and 1.11. With UCX 1.10.x the RNDV mode is the only mode supported (Active Messages could also be EAGER, and they have an automatic mode to switch based on size), so I set "rndv" set as default. The config is internal, but I should say also there doesn't seem to be a big need for EAGER mode right now, not from the 3TB tests I've run.
I had to tweak client/server/iterator tests here, and they all pass. I made some changes and need to retest in a cluster setup, but from my testing at 3TB I see no performance regressions. There is some bit manipulation I'll write tests for in this PR.