Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina committed Jun 3, 2021
1 parent 9cb1e56 commit 161c6fa
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,7 @@ class UCXShuffleTransport(shuffleServerId: BlockManagerId, rapidsConf: RapidsCon
private val altList = new HashedPriorityQueue[PendingTransferRequest](
1000,
(t: PendingTransferRequest, t1: PendingTransferRequest) => {
if (t.getLength < t1.getLength) {
-1;
} else if (t.getLength > t1.getLength) {
1;
} else {
0
}
java.lang.Long.compare(t.getLength, t1.getLength)
})

// access to this set must hold the `altList` lock
Expand Down Expand Up @@ -489,10 +483,12 @@ class UCXShuffleTransport(shuffleServerId: BlockManagerId, rapidsConf: RapidsCon
altList.synchronized {
if (validHandlers.contains(handler)) {
// This is expensive, but will be refactored with a queue per client.
// As it stands, in the good case it should be invoked once per task/peer,
// on task completion, and `altList` should be empty
// will be skipped.
// When there are errors, we would get more invocations.
// As it stands, in the good case, it should be invoked once per task/peer,
// on task completion, and `altList` should be empty, turning this into
// mostly a noop.
// When there are errors, we will get more invocations, specifically as `transferError`
// is handled by `RapidsShuffleFetchHandler` and then later when the task finally
// fails.
if (!altList.isEmpty) {
val it = altList.iterator()
val toRemove = new ArrayBuffer[PendingTransferRequest]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class RapidsShuffleClient(
/**
* Cancel pending requests for handler `handler` to the peer represented by this client.
* @param handler instance to use to find requests to cancel
* @note this currentl only cancels pending requests that are queued in the transport,
* @note this currently only cancels pending requests that are queued in the transport,
* and not in flight.
*/
def cancelPending(handler: RapidsShuffleFetchHandler): Unit = {
Expand Down

0 comments on commit 161c6fa

Please sign in to comment.