port improvements from vchuravy/NCCL.jl #5

Closed · wants to merge 1 commit
50 changes: 50 additions & 0 deletions examples/scaffold.jl
@@ -0,0 +1,50 @@
import MPI
import NCCL
using CuArrays
using CUDAdrv
using CUDAnative

MPI.Init()
comm = MPI.COMM_WORLD
myrank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)

# Issues:
# - Avoid allocations during allReduce

print(stdout, ENV)

@info "MPI initialized" myrank nranks

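# Rank 0 creates the NCCL unique ID; every other rank receives it through the MPI broadcast below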
if myrank == 0
    uid = NCCL.UniqueID()
else
    uid = nothing
end
uid = MPI.bcast(uid, 0, comm)::NCCL.UniqueID

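# Select the first device listed in CUDA_VISIBLE_DEVICES for this rank (see the review note below for a more robust scheme)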
dev = CuDevice(parse(Int, first(split(ENV["CUDA_VISIBLE_DEVICES"], ","))))
Member Author:

The "right" way of doing this is:

    lcomm = MPI.Comm_split_type(mpicomm, MPI.MPI_COMM_TYPE_SHARED,
                                MPI.Comm_rank(mpicomm))
    CUDAnative.device!(MPI.Comm_rank(lcomm))

For NCCL to work best, all devices need to be visible to all MPI processes, and there should be at most n local ranks for n devices.

@info "NCCL uid bcast" myrank uid dev
CUDAnative.device!(dev)

cuComm = NCCL.Communicator(nranks, uid, myrank)

recv = CuArray{Float32}(undef, 1024)
send = CuArray{Float32}(undef, 1024)
fill!(send, float(myrank))

# Stream to do communication on
stream = CuStream()

event = CuEvent(CUDAdrv.EVENT_DISABLE_TIMING)
NCCL.allReduce!(+, send, recv, cuComm; stream=stream)
CUDAdrv.record(event, stream) # mark communication as done

# Enqueue a marker on CuDefaultStream to wait on the communication
wait(event)
# Now do work on CuDefaultStream()
# ...

synchronize(stream)
NCCL.destroy(cuComm)
MPI.Finalize()
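The scaffold is intended to be launched with one MPI rank per GPU, e.g. something along the lines of `mpirun -np 2 julia examples/scaffold.jl` (the exact launcher and rank-to-device mapping depend on the local MPI and CUDA setup).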
14 changes: 14 additions & 0 deletions src/base.jl
@@ -30,3 +30,17 @@ function ncclDataType(T::DataType)
        throw(ArgumentError("ncclDataType equivalent for input type $T does not exist!"))
    end
end

function ncclReductionOp(T::DataType)
    if T == typeof(+)
        return ncclSum
    elseif T == typeof(*)
        return ncclProd
    elseif T == typeof(min)
        return ncclMin
    elseif T == typeof(max)
        return ncclMax
    else
        throw(ArgumentError("ncclReductionOp equivalent for input function type $T does not exist!"))
    end
end
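For reference, a quick sketch of how this mapping is expected to behave (illustrative only; it assumes the generated `ncclSum`/`ncclProd`/`ncclMin`/`ncclMax` constants are in scope):

    ncclReductionOp(typeof(+))   == ncclSum   # sum reduction
    ncclReductionOp(typeof(max)) == ncclMax   # element-wise maximum
    ncclReductionOp(typeof(-))                # throws ArgumentError: unsupported reducer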
6 changes: 6 additions & 0 deletions src/collective.jl
@@ -2,6 +2,12 @@

export Allreduce!, Broadcast!, Reduce!, Allgather!, ReduceScatter!

function allReduce!(::Op, sendbuf, recvbuf, comm::Communicator; stream=CUDAdrv.CuDefaultStream()) where Op
Member Author:

Following the MPI.jl convention of lowercasing Julia-specific implementations.

    op = ncclReductionOp(Op)
    @assert size(sendbuf) == size(recvbuf)
    Allreduce!(sendbuf, recvbuf, length(sendbuf), op, comm, stream=stream)
end

function Allreduce!(sendbuf, recvbuf, count::Integer, op, comm::Communicator; stream::CuStream=CuDefaultStream())
    data_type = ncclDataType(eltype(recvbuf))
    ncclAllReduce(sendbuf, recvbuf, count, data_type, op, comm.handle, stream)
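A minimal usage sketch for the new wrapper, assuming a communicator `comm` and stream `stream` have been set up as in `examples/scaffold.jl` (the buffer names are illustrative):

    using CuArrays                                  # provides CuArray
    send = CuArray{Float32}(undef, 1024); fill!(send, 1f0)
    recv = similar(send)
    # typeof(+) is mapped to ncclSum via ncclReductionOp, then forwarded to Allreduce!
    NCCL.allReduce!(+, send, recv, comm; stream=stream)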
19 changes: 19 additions & 0 deletions src/communicator.jl
@@ -28,6 +28,15 @@ end


# creates a new communicator (multi thread/process version)
"""
    Communicator(nranks, uid, rank)

Creates a new communicator (multi-thread/process version). `rank` must be
between `0` and `nranks-1` and unique within a communicator clique. Each rank
is associated with a CUDA device, which has to be set before calling
`Communicator`. The call implicitly synchronizes with the other ranks, so it
must be called by different threads/processes or used within `group`.
"""
function Communicator(nranks, comm_id, rank)
    handle_ref = Ref{ncclComm_t}(C_NULL)
    ncclCommInitRank(handle_ref, nranks, comm_id.internal, rank)
@@ -70,3 +79,13 @@ function rank(comm::Communicator)
    ncclCommUserRank(comm.handle, rank_ref)
    return rank_ref[]
end

function abort(comm::Communicator)
    ncclCommAbort(comm.handle)
end

function getError(comm::Communicator)
    ref = Ref{ncclResult_t}()
    ncclCommGetAsyncError(comm.handle, ref)
    return NCCLError(ref[])
end
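A sketch of the single-process, multi-GPU initialization pattern the docstring refers to, together with the new error API (the device count, variable names, and the polling step are illustrative assumptions, not part of this PR):

    import NCCL
    using CUDAnative

    uid   = NCCL.UniqueID()
    comms = Vector{NCCL.Communicator}(undef, 2)
    NCCL.group() do
        for r in 0:1
            CUDAnative.device!(r)                       # device must be set before Communicator
            comms[r + 1] = NCCL.Communicator(2, uid, r)
        end
    end
    err = NCCL.getError(comms[1])                       # wraps ncclCommGetAsyncError
    # NCCL.abort(comms[1])                              # force-destroys the communicator on failure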
8 changes: 7 additions & 1 deletion src/group.jl
@@ -1,6 +1,12 @@
# Group calls

export groupStart, groupEnd
export groupStart, groupEnd, group

groupStart() = ncclGroupStart()
groupEnd() = ncclGroupEnd()

function group(f)
    groupStart()
    f()
    groupEnd()
end
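For example, `group` composes with do-block syntax to fuse several collectives into a single NCCL group launch (the buffers, communicator, and stream below are assumed to exist):

    group() do
        NCCL.allReduce!(+, send_a, recv_a, comm; stream=stream)
        NCCL.allReduce!(+, send_b, recv_b, comm; stream=stream)
    end   # everything enqueued between groupStart and groupEnd is launched together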