From 1a146df804d3dab89c7ca7d5de487a43b3eaee24 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Tue, 10 Nov 2015 13:19:02 +0530 Subject: [PATCH] added tests and some fixes --- base/multi.jl | 35 ++++++---- base/sharedarray.jl | 9 +++ test/examples.jl | 2 - test/parallel_exec.jl | 154 ++++++++++++++++++++++++++++++++++++++---- 4 files changed, 172 insertions(+), 28 deletions(-) diff --git a/base/multi.jl b/base/multi.jl index aafd59df3bc02..7e008d5e43842 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -564,7 +564,7 @@ function isready(rr::Future) if rr.where == myid() isready(lookup_ref(rid).c) else - remotecall_fetch(id->isready(lookup_ref(rid).c), rr.where, rid) + remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid) end end @@ -573,7 +573,7 @@ function isready(rr::RemoteRef, args...) if rr.where == myid() isready(lookup_ref(rid).c, args...) else - remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid) + remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid) end end @@ -634,17 +634,15 @@ function add_clients(pairs::Vector) end function send_add_client(rr::AbstractRemoteRef, i) - args = isa(rr, Future) ? (remoteref_id(rr), i) : (remoteref_id(rr), (i, :remoteref)) - if rr.where == myid() - add_client(args...) + add_client(remoteref_id(rr), i) elseif i != rr.where # don't need to send add_client if the message is already going # to the processor that owns the remote ref. it will add_client # itself inside deserialize(). w = worker_from_id(rr.where) #println("$(myid()) adding $((remoteref_id(rr), i)) for $(rr.where)") - push!(w.add_msgs, args) + push!(w.add_msgs, (remoteref_id(rr), i)) w.gcflag = true notify(any_gc_flag) end @@ -653,7 +651,7 @@ end channel_type{T}(rr::RemoteRef{T}) = T serialize(s::SerializationState, f::Future) = serialize(s, f, isnull(f.v)) -serialize(s::SerializationState, f::RemoteRef) = serialize(s, rr, true) +serialize(s::SerializationState, rr::RemoteRef) = serialize(s, rr, true) function serialize(s::SerializationState, rr::AbstractRemoteRef, addclient) if addclient p = worker_id_from_socket(s.io) @@ -687,7 +685,10 @@ end def_rv_channel() = Channel(1) type RemoteValue c::AbstractChannel - clientset::IntSet + clientset::IntSet # Set of workerids that have a reference to this channel. + # Keeping ids instead of a count aids in cleaning up upon + # a worker exit. + waitingfor::Int # processor we need to hear from to fill this, or 0 RemoteValue(c) = new(c, IntSet(), 0) @@ -881,18 +882,22 @@ fetch(x::ANY) = x isready(rv::RemoteValue, args...) = isready(rv.c, args...) function put!(rr::Future, v) !isnull(rr.v) && error("Future can be set only once") - call_on_owner(put_future, rr, v) + call_on_owner(put_future, rr, v, myid()) + rr.v = v rr end -function put_future(rid, v) +function put_future(rid, v, callee) rv = lookup_ref(rid) isready(rv) && error("Future can be set only once") put!(rv, v) + # The callee has the value and hence can be removed from the remote store. + del_client(rid, callee) + nothing end put!(rv::RemoteValue, args...) = put!(rv.c, args...) -put_ref(rid, args...) = put!(lookup_ref(rid), args...) +put_ref(rid, args...) = (put!(lookup_ref(rid), args...); nothing) put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr) # take! is not supported on Future @@ -1785,10 +1790,14 @@ function terminate_all_workers() end end -getindex(r::AbstractRemoteRef) = fetch(r) -function getindex(r::AbstractRemoteRef, args...) +getindex(r::RemoteRef) = fetch(r) +getindex(r::Future) = fetch(r) + +getindex(r::Future, args...) = getindex(fetch(r), args...) +function getindex(r::RemoteRef, args...) if r.where == myid() return getindex(fetch(r), args...) end return remotecall_fetch(getindex, r.where, r, args...) end + diff --git a/base/sharedarray.jl b/base/sharedarray.jl index 5984f73847bb9..a07f072e9e6db 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -167,6 +167,15 @@ function initialize_shared_array(S, s, onlocalhost, init, pids) end end end + + finalizer(S, finalize_refs) + S +end + +function finalize_refs(S) + for r in S.refs + finalize(r) + end end typealias SharedVector{T} SharedArray{T,1} diff --git a/test/examples.jl b/test/examples.jl index 603cae04a0b16..8c6e2133c2403 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -39,8 +39,6 @@ include(joinpath(dir, "queens.jl")) @unix_only begin script = joinpath(dir, "clustermanager/simple/test_simple.jl") cmd = `$(Base.julia_cmd()) $script` - - wait(proc) if !success(pipeline(cmd; stdout=STDOUT, stderr=STDERR)) && ccall(:jl_running_on_valgrind,Cint,()) == 0 error("UnixDomainCM failed test, cmd : $cmd") end diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index e6d5acbe9f9b4..eb1edf8e72375 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -7,23 +7,151 @@ addprocs(3; exeflags=`--check-bounds=yes --depwarn=error`) id_me = myid() id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] +# Test Futures +function testf(id) + f=Future(id) + @test isready(f) == false + @test isnull(f.v) == true + put!(f, :OK) + @test isready(f) == true + @test isnull(f.v) == false + + @test_throws ErrorException put!(f, :OK) # Cannot put! to a already set future + @test_throws MethodError take!(f) # take! is unsupported on a Future + + @test fetch(f) == :OK +end + +testf(id_me) +testf(id_other) + +# Distributed GC tests for Futures +function test_futures_dgc(id) + f = remotecall(myid, id) + fid = remoteref_id(f) + + # remote value should be deleted after a fetch + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true + @test isnull(f.v) == true + @test fetch(f) == id + @test isnull(f.v) == false + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false + + + # if unfetched, it should be deleted after a finalize + f = remotecall(myid, id) + fid = remoteref_id(f) + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true + @test isnull(f.v) == true + finalize(f) + Base.flush_gc_msgs() + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false +end + +test_futures_dgc(id_me) +test_futures_dgc(id_other) + +# if sent to another worker, it should not be deleted till the other worker has fetched. +wid1 = workers()[1] +wid2 = workers()[2] +f = remotecall(myid, wid1) +fid = remoteref_id(f) + +fstore = RemoteRef(wid2) +put!(fstore, f) + +@test fetch(f) == wid1 +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true +remotecall_fetch(r->fetch(fetch(r)), wid2, fstore) +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false + +# put! should release remote reference since it would have been cached locally +f = Future(wid1) +fid = remoteref_id(f) + +# should not be created remotely till accessed +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false +# create it remotely +isready(f) + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true +put!(f, :OK) +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false +@test fetch(f) == :OK + +# RemoteException should be thrown on a put! when another process has set the value +f = Future(wid1) +fid = remoteref_id(f) + +fstore = RemoteRef(wid2) +put!(fstore, f) # send f to wid2 +put!(f, :OK) # set value from master + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true + +testval = remotecall_fetch(wid2, fstore) do x + try + put!(fetch(x), :OK) + return 0 + catch e + if isa(e, RemoteException) + return 1 + else + return 2 + end + end +end +@test testval == 1 + +# Distributed GC tests for RemoteRefs +function test_remoteref_dgc(id) + rr = RemoteRef(id) + put!(rr, :OK) + rrid = remoteref_id(rr) + + # remote value should be deleted after finalizing the ref + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true + @test fetch(rr) == :OK + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true + finalize(rr) + Base.flush_gc_msgs() + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == false +end +test_remoteref_dgc(id_me) +test_remoteref_dgc(id_other) + +# if sent to another worker, it should not be deleted till the other worker has also finalized. +wid1 = workers()[1] +wid2 = workers()[2] +rr = RemoteRef(wid1) +rrid = remoteref_id(rr) + +fstore = RemoteRef(wid2) +put!(fstore, rr) + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true +finalize(rr); Base.flush_gc_msgs() # finalize locally +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true +remotecall_fetch(r->(finalize(take!(r)); Base.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely +sleep(0.5) # to ensure that wid2 messages have been executed on wid1 +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == false + @test fetch(@spawnat id_other myid()) == id_other @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -rr=Future() -@test typeof(rr) == Future -a = rand(5,5) -put!(rr, a) -@test rr[2,3] == a[2,3] -@test rr[] == a - -rr=Future(workers()[1]) -@test typeof(rr) == Future -a = rand(5,5) -put!(rr, a) -@test rr[1,5] == a[1,5] -@test rr[] == a +# test getindex on Futures and RemoteRefs +function test_indexing(rr) + a = rand(5,5) + put!(rr, a) + @test rr[2,3] == a[2,3] + @test rr[] == a +end + +test_indexing(Future()) +test_indexing(Future(id_other)) +test_indexing(RemoteRef()) +test_indexing(RemoteRef(id_other)) dims = (20,20,20)