Skip to content

Commit

Permalink
Use the new ucx clientId apis
Browse files (browse the repository at this point in the history)
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
  • Loading branch information
abellina committed Dec 14, 2021
1 parent bc4cd09 commit 54b6aba
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
2 changes: 1 addition & 1 deletion shuffle-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>org.openucx</groupId>
<artifactId>jucx</artifactId>
<version>1.11</version>
<version>1.12.0</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:
}

var workerParams = new UcpWorkerParams()
.setClientId(localExecutorId)

if (rapidsConf.shuffleUcxUseWakeup) {
workerParams = workerParams
Expand Down Expand Up @@ -775,6 +776,7 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:
// enables `onError` callback
.setPeerErrorHandlingMode()
.setErrorHandler(this)
.sendClientId()

/**
* Get a `ClientConnection` after optionally connecting to a peer given by `peerExecutorId`,
Expand Down Expand Up @@ -806,7 +808,8 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:
onWorkerThreadAsync(() => {
endpoints.computeIfAbsent(peerExecutorId, _ => {
val sockAddr = new InetSocketAddress(peerHost, peerPort)
val ep = worker.newEndpoint(epParams.setSocketAddress(sockAddr))
val ep = worker.newEndpoint(
epParams.setSocketAddress(sockAddr))
logDebug(s"Initiator: created an endpoint $ep to $peerExecutorId")
reverseLookupEndpoints.put(ep, peerExecutorId)
ep
Expand All @@ -816,12 +819,20 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:

// UcpListenerConnectionHandler interface - called from the progress thread.
// Handles an incoming connection to our UCP listener.
// TODO: in the future, this function may reject `ConnectionRequest`s
// given a peer id we already established a connection to:
// https://github.com/openucx/ucx/pull/6859
override def onConnectionRequest(connectionRequest: UcpConnectionRequest): Unit = {
logInfo(s"Got UcpListener request from ${connectionRequest.getClientAddress}")

val clientId = connectionRequest.getClientId

if (endpoints.containsKey(clientId)) {
connectionRequest.reject()
logWarning(s"Rejected connection request from ${clientId}, we already had an " +
s"endpoint established: ${endpoints.get(clientId)}")
return
} else {
logWarning(s"Accepting connection request from ${clientId}!")
}

// accept it
val ep = worker.newEndpoint(epParams.setConnectionRequest(connectionRequest))

Expand Down

0 comments on commit 54b6aba

Please sign in to comment.