Skip to content

Commit

Permalink
Use clientId api
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina committed Aug 3, 2021
1 parent eac75ce commit f156f7b
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:
// enables `onError` callback
.setPeerErrorHandlingMode()
.setErrorHandler(this)
.setClientId(executor.executorId.toLong)

This comment has been minimized.

Copy link
@petro-rudenko

petro-rudenko Aug 3, 2021

Need to set it only for forward ep. Backward ep is creating with the same parameters, but with conn_request


/**
* Get a `ClientConnection` after optionally connecting to a peer given by `peerExecutorId`,
Expand Down Expand Up @@ -820,7 +821,13 @@ class UCX(transport: UCXShuffleTransport, executor: BlockManagerId, rapidsConf:
// given a peer id we already established a connection to:
// https://github.com/openucx/ucx/pull/6859
override def onConnectionRequest(connectionRequest: UcpConnectionRequest): Unit = {
logInfo(s"Got UcpListener request from ${connectionRequest.getClientAddress}")
logInfo(s"Got UcpListener request from ${connectionRequest.getClientAddress} for " +
s"executorId ${connectionRequest.getClientId}")
// if we already have an endpoint in our endpoints cache, reject
if (endpoints.contains(connectionRequest.getClientId)) {
logError(s"SHOULD REJECT ${connectionRequest.getClientId}")
return
}

// accept it
val ep = worker.newEndpoint(epParams.setConnectionRequest(connectionRequest))
Expand Down

0 comments on commit f156f7b

Please sign in to comment.