Skip to content

Commit

Permalink
added tests and some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Nov 12, 2015
1 parent 4411e66 commit 1a146df
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 28 deletions.
35 changes: 22 additions & 13 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ function isready(rr::Future)
if rr.where == myid()
isready(lookup_ref(rid).c)
else
remotecall_fetch(id->isready(lookup_ref(rid).c), rr.where, rid)
remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid)
end
end

Expand All @@ -573,7 +573,7 @@ function isready(rr::RemoteRef, args...)
if rr.where == myid()
isready(lookup_ref(rid).c, args...)
else
remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid)
remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid)
end
end

Expand Down Expand Up @@ -634,17 +634,15 @@ function add_clients(pairs::Vector)
end

function send_add_client(rr::AbstractRemoteRef, i)
args = isa(rr, Future) ? (remoteref_id(rr), i) : (remoteref_id(rr), (i, :remoteref))

if rr.where == myid()
add_client(args...)
add_client(remoteref_id(rr), i)
elseif i != rr.where
# don't need to send add_client if the message is already going
# to the processor that owns the remote ref. it will add_client
# itself inside deserialize().
w = worker_from_id(rr.where)
#println("$(myid()) adding $((remoteref_id(rr), i)) for $(rr.where)")
push!(w.add_msgs, args)
push!(w.add_msgs, (remoteref_id(rr), i))
w.gcflag = true
notify(any_gc_flag)
end
Expand All @@ -653,7 +651,7 @@ end
channel_type{T}(rr::RemoteRef{T}) = T

serialize(s::SerializationState, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::SerializationState, f::RemoteRef) = serialize(s, rr, true)
serialize(s::SerializationState, rr::RemoteRef) = serialize(s, rr, true)
function serialize(s::SerializationState, rr::AbstractRemoteRef, addclient)
if addclient
p = worker_id_from_socket(s.io)
Expand Down Expand Up @@ -687,7 +685,10 @@ end
def_rv_channel() = Channel(1)
type RemoteValue
c::AbstractChannel
clientset::IntSet
clientset::IntSet # Set of workerids that have a reference to this channel.
# Keeping ids instead of a count aids in cleaning up upon
# a worker exit.

waitingfor::Int # processor we need to hear from to fill this, or 0

RemoteValue(c) = new(c, IntSet(), 0)
Expand Down Expand Up @@ -881,18 +882,22 @@ fetch(x::ANY) = x
isready(rv::RemoteValue, args...) = isready(rv.c, args...)
function put!(rr::Future, v)
!isnull(rr.v) && error("Future can be set only once")
call_on_owner(put_future, rr, v)
call_on_owner(put_future, rr, v, myid())
rr.v = v
rr
end
function put_future(rid, v)
function put_future(rid, v, callee)
rv = lookup_ref(rid)
isready(rv) && error("Future can be set only once")
put!(rv, v)
# The callee has the value and hence can be removed from the remote store.
del_client(rid, callee)
nothing
end


put!(rv::RemoteValue, args...) = put!(rv.c, args...)
put_ref(rid, args...) = put!(lookup_ref(rid), args...)
put_ref(rid, args...) = (put!(lookup_ref(rid), args...); nothing)
put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr)

# take! is not supported on Future
Expand Down Expand Up @@ -1785,10 +1790,14 @@ function terminate_all_workers()
end
end

getindex(r::AbstractRemoteRef) = fetch(r)
function getindex(r::AbstractRemoteRef, args...)
getindex(r::RemoteRef) = fetch(r)
getindex(r::Future) = fetch(r)

getindex(r::Future, args...) = getindex(fetch(r), args...)
function getindex(r::RemoteRef, args...)
if r.where == myid()
return getindex(fetch(r), args...)
end
return remotecall_fetch(getindex, r.where, r, args...)
end

9 changes: 9 additions & 0 deletions base/sharedarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ function initialize_shared_array(S, s, onlocalhost, init, pids)
end
end
end

finalizer(S, finalize_refs)
S
end

function finalize_refs(S)
for r in S.refs
finalize(r)
end
end

typealias SharedVector{T} SharedArray{T,1}
Expand Down
2 changes: 0 additions & 2 deletions test/examples.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ include(joinpath(dir, "queens.jl"))
@unix_only begin
script = joinpath(dir, "clustermanager/simple/test_simple.jl")
cmd = `$(Base.julia_cmd()) $script`

wait(proc)
if !success(pipeline(cmd; stdout=STDOUT, stderr=STDERR)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
error("UnixDomainCM failed test, cmd : $cmd")
end
Expand Down
154 changes: 141 additions & 13 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,151 @@ addprocs(3; exeflags=`--check-bounds=yes --depwarn=error`)
id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

# Test Futures
function testf(id)
f=Future(id)
@test isready(f) == false
@test isnull(f.v) == true
put!(f, :OK)
@test isready(f) == true
@test isnull(f.v) == false

@test_throws ErrorException put!(f, :OK) # Cannot put! to a already set future
@test_throws MethodError take!(f) # take! is unsupported on a Future

@test fetch(f) == :OK
end

testf(id_me)
testf(id_other)

# Distributed GC tests for Futures
function test_futures_dgc(id)
f = remotecall(myid, id)
fid = remoteref_id(f)

# remote value should be deleted after a fetch
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true
@test isnull(f.v) == true
@test fetch(f) == id
@test isnull(f.v) == false
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false


# if unfetched, it should be deleted after a finalize
f = remotecall(myid, id)
fid = remoteref_id(f)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true
@test isnull(f.v) == true
finalize(f)
Base.flush_gc_msgs()
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false
end

test_futures_dgc(id_me)
test_futures_dgc(id_other)

# if sent to another worker, it should not be deleted till the other worker has fetched.
wid1 = workers()[1]
wid2 = workers()[2]
f = remotecall(myid, wid1)
fid = remoteref_id(f)

fstore = RemoteRef(wid2)
put!(fstore, f)

@test fetch(f) == wid1
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true
remotecall_fetch(r->fetch(fetch(r)), wid2, fstore)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false

# put! should release remote reference since it would have been cached locally
f = Future(wid1)
fid = remoteref_id(f)

# should not be created remotely till accessed
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false
# create it remotely
isready(f)

@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true
put!(f, :OK)
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false
@test fetch(f) == :OK

# RemoteException should be thrown on a put! when another process has set the value
f = Future(wid1)
fid = remoteref_id(f)

fstore = RemoteRef(wid2)
put!(fstore, f) # send f to wid2
put!(f, :OK) # set value from master

@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true

testval = remotecall_fetch(wid2, fstore) do x
try
put!(fetch(x), :OK)
return 0
catch e
if isa(e, RemoteException)
return 1
else
return 2
end
end
end
@test testval == 1

# Distributed GC tests for RemoteRefs
function test_remoteref_dgc(id)
rr = RemoteRef(id)
put!(rr, :OK)
rrid = remoteref_id(rr)

# remote value should be deleted after finalizing the ref
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true
@test fetch(rr) == :OK
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true
finalize(rr)
Base.flush_gc_msgs()
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == false
end
test_remoteref_dgc(id_me)
test_remoteref_dgc(id_other)

# if sent to another worker, it should not be deleted till the other worker has also finalized.
wid1 = workers()[1]
wid2 = workers()[2]
rr = RemoteRef(wid1)
rrid = remoteref_id(rr)

fstore = RemoteRef(wid2)
put!(fstore, rr)

@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true
finalize(rr); Base.flush_gc_msgs() # finalize locally
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true
remotecall_fetch(r->(finalize(take!(r)); Base.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely
sleep(0.5) # to ensure that wid2 messages have been executed on wid1
@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == false

@test fetch(@spawnat id_other myid()) == id_other
@test @fetchfrom id_other begin myid() end == id_other
@fetch begin myid() end

rr=Future()
@test typeof(rr) == Future
a = rand(5,5)
put!(rr, a)
@test rr[2,3] == a[2,3]
@test rr[] == a

rr=Future(workers()[1])
@test typeof(rr) == Future
a = rand(5,5)
put!(rr, a)
@test rr[1,5] == a[1,5]
@test rr[] == a
# test getindex on Futures and RemoteRefs
function test_indexing(rr)
a = rand(5,5)
put!(rr, a)
@test rr[2,3] == a[2,3]
@test rr[] == a
end

test_indexing(Future())
test_indexing(Future(id_other))
test_indexing(RemoteRef())
test_indexing(RemoteRef(id_other))

dims = (20,20,20)

Expand Down

0 comments on commit 1a146df

Please sign in to comment.