-
Notifications
You must be signed in to change notification settings - Fork 237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use UCP Listener for UCX connections and enable peer error handling #2886
Conversation
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
01a6431
to
f3b7e35
Compare
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuShuffleEnv.scala
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServerSuite.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTestHelper.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClient.scala
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCXConnection.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClientSuite.scala
Outdated
Show resolved
Hide resolved
shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala
Show resolved
Hide resolved
hsBuff.putInt(localExecutorId) | ||
|
||
def packHandshake(localExecutorId: Long, rkeys: Seq[ByteBuffer]): ByteBuffer = { | ||
val size = 8 + 4 + (4 * rkeys.size) + rkeys.map(_.capacity).sum |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could use JDK constants java.lang.Long.BYTES + java.lang.Integer.BYTES
for 8 + 4
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this.
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleTransport.scala
Show resolved
Hide resolved
I'll make the |
build |
sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
build |
Closes #1590
Closes #2275
Closes #737
Created this as a draft because I may add more unit tests. But I could use some feedback on the patch. (I can't think of ways of adding tests without refactoring
UCX.scala
, so I am going to hold off).This switches to use
UCPListener
as our incoming connection port, and enables peer error handling by default.With this change, if a peer terminates the following is expected (@petro-rudenko please confirm):
onError
callback for each of the pending live requests.Because of how UCX works right now, a failing endpoint may or may not be of interest. We make an attempt to not initiate endpoints if we detect that we already had an endpoint created due to a peer initiating against us, but this is a race condition and in this PR we simply make one of those two endpoints win and be the default, while the other sticks around.
In the future: openucx/ucx#6859, we'll be able to better detect this (earlier) and reject certain requests.
@jlowe @petro-rudenko