diff --git a/base/docs/helpdb.jl b/base/docs/helpdb.jl index 5f45c8c8e49bb..6e789068541e2 100644 --- a/base/docs/helpdb.jl +++ b/base/docs/helpdb.jl @@ -1155,18 +1155,43 @@ Return a tuple `(I, J, V)` where `I` and `J` are the row and column indexes of t findnz doc""" - RemoteRef() + Future() -Make an uninitialized remote reference on the local machine. +Create a `Future` on the local machine. """ -RemoteRef() +Future() doc""" - RemoteRef(n) + Future(n) -Make an uninitialized remote reference on process `n`. +Create a `Future` on process `n`. """ -RemoteRef(::Integer) +Future(::Integer) + +doc""" + RemoteChannel() + +Make an reference to a `Channel{Any}(1)` on the local machine. +""" +RemoteChannel() + +doc""" + RemoteChannel(n) + +Make an reference to a `Channel{Any}(1)` on process `n`. +""" +RemoteChannel(::Integer) + +doc""" + RemoteChannel(f::Function, pid) + +Create references to remote channels of a specific size and type. `f()` is a function that when +executed on `pid` must return an implementation of an `AbstractChannel`. + +For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a channel of type `Int` +and size 10 on `pid`. +""" +RemoteChannel(f::Function, pid) doc""" ```rst @@ -2468,7 +2493,7 @@ display doc""" @spawnat -Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `RemoteRef` to the result. +Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `Future` to the result. """ :@spawnat @@ -4840,11 +4865,11 @@ Optional argument `msg` is a descriptive error string. DimensionMismatch doc""" - take!(RemoteRef) + take!(RemoteChannel) -Fetch the value of a remote reference, removing it so that the reference is empty again. +Fetch a value from a remote channel, also removing it in the processs. """ -take!(::RemoteRef) +take!(::RemoteChannel) doc""" take!(Channel) @@ -6639,7 +6664,8 @@ doc""" Block the current task until some event occurs, depending on the type of the argument: -* `RemoteRef`: Wait for a value to become available for the specified remote reference. +* `RemoteChannel` : Wait for a value to become available on the specified remote channel. +* `Future` : Wait for a value to become available for the specified future. * `Channel`: Wait for a value to be appended to the channel. * `Condition`: Wait for `notify` on a condition. * `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process can be used to determine success or failure. @@ -7029,17 +7055,25 @@ Return the minimum of the arguments. Operates elementwise over arrays. min doc""" - isready(r::RemoteRef) + isready(r::RemoteChannel) + +Determine whether a `RemoteChannel` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. +However, it can be safely used on a `Future` since they are assigned only once. +""" +isready + +doc""" + isready(r::Future) -Determine whether a `RemoteRef` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. It is recommended that this function only be used on a `RemoteRef` that is assigned once. +Determine whether a `Future` has a value stored to it. -If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy: +If the argument `Future` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `Channel` as a proxy: - rr = RemoteRef() - @async put!(rr, remotecall_fetch(long_computation, p)) - isready(rr) # will not block + c = Channel(1) + @async put!(c, remotecall_fetch(long_computation, p)) + isready(c) # will not block """ -isready + isready(r::Future) doc""" InexactError() @@ -8109,11 +8143,19 @@ An iterator that cycles through `iter` forever. cycle doc""" - put!(RemoteRef, value) + put!(RemoteChannel, value) + +Store a value to the remote channel. If the channel is full, blocks until space is available. Returns its first argument. +""" +put!(::RemoteChannel, value) + +doc""" + put!(Future, value) -Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with `take!`. Returns its first argument. +Store a value to a future. Future's are write-once remote references. A `put!` on an already set `Future` throws an Exception. +All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ -put!(::RemoteRef, value) +put!(::Future, value) doc""" put!(Channel, value) @@ -8860,7 +8902,7 @@ Base.(:(!=)) doc""" @spawn -Creates a closure around an expression and runs it on an automatically-chosen process, returning a `RemoteRef` to the result. +Creates a closure around an expression and runs it on an automatically-chosen process, returning a `Future` to the result. """ :@spawn @@ -9114,7 +9156,7 @@ readavailable doc""" remotecall(func, id, args...) -Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`. +Call a function asynchronously on the given arguments on the specified process. Returns a `Future`. """ remotecall @@ -9510,7 +9552,10 @@ doc""" Waits and fetches a value from `x` depending on the type of `x`. Does not remove the item fetched: -* `RemoteRef`: Wait for and get the value of a remote reference. If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace. +* `Future`: Wait for and get the value of a Future. The fetched value is cached locally. Further calls to `fetch` on the same reference +return the cached value. +If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace. +* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are same as for a `Future` . * `Channel` : Wait for and get the first available item from the channel. """ fetch diff --git a/base/exports.jl b/base/exports.jl index 92de059e5e76f..c52536c2ec3f1 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -61,6 +61,7 @@ export FileOffset, Filter, FloatRange, + Future, Hermitian, UniformScaling, InsertionSort, @@ -87,7 +88,7 @@ export Rational, Regex, RegexMatch, - RemoteRef, + RemoteChannel, RepString, RevString, RoundFromZero, diff --git a/base/multi.jl b/base/multi.jl index 989cd42f24a4d..ecbb9ba02c47a 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -439,7 +439,7 @@ function deregister_worker(pg, pid) end push!(map_del_wrkr, pid) - # delete this worker from our RemoteRef client sets + # delete this worker from our remote reference client sets ids = [] tonotify = [] for (id,rv) in pg.refs @@ -462,25 +462,50 @@ function deregister_worker(pg, pid) end ## remote refs ## - const client_refs = WeakKeyDict() -type RemoteRef{T<:AbstractChannel} +abstract AbstractRemoteRef + +type Future <: AbstractRemoteRef + where::Int + whence::Int + id::Int + v::Nullable{Any} + + Future(w, wh, id) = Future(w, wh, id, Nullable{Any}()) + Future(w, wh, id, v) = (r = new(w,wh,id,v); test_existing_ref(r)) +end +finalize_future(f::Future) = (isnull(f.v) && send_del_client(f)) + +type RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef where::Int whence::Int id::Int - # TODO: cache value if it's fetched, but don't serialize the cached value - function RemoteRef(w, wh, id) - r = new(w,wh,id) - found = getkey(client_refs, r, false) - if !is(found,false) - return found + RemoteChannel(w, wh, id) = (r = new(w,wh,id); test_existing_ref(r)) +end + +function test_existing_ref(r::Future) + found = getkey(client_refs, r, false) + if !is(found,false) + if isnull(found.v) && !isnull(r.v) + # we have recd the value from another source, probably a deserialized ref, send a del_client message + send_del_client(r) + found.v = r.v end - client_refs[r] = true - finalizer(r, send_del_client) - r + return found end + client_refs[r] = true + finalizer(r, finalize_future) + r +end + +function test_existing_ref(r::RemoteChannel) + found = getkey(client_refs, r, false) + !is(found,false) && return found + client_refs[r] = true + finalizer(r, send_del_client) + r end let REF_ID::Int = 1 @@ -491,53 +516,77 @@ let REF_ID::Int = 1 next_rrid_tuple() = (myid(),next_ref_id()) end -RemoteRef(w::LocalProcess) = RemoteRef(w.id) -RemoteRef(w::Worker) = RemoteRef(w.id) -function RemoteRef(pid::Integer=myid()) +Future(w::LocalProcess) = Future(w.id) +Future(w::Worker) = Future(w.id) +function Future(pid::Integer=myid()) rrid = next_rrid_tuple() - RemoteRef{Channel{Any}}(pid, rrid[1], rrid[2]) + Future(pid, rrid[1], rrid[2]) end -function RemoteRef(f::Function, pid::Integer=myid()) +function RemoteChannel(pid::Integer=myid()) + rrid = next_rrid_tuple() + RemoteChannel{Channel{Any}}(pid, rrid[1], rrid[2]) +end +function RemoteChannel(f::Function, pid::Integer=myid()) remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid rv=lookup_ref(rrid, f) - RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2]) + RemoteChannel{typeof(rv.c)}(myid(), rrid[1], rrid[2]) end end -hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) -==(r::RemoteRef, s::RemoteRef) = (r.whence==s.whence && r.id==s.id) +hash(r::AbstractRemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) +==(r::AbstractRemoteRef, s::AbstractRemoteRef) = (r.whence==s.whence && r.id==s.id) -rr2id(r::RemoteRef) = (r.whence, r.id) +remoteref_id(r::AbstractRemoteRef) = (r.whence, r.id) +function channel_from_id(id) + rv = get(PGRP.refs, id, false) + if rv === false + throw(ErrorException("Local instance of remote reference not found")) + end + rv.c +end lookup_ref(id, f=def_rv_channel) = lookup_ref(PGRP, id, f) function lookup_ref(pg, id, f) rv = get(pg.refs, id, false) if rv === false # first we've heard of this ref - rv = RemoteValue(f) + rv = RemoteValue(f()) pg.refs[id] = rv push!(rv.clientset, id[1]) end rv end +function isready(rr::Future) + !isnull(rr.v) && return true -function isready(rr::RemoteRef, args...) - rid = rr2id(rr) + rid = remoteref_id(rr) + if rr.where == myid() + isready(lookup_ref(rid).c) + else + remotecall_fetch(rid->isready(lookup_ref(rid).c), rr.where, rid) + end +end + +function isready(rr::RemoteChannel, args...) + rid = remoteref_id(rr) if rr.where == myid() isready(lookup_ref(rid).c, args...) else - remotecall_fetch(id->isready(lookup_ref(rid).c, args...), rr.where, rid) + remotecall_fetch(rid->isready(lookup_ref(rid).c, args...), rr.where, rid) end end del_client(id, client) = del_client(PGRP, id, client) function del_client(pg, id, client) - rv = lookup_ref(id) - delete!(rv.clientset, client) - if isempty(rv.clientset) - delete!(pg.refs, id) - #print("$(myid()) collected $id\n") + rv = get(pg.refs, id, false) + + if rv != false + delete!(rv.clientset, client) + if isempty(rv.clientset) + delete!(pg.refs, id) + #print("$(myid()) collected $id\n") + end end nothing end @@ -556,16 +605,16 @@ function start_gc_msgs_task() end end -function send_del_client(rr::RemoteRef) +function send_del_client(rr) if rr.where == myid() - del_client(rr2id(rr), myid()) + del_client(remoteref_id(rr), myid()) else if in(rr.where, map_del_wrkr) # for a removed worker, don't bother return end w = worker_from_id(rr.where) - push!(w.del_msgs, (rr2id(rr), myid())) + push!(w.del_msgs, (remoteref_id(rr), myid())) w.gcflag = true notify(any_gc_flag) end @@ -580,54 +629,69 @@ end function add_clients(pairs::Vector) for p in pairs - add_client(p[1], p[2]) + add_client(p[1], p[2]...) end end -function send_add_client(rr::RemoteRef, i) +function send_add_client(rr::AbstractRemoteRef, i) if rr.where == myid() - add_client(rr2id(rr), i) + add_client(remoteref_id(rr), i) elseif i != rr.where # don't need to send add_client if the message is already going # to the processor that owns the remote ref. it will add_client # itself inside deserialize(). w = worker_from_id(rr.where) - #println("$(myid()) adding $((rr2id(rr), i)) for $(rr.where)") - push!(w.add_msgs, (rr2id(rr), i)) + #println("$(myid()) adding $((remoteref_id(rr), i)) for $(rr.where)") + push!(w.add_msgs, (remoteref_id(rr), i)) w.gcflag = true notify(any_gc_flag) end end -channel_type{T}(rr::RemoteRef{T}) = T -function serialize(s::SerializationState, rr::RemoteRef) - i = worker_id_from_socket(s.io) - #println("$(myid()) serializing $rr to $i") - if i != -1 - #println("send add $rr to $i") - send_add_client(rr, i) +channel_type{T}(rr::RemoteChannel{T}) = T + +serialize(s::SerializationState, f::Future) = serialize(s, f, isnull(f.v)) +serialize(s::SerializationState, rr::RemoteChannel) = serialize(s, rr, true) +function serialize(s::SerializationState, rr::AbstractRemoteRef, addclient) + if addclient + p = worker_id_from_socket(s.io) + (p !== rr.where) && send_add_client(rr, p) end invoke(serialize, Tuple{SerializationState, Any}, s, rr) end -function deserialize{T<:RemoteRef}(s::SerializationState, t::Type{T}) +function deserialize{T<:Future}(s::SerializationState, t::Type{T}) + f = deserialize_rr(s,t) + Future(f.where, f.whence, f.id) # ctor adds to ref table +end + +function deserialize{T<:RemoteChannel}(s::SerializationState, t::Type{T}) + rr = deserialize_rr(s,t) + # call ctor to make sure this rr gets added to the client_refs table + RemoteChannel{channel_type(rr)}(rr.where, rr.whence, rr.id) +end + +function deserialize_rr(s, t) rr = invoke(deserialize, Tuple{SerializationState, DataType}, s, t) - where = rr.where - if where == myid() - add_client(rr2id(rr), myid()) + if rr.where == myid() + # send_add_client() is not executed when the ref is being + # serialized to where it exists + add_client(remoteref_id(rr), myid()) end - # call ctor to make sure this rr gets added to the client_refs table - RemoteRef{channel_type(rr)}(where, rr.whence, rr.id) + rr end -# data stored by the owner of a RemoteRef +# data stored by the owner of a remote reference def_rv_channel() = Channel(1) type RemoteValue c::AbstractChannel - clientset::IntSet + clientset::IntSet # Set of workerids that have a reference to this channel. + # Keeping ids instead of a count aids in cleaning up upon + # a worker exit. + waitingfor::Int # processor we need to hear from to fill this, or 0 - RemoteValue(f::Function) = new(f(), IntSet(), 0) + RemoteValue(c) = new(c, IntSet(), 0) end wait(rv::RemoteValue) = wait(rv.c) @@ -662,7 +726,7 @@ function run_work_thunk(rv::RemoteValue, thunk) end function schedule_call(rid, thunk) - rv = RemoteValue(def_rv_channel) + rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid[1]) schedule(@task(run_work_thunk(rv,thunk))) @@ -671,7 +735,7 @@ end #localize_ref(b::Box) = Box(localize_ref(b.contents)) -#function localize_ref(r::RemoteRef) +#function localize_ref(r::RemoteChannel) # if r.where == myid() # fetch(r) # else @@ -705,15 +769,15 @@ function local_remotecall_thunk(f, args) end function remotecall(f, w::LocalProcess, args...) - rr = RemoteRef(w) - schedule_call(rr2id(rr), local_remotecall_thunk(f,args)) + rr = Future(w) + schedule_call(remoteref_id(rr), local_remotecall_thunk(f,args)) rr end function remotecall(f, w::Worker, args...) - rr = RemoteRef(w) + rr = Future(w) #println("$(myid()) asking for $rr") - send_msg(w, CallMsg{:call}(f, args, rr2id(rr))) + send_msg(w, CallMsg{:call}(f, args, remoteref_id(rr))) rr end @@ -747,8 +811,8 @@ function remotecall_wait(f, w::Worker, args...) prid = next_rrid_tuple() rv = lookup_ref(prid) rv.waitingfor = w.id - rr = RemoteRef(w) - send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid)) + rr = Future(w) + send_msg(w, CallWaitMsg(f, args, remoteref_id(rr), prid)) v = fetch(rv.c) delete!(PGRP.refs, prid) isa(v, RemoteException) && throw(v) @@ -775,8 +839,8 @@ end remote_do(f, id::Integer, args...) = remote_do(f, worker_from_id(id), args...) # have the owner of rr call f on it -function call_on_owner(f, rr::RemoteRef, args...) - rid = rr2id(rr) +function call_on_owner(f, rr::AbstractRemoteRef, args...) + rid = remoteref_id(rr) if rr.where == myid() f(rid, args...) else @@ -795,16 +859,48 @@ function wait_ref(rid, callee, args...) end nothing end -wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, myid(), args...); r) +wait(r::Future) = (!isnull(r.v) && return r; call_on_owner(wait_ref, r, myid()); r) +wait(r::RemoteChannel, args...) = (call_on_owner(wait_ref, r, myid(), args...); r) + +function fetch_future(rid, callee) + rv = lookup_ref(rid); + v = fetch(rv.c) + del_client(rid, callee) + v +end +function fetch(r::Future) + !isnull(r.v) && return get(r.v) + v=call_on_owner(fetch_future, r, myid()) + r.v=v + v +end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) -fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...) +fetch(r::RemoteChannel, args...) = call_on_owner(fetch_ref, r, args...) fetch(x::ANY) = x -# storing a value to a RemoteRef +isready(rv::RemoteValue, args...) = isready(rv.c, args...) +function put!(rr::Future, v) + !isnull(rr.v) && error("Future can be set only once") + call_on_owner(put_future, rr, v, myid()) + rr.v = v + rr +end +function put_future(rid, v, callee) + rv = lookup_ref(rid) + isready(rv) && error("Future can be set only once") + put!(rv, v) + # The callee has the value and hence can be removed from the remote store. + del_client(rid, callee) + nothing +end + + put!(rv::RemoteValue, args...) = put!(rv.c, args...) -put_ref(rid, args...) = put!(lookup_ref(rid), args...) -put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr) +put_ref(rid, args...) = (put!(lookup_ref(rid), args...); nothing) +put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, args...); rr) + +# take! is not supported on Future take!(rv::RemoteValue, args...) = take!(rv.c, args...) function take_ref(rid, callee, args...) @@ -812,10 +908,12 @@ function take_ref(rid, callee, args...) isa(v, RemoteException) && (myid() == callee) && throw(v) v end -take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, myid(), args...) +take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...) + +# close is not supported on Future close_ref(rid) = (close(lookup_ref(rid).c); nothing) -close(rr::RemoteRef) = call_on_owner(close_ref, rr) +close(rr::RemoteChannel) = call_on_owner(close_ref, rr) function deliver_result(sock::IO, msg, oid, value) @@ -1081,7 +1179,7 @@ function init_worker(manager::ClusterManager=DefaultClusterManager()) cluster_manager = manager disable_threaded_libs() - # Since our pid has yet to be set, ensure no RemoteRefs have been created or addprocs() called. + # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. assert(nprocs() <= 1) assert(isempty(PGRP.refs)) assert(isempty(client_refs)) @@ -1338,7 +1436,7 @@ let nextidx = 0 if isa(v,Box) v = v.contents end - if isa(v,RemoteRef) + if isa(v,AbstractRemoteRef) p = v.where; break end end @@ -1602,7 +1700,7 @@ end function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) pollint > 0 || throw(ArgumentError("cannot set pollint to $pollint seconds")) start = time() - done = RemoteRef() + done = Channel(1) timercb(aw) = begin try if testcb() @@ -1692,10 +1790,14 @@ function terminate_all_workers() end end -getindex(r::RemoteRef) = fetch(r) -function getindex(r::RemoteRef, args...) +getindex(r::RemoteChannel) = fetch(r) +getindex(r::Future) = fetch(r) + +getindex(r::Future, args...) = getindex(fetch(r), args...) +function getindex(r::RemoteChannel, args...) if r.where == myid() return getindex(fetch(r), args...) end return remotecall_fetch(getindex, r.where, r, args...) end + diff --git a/base/precompile.jl b/base/precompile.jl index 42c326b0a7a38..337aa70c543c3 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -144,7 +144,7 @@ precompile(Base.REPLCompletions.completions, (ASCIIString, Int)) precompile(Base.Random.srand, ()) precompile(Base.Random.srand, (ASCIIString, Int)) precompile(Base.Random.srand, (UInt,)) -precompile(Base.RemoteRef, (Int, Int, Int)) +precompile(Base.RemoteChannel, (Int, Int, Int)) precompile(Base.RemoteValue, ()) precompile(Base.Set, ()) precompile(Base.SystemError, (ASCIIString,)) @@ -235,8 +235,8 @@ precompile(Base.getindex, (Type{ByteString}, ASCIIString, ASCIIString)) precompile(Base.getindex, (Type{Dict{Any, Any}}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any})) precompile(Base.getpid, ()) precompile(Base.hash, (Int,)) -precompile(Base.hash, (RemoteRef, UInt)) -precompile(Base.hash, (RemoteRef,)) +precompile(Base.hash, (RemoteChannel, UInt)) +precompile(Base.hash, (RemoteChannel,)) precompile(Base.haskey, (Base.EnvHash, ASCIIString)) precompile(Base.haskey, (Dict{Symbol,Any}, Symbol)) precompile(Base.haskey, (ObjectIdDict, Symbol)) @@ -267,8 +267,8 @@ precompile(Base.isequal, (Base.LineEdit.Prompt, Base.LineEdit.Prompt)) precompile(Base.isequal, (Bool, Bool)) precompile(Base.isequal, (Char, ASCIIString)) precompile(Base.isequal, (Int,Int)) -precompile(Base.isequal, (RemoteRef, RemoteRef)) -precompile(Base.isequal, (RemoteRef, WeakRef)) +precompile(Base.isequal, (RemoteChannel, RemoteChannel)) +precompile(Base.isequal, (RemoteChannel, WeakRef)) precompile(Base.isequal, (Symbol, Symbol)) precompile(Base.isequal, (VersionNumber, VersionNumber)) precompile(Base.isequal, (Void, Void)) @@ -349,7 +349,7 @@ precompile(Base.reinit_stdio, ()) precompile(Base.repeat, (ASCIIString, Int)) precompile(Base.repl_cmd, (Cmd, Base.Terminals.TTYTerminal)) precompile(Base.require, (Symbol,)) -precompile(Base.rr2id, (RemoteRef,)) +precompile(Base.remoteref_id, (RemoteChannel,)) precompile(Base.rsearch, (ASCIIString, Char)) precompile(Base.rstrip, (ASCIIString,)) precompile(Base.run, (Cmd,)) @@ -405,7 +405,7 @@ precompile(Base.sync_begin, ()) precompile(Base.sync_end, ()) precompile(Base.systemerror, (Symbol, Bool)) precompile(Base.take!, (Base.RemoteValue,)) -precompile(Base.take!, (RemoteRef,)) +precompile(Base.take!, (RemoteChannel,)) precompile(Base.take_ref, (Tuple{Int,Int},)) precompile(Base.takebuf_string, (IOBuffer,)) precompile(Base.task_local_storage, ()) @@ -421,7 +421,7 @@ precompile(Base.uv_error, (ASCIIString, Bool)) precompile(Base.uvfinalize, (Base.TTY,)) precompile(Base.vcat, (Base.LineEdit.Prompt,)) precompile(Base.wait, ()) -precompile(Base.wait, (RemoteRef,)) +precompile(Base.wait, (RemoteChannel,)) precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString)) precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString)) precompile(Base.write, (IOBuffer, Vector{UInt8})) @@ -467,10 +467,10 @@ precompile(Base.schedule, (Array{Any, 1}, Task, Void)) precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any})) precompile(Base.convert, (Type{Union{ASCIIString, UTF8String}}, ASCIIString)) precompile(Base.LineEdit.keymap_fcn, (Function, Base.LineEdit.MIState, ASCIIString)) -precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteRef)) -precompile(==, (Base.RemoteRef, WeakRef)) -precompile(==, (Base.RemoteRef, Base.RemoteRef)) -precompile(Base.send_del_client, (Base.RemoteRef,)) +precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteChannel)) +precompile(==, (Base.RemoteChannel, WeakRef)) +precompile(==, (Base.RemoteChannel, Base.RemoteChannel)) +precompile(Base.send_del_client, (Base.RemoteChannel,)) precompile(!=, (Base.SubString{UTF8String}, ASCIIString)) precompile(Base.print_joined, (Base.IOBuffer, Array{Base.SubString{UTF8String}, 1}, ASCIIString)) precompile(Base.call, (Array{Any, 1}, Type{Base.LineEdit.Prompt}, ASCIIString)) diff --git a/base/require.jl b/base/require.jl index d852239225e28..517b175ea085e 100644 --- a/base/require.jl +++ b/base/require.jl @@ -109,7 +109,7 @@ end function reload_path(path::AbstractString) had = haskey(package_list, path) if !had - package_locks[path] = RemoteRef() + package_locks[path] = RemoteChannel() end package_list[path] = time() tls = task_local_storage() diff --git a/base/sharedarray.jl b/base/sharedarray.jl index b4a1d2b7b1b48..9bcf777e0cb9a 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -3,7 +3,7 @@ type SharedArray{T,N} <: DenseArray{T,N} dims::NTuple{N,Int} pids::Vector{Int} - refs::Vector{RemoteRef} + refs::Vector # The segname is currently used only in the test scripts to ensure that # the shmem segment has been unlinked. @@ -63,7 +63,7 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) func_mapshmem = () -> shm_mmap_array(T, dims, shm_seg_name, JL_O_RDWR) - refs = Array(RemoteRef, length(pids)) + refs = Array(Future, length(pids)) for (i, p) in enumerate(pids) refs[i] = remotecall(func_mapshmem, p) end @@ -83,27 +83,9 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) systemerror("Error unlinking shmem segment " * shm_seg_name, rc != 0) end S = SharedArray{T,N}(dims, pids, refs, shm_seg_name) + initialize_shared_array(S, s, onlocalhost, init, pids) shm_seg_name = "" - if onlocalhost - init_loc_flds(S) - - # In the event that myid() is not part of pids, s will not be set - # in the init function above, hence setting it here if available. - S.s = s - else - S.pidx = 0 - end - - # if present init function is called on each of the parts - if isa(init, Function) - @sync begin - for p in pids - @async remotecall_wait(init, p, S) - end - end - end - finally if shm_seg_name != "" remotecall_fetch(shm_unlink, shmmem_create_pid, shm_seg_name) @@ -134,7 +116,7 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In mode == "r" && !isfile(filename) && throw(ArgumentError("file $filename does not exist, but mode $mode cannot create it")) # Create the file if it doesn't exist, map it if it does - refs = Array(RemoteRef, length(pids)) + refs = Array(Future, length(pids)) func_mmap = mode -> open(filename, mode) do io Mmap.mmap(io, Array{T,N}, dims, offset; shared=true) end @@ -163,7 +145,11 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In end S = SharedArray{T,N}(dims, pids, refs, filename) + initialize_shared_array(S, s, onlocalhost, init, pids) + S +end +function initialize_shared_array(S, s, onlocalhost, init, pids) if onlocalhost init_loc_flds(S) # In the event that myid() is not part of pids, s will not be set @@ -181,6 +167,19 @@ function SharedArray{T,N}(filename::AbstractString, ::Type{T}, dims::NTuple{N,In end end end + + finalizer(S, finalize_refs) + S +end + +function finalize_refs{T,N}(S::SharedArray{T,N}) + for r in S.refs + finalize(r) + end + empty!(S.pids) + empty!(S.refs) + init_loc_flds(S) + S.s = Array(T, ntuple(d->0,N)) S end @@ -193,7 +192,7 @@ linearindexing{S<:SharedArray}(::Type{S}) = LinearFast() function reshape{T,N}(a::SharedArray{T}, dims::NTuple{N,Int}) (length(a) != prod(dims)) && throw(DimensionMismatch("dimensions must be consistent with array size")) - refs = Array(RemoteRef, length(a.pids)) + refs = Array(Future, length(a.pids)) for (i, p) in enumerate(a.pids) refs[i] = remotecall(p, a.refs[i], dims) do r,d reshape(fetch(r),d) @@ -270,7 +269,13 @@ sub_1dim(S::SharedArray, pidx) = sub(S.s, range_1dim(S, pidx)) function init_loc_flds{T,N}(S::SharedArray{T,N}) if myid() in S.pids S.pidx = findfirst(S.pids, myid()) - S.s = fetch(S.refs[S.pidx]) + if isa(S.refs[1], Future) + refid = remoteref_id(S.refs[S.pidx]) + else + refid = S.refs[S.pidx] + end + c = channel_from_id(refid) + S.s = fetch(c) S.loc_subarr_1d = sub_1dim(S, S.pidx) else S.pidx = 0 @@ -287,6 +292,15 @@ function serialize(s::SerializationState, S::SharedArray) for n in SharedArray.name.names if n in [:s, :pidx, :loc_subarr_1d] Serializer.writetag(s.io, Serializer.UNDEFREF_TAG) + elseif n == :refs + v = getfield(S, n) + if isa(v[1], Future) + # convert to ids to avoid distributed GC overhead + ids = [remoteref_id(x) for x in v] + serialize(s, ids) + else + serialize(s, v) + end else serialize(s, getfield(S, n)) end diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 90d085cf6e98f..b647606d37d21 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -34,12 +34,18 @@ references* and *remote calls*. A remote reference is an object that can be used from any process to refer to an object stored on a particular process. A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process. -A remote call returns a remote reference to its result. Remote calls + +Remote references come in two flavors -``Future`` and ``RemoteChannel``. + +A remote call returns a ``Future`` to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else. You can -wait for a remote call to finish by calling :func:`wait` on its remote -reference, and you can obtain the full value of the result using -:func:`fetch`. You can store a value to a remote reference using :func:`put!`. +wait for a remote call to finish by calling :func:`wait` on the returned +`Future`, and you can obtain the full value of the result using +:func:`fetch`. + +On the other hand ``RemoteRefs`` are rewritable. For example, multiple processes +can co-ordinate their processing by referencing the same remote ``Channel``\ . Let's try this out. Starting with ``julia -p n`` provides ``n`` worker processes on the local machine. Generally it makes sense for ``n`` to @@ -49,32 +55,29 @@ equal the number of CPU cores on the machine. $ ./julia -p 2 - julia> r = remotecall(2, rand, 2, 2) - RemoteRef(2,1,5) - - julia> fetch(r) - 2x2 Float64 Array: - 0.60401 0.501111 - 0.174572 0.157411 + julia> r = remotecall(rand, 2, 2, 2) + Future(2,1,3,Nullable{Any}()) julia> s = @spawnat 2 1 .+ fetch(r) - RemoteRef(2,1,7) + Future(2,1,6,Nullable{Any}()) julia> fetch(s) 2x2 Float64 Array: 1.60401 1.50111 1.17457 1.15741 -The first argument to :func:`remotecall` is the index of the process -that will do the work. Most parallel programming in Julia does not -reference specific processes or the number of processes available, -but :func:`remotecall` is considered a low-level interface providing -finer control. The second argument to :func:`remotecall` is the function -to call, and the remaining arguments will be passed to this -function. As you can see, in the first line we asked process 2 to +The first argument to :func:`remotecall` is the function to call. +Most parallel programming in Julia does not reference specific processes +or the number of processes available, but :func:`remotecall` is +considered a low-level interface providing finer control. The second +argument to :func:`remotecall` is the index of the process +that will do the work, and the remaining arguments will be passed +to the function being called. + +As you can see, in the first line we asked process 2 to construct a 2-by-2 random matrix, and in the second line we asked it to add 1 to it. The result of both calculations is available in the -two remote references, ``r`` and ``s``. The :obj:`@spawnat` macro +two futures, ``r`` and ``s``. The :obj:`@spawnat` macro evaluates the expression in the second argument on the process specified by the first argument. @@ -86,22 +89,21 @@ but is more efficient. :: - julia> remotecall_fetch(2, getindex, r, 1, 1) + julia> remotecall_fetch(getindex, 2, r, 1, 1) 0.10824216411304866 Remember that :func:`getindex(r,1,1) ` is :ref:`equivalent ` to -``r[1,1]``, so this call fetches the first element of the remote -reference ``r``. +``r[1,1]``, so this call fetches the first element of the future ``r``. The syntax of :func:`remotecall` is not especially convenient. The macro :obj:`@spawn` makes things easier. It operates on an expression rather than a function, and picks where to do the operation for you:: julia> r = @spawn rand(2,2) - RemoteRef(1,1,0) + Future(2,1,4,Nullable{Any}()) julia> s = @spawn 1 .+ fetch(r) - RemoteRef(1,1,1) + Future(3,1,5,Nullable{Any}()) julia> fetch(s) 1.10824216411304866 1.13798233877923116 @@ -117,6 +119,11 @@ process that owns ``r``, so the :func:`fetch` will be a no-op. as a :ref:`macro `. It is possible to define your own such constructs.) +An important thing to remember is that, once fetched, a ``Future`` will cache its value +locally. Further ``fetch`` calls do not entail a network hop. Once all referencing Futures +have fetched, the remote stored value is deleted. + + .. _man-parallel-computing-code-availability: Code Availability and Loading Packages @@ -135,10 +142,10 @@ type the following into the Julia prompt:: 1.15119 0.918912 julia> @spawn rand2(2,2) - RemoteRef(1,1,1) + Future(2,1,4,Nullable{Any}()) julia> @spawn rand2(2,2) - RemoteRef(2,1,2) + Future(3,1,5,Nullable{Any}()) julia> exception on 2: in anonymous: rand2 not defined @@ -169,7 +176,7 @@ Starting julia with ``julia -p 2``, you can use this to verify the following: - ``using DummyModule`` causes the module to be loaded on all processes; however, the module is brought into scope only on the one executing the statement. - As long as ``DummyModule`` is loaded on process 2, commands like :: - rr = RemoteRef(2) + rr = RemoteChannel(2) put!(rr, MyType(7)) allow you to store an object of type ``MyType`` on process 2 even if ``DummyModule`` is not in scope on process 2. @@ -179,7 +186,7 @@ For example, :obj:`@everywhere` can also be used to directly define a function o julia> @everywhere id = myid() - julia> remotecall_fetch(2, ()->id) + julia> remotecall_fetch(()->id, 2) 2 A file can also be preloaded on multiple processes at startup, and a driver script can be used to drive the computation:: @@ -349,9 +356,9 @@ vector ``a`` shared by all processes. As you could see, the reduction operator can be omitted if it is not needed. In that case, the loop executes asynchronously, i.e. it spawns independent -tasks on all available workers and returns an array of :class:`RemoteRef` +tasks on all available workers and returns an array of :class:`Future` immediately without waiting for completion. -The caller can wait for the :class:`RemoteRef` completions at a later +The caller can wait for the :class:`Future` completions at a later point by calling :func:`fetch` on them, or wait for completion at the end of the loop by prefixing it with :obj:`@sync`, like ``@sync @parallel for``. @@ -465,30 +472,67 @@ variable takes on all values added to the channel. An empty, closed channel causes the ``for`` loop to terminate. -RemoteRefs and AbstractChannels -------------------------------- +Remote references and AbstractChannels +-------------------------------------- -A ``RemoteRef`` is a proxy for an implementation of an ``AbstractChannel`` +Remote references always refer to an implementation of an ``AbstractChannel`` A concrete implementation of an ``AbstractChannel`` (like ``Channel``), is required -to implement ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait``. The remote object -referred to by a ``RemoteRef()`` or ``RemoteRef(pid)`` is stored in a ``Channel{Any}(1)``, -i.e., a channel of size 1 capable of holding objects of ``Any`` type. +to implement ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait``\ . The remote object +referred to by a ``Future`` is stored in a ``Channel{Any}(1)``\ , i.e., a channel of size 1 +capable of holding objects of ``Any`` type. -Methods ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait`` on a ``RemoteRef`` are proxied onto -the backing store on the remote process. +``RemoteChannel``\ , which is rewritable, can point to any type and size of channels, or any other +implementation of an ``AbstractChannel``\ . -The constructor ``RemoteRef(f::Function, pid)`` allows us to construct references to channels holding +The constructor ``RemoteChannel(f::Function, pid)`` allows us to construct references to channels holding more than one value of a specific type. ``f()`` is a function executed on ``pid`` and it must return -an ``AbstractChannel``. +an ``AbstractChannel``\ . + +For example, ``RemoteChannel(()->Channel{Int}(10), pid)``\ , will return a reference to a channel of type ``Int`` +and size 10. The channel exists on worker ``pid``\ . -For example, ``RemoteRef(()->Channel{Int}(10), pid)``, will return a reference to a channel of type ``Int`` -and size 10. +Methods ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait`` on a ``RemoteChannel`` are proxied onto +the backing store on the remote process. -``RemoteRef`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple +``RemoteChannel`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple example of this is provided in ``examples/dictchannel.jl`` which uses a dictionary as its remote store. +Remote References and Distributed Garbage Collection +---------------------------------------------------- + +Objects referred to by remote references can be freed only when *all* held references in the cluster +are deleted. + +The node where the value is stored keeps track of which of the workers have a reference to it. +Every time a ``RemoteChannel`` or a (unfetched) ``Future`` is serialized to a worker, the node pointed +to by the reference is notified. And every time a ``RemoteChannel`` or a (unfetched) ``Future`` +is garbage collected locally, the node owning the value is again notified. + +The notifications are done via sending of "tracking" messages - an "add reference" message when +a reference is serialized to a different process and a "delete reference" message when a reference +is locally garbage collected. + +Since ``Future``\ s are write-once and cached locally, the act of ``fetch``\ ing a ``Future`` also updates +reference tracking information on the node owning the value. + +The node which owns the value frees it once all references to it are cleared. + +With ``Future``\ s, serializing an already fetched ``Future`` to a different node also sends the value +since the original remote store may have collected the value by this time. + +It is important to note that *when* an object is locally garbage collected depends +on the size of the object and the current memory pressure in the system. + +In case of remote references, the size of the local reference object is quite small, while the value +stored on the remote node may be quite large. Since the local object may not be collected immediately, it is +a good practice to explicitly call ``finalize`` on local instances of a ``RemoteChannel``, or on unfetched +``Future``\ s. Since calling ``fetch`` on a ``Future`` also removes its reference from the remote store, this +is not required on fetched ``Future``\ s. Explicitly calling ``finalize`` results in an immediate message sent to +the remote node to go ahead and remove its reference to the value. + + Shared Arrays ------------- @@ -568,7 +612,7 @@ be careful not to set up conflicts. For example:: @sync begin for p in procs(S) @async begin - remotecall_wait(p, fill!, S, p) + remotecall_wait(fill!, p, S, p) end end end @@ -637,7 +681,7 @@ and one that delegates in chunks:: function advection_shared!(q, u) @sync begin for p in procs(q) - @async remotecall_wait(p, advection_shared_chunk!, q, u) + @async remotecall_wait(advection_shared_chunk!, p, q, u) end end q diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index c6a43c5db3e35..d0ba04831ae92 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -268,7 +268,39 @@ General Parallel Computing Support .. Docstring generated from Julia source - Call a function asynchronously on the given arguments on the specified process. Returns a ``RemoteRef``\ . + Call a function asynchronously on the given arguments on the specified process. Returns a ``Future``\ . + +.. function:: Future() + + .. Docstring generated from Julia source + + Create a ``Future`` on the local machine. + +.. function:: Future(n) + + .. Docstring generated from Julia source + + Create a ``Future`` on process ``n``\ . + +.. function:: RemoteChannel() + + .. Docstring generated from Julia source + + Make an reference to a ``Channel{Any}(1)`` on the local machine. + +.. function:: RemoteChannel(n) + + .. Docstring generated from Julia source + + Make an reference to a ``Channel{Any}(1)`` on process ``n``\ . + +.. function:: RemoteChannel(f::Function, pid) + + .. Docstring generated from Julia source + + Create references to remote channels of a specific size and type. ``f()`` is a function that when executed on ``pid`` must return an implementation of an ``AbstractChannel``\ . + + For example, ``RemoteChannel(()->Channel{Int}(10), pid)``\ , will return a reference to a channel of type ``Int`` and size 10 on ``pid``\ . .. function:: wait([x]) @@ -276,14 +308,15 @@ General Parallel Computing Support Block the current task until some event occurs, depending on the type of the argument: - * ``RemoteRef``\ : Wait for a value to become available for the specified remote reference. + * ``RemoteChannel`` : Wait for a value to become available on the specified remote channel. + * ``Future`` : Wait for a value to become available for the specified future. * ``Channel``\ : Wait for a value to be appended to the channel. * ``Condition``\ : Wait for ``notify`` on a condition. * ``Process``\ : Wait for a process or process chain to exit. The ``exitcode`` field of a process can be used to determine success or failure. * ``Task``\ : Wait for a ``Task`` to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called ``wait``\ ). * ``RawFD``\ : Wait for changes on a file descriptor (see ``poll_fd`` for keyword arguments and return code) - If no argument is passed, the task blocks for an undefined period. If the task's state is set to ``:waiting``\ , it can only be restarted by an explicit call to ``schedule`` or ``yieldto``\ . If the task's state is ``:runnable``\ , it might be restarted unpredictably. + If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to ``schedule`` or ``yieldto``\ . Often ``wait`` is called within a ``while`` loop to ensure a waited-for condition is met before proceeding. @@ -293,7 +326,8 @@ General Parallel Computing Support Waits and fetches a value from ``x`` depending on the type of ``x``\ . Does not remove the item fetched: - * ``RemoteRef``\ : Wait for and get the value of a remote reference. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace. + * ``Future``\ : Wait for and get the value of a Future. The fetched value is cached locally. Further calls to ``fetch`` on the same reference return the cached value. If the remote value is an exception, throws a ``RemoteException`` which captures the remote exception and backtrace. + * ``RemoteChannel``\ : Wait for and get the value of a remote reference. Exceptions raised are same as for a ``Future`` . * ``Channel`` : Wait for and get the first available item from the channel. .. function:: remotecall_wait(func, id, args...) @@ -308,11 +342,17 @@ General Parallel Computing Support Perform ``fetch(remotecall(...))`` in one message. Any remote exceptions are captured in a ``RemoteException`` and thrown. -.. function:: put!(RemoteRef, value) +.. function:: put!(RemoteChannel, value) .. Docstring generated from Julia source - Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with ``take!``\ . Returns its first argument. + Store a value to the remote channel. If the channel is full, blocks until space is available. Returns its first argument. + +.. function:: put!(Future, value) + + .. Docstring generated from Julia source + + Store a value to a future. Future's are write-once remote references. A ``put!`` on an already set ``Future`` throws an Exception. All asynchronous remote calls return ``Future``\ s and set the value to the return value of the call upon completion. .. function:: put!(Channel, value) @@ -320,11 +360,11 @@ General Parallel Computing Support Appends an item to the channel. Blocks if the channel is full. -.. function:: take!(RemoteRef) +.. function:: take!(RemoteChannel) .. Docstring generated from Julia source - Fetch the value of a remote reference, removing it so that the reference is empty again. + Fetch a value from a remote channel, also removing it in the processs. .. function:: take!(Channel) @@ -332,19 +372,25 @@ General Parallel Computing Support Removes and returns a value from a ``Channel``\ . Blocks till data is available. -.. function:: isready(r::RemoteRef) +.. function:: isready(r::RemoteChannel) + + .. Docstring generated from Julia source + + Determine whether a ``RemoteChannel`` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. However, it can be safely used on a ``Future`` since they are assigned only once. + +.. function:: isready(r::Future) .. Docstring generated from Julia source - Determine whether a ``RemoteRef`` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. It is recommended that this function only be used on a ``RemoteRef`` that is assigned once. + Determine whether a ``Future`` has a value stored to it. - If the argument ``RemoteRef`` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for ``r`` in a separate task instead, or to use a local ``RemoteRef`` as a proxy: + If the argument ``Future`` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for ``r`` in a separate task instead, or to use a local ``Channel`` as a proxy: .. code-block:: julia - rr = RemoteRef() - @async put!(rr, remotecall_fetch(long_computation, p)) - isready(rr) # will not block + c = Channel(1) + @async put!(c, remotecall_fetch(long_computation, p)) + isready(c) # will not block .. function:: close(Channel) @@ -355,18 +401,6 @@ General Parallel Computing Support * ``put!`` on a closed channel. * ``take!`` and ``fetch`` on an empty, closed channel. -.. function:: RemoteRef() - - .. Docstring generated from Julia source - - Make an uninitialized remote reference on the local machine. - -.. function:: RemoteRef(n) - - .. Docstring generated from Julia source - - Make an uninitialized remote reference on process ``n``\ . - .. function:: timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) .. Docstring generated from Julia source @@ -377,13 +411,13 @@ General Parallel Computing Support .. Docstring generated from Julia source - Creates a closure around an expression and runs it on an automatically-chosen process, returning a ``RemoteRef`` to the result. + Creates a closure around an expression and runs it on an automatically-chosen process, returning a ``Future`` to the result. .. function:: @spawnat .. Docstring generated from Julia source - Accepts two arguments, ``p`` and an expression. A closure is created around the expression and run asynchronously on process ``p``\ . Returns a ``RemoteRef`` to the result. + Accepts two arguments, ``p`` and an expression. A closure is created around the expression and run asynchronously on process ``p``\ . Returns a ``Future`` to the result. .. function:: @fetch diff --git a/test/examples.jl b/test/examples.jl index fc8e76ecb2a88..573cde2cd9d1e 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -39,12 +39,7 @@ include(joinpath(dir, "queens.jl")) @unix_only begin script = joinpath(dir, "clustermanager/simple/test_simple.jl") cmd = `$(Base.julia_cmd()) $script` - - (strm, proc) = open(cmd) - errors = readall(strm) - wait(proc) - if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0 - println(errors); + if !success(pipeline(cmd; stdout=STDOUT, stderr=STDERR)) && ccall(:jl_running_on_valgrind,Cint,()) == 0 error("UnixDomainCM failed test, cmd : $cmd") end end @@ -58,8 +53,8 @@ remotecall_fetch(1, dc_path) do f include(f) nothing end -dc=RemoteRef(()->DictChannel(), 1) -@test typeof(dc) == RemoteRef{DictChannel} +dc=RemoteChannel(()->DictChannel(), 1) +@test typeof(dc) == RemoteChannel{DictChannel} @test isready(dc) == false put!(dc, 1, 2) diff --git a/test/parallel.jl b/test/parallel.jl index dca7aea17ac50..e90df32325d8b 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -3,20 +3,16 @@ # Run the parallel test outside of the main driver, since it runs off its own # set of workers. -inline_flag = Base.JLOptions().can_inline == 1 ? "" : "--inline=no" -cov_flag = "" +inline_flag = Base.JLOptions().can_inline == 1 ? `` : `--inline=no` +cov_flag = `` if Base.JLOptions().code_coverage == 1 - cov_flag = "--code-coverage=user" + cov_flag = `--code-coverage=user` elseif Base.JLOptions().code_coverage == 2 - cov_flag = "--code-coverage=all" + cov_flag = `--code-coverage=all` end cmd = `$(Base.julia_cmd()) $inline_flag $cov_flag --check-bounds=yes --depwarn=error parallel_exec.jl` -(strm, proc) = open(pipeline(cmd, stderr=STDERR)) -cmdout = readall(strm) -wait(proc) -println(cmdout); -if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0 +if !success(pipeline(cmd; stdout=STDOUT, stderr=STDERR)) && ccall(:jl_running_on_valgrind,Cint,()) == 0 error("Parallel test failed, cmd : $cmd") end diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index a54f9b61be18a..8739bbc16ce29 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -2,35 +2,163 @@ using Base.Test -inline_flag = Base.JLOptions().can_inline == 1 ? "" : "--inline=no" -cov_flag = "" +inline_flag = Base.JLOptions().can_inline == 1 ? `` : `--inline=no` +cov_flag = `` if Base.JLOptions().code_coverage == 1 - cov_flag = "--code-coverage=user" + cov_flag = `--code-coverage=user` elseif Base.JLOptions().code_coverage == 2 - cov_flag = "--code-coverage=all" + cov_flag = `--code-coverage=all` end addprocs(3; exeflags=`$cov_flag $inline_flag --check-bounds=yes --depwarn=error`) id_me = myid() id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] +# Test Futures +function testf(id) + f=Future(id) + @test isready(f) == false + @test isnull(f.v) == true + put!(f, :OK) + @test isready(f) == true + @test isnull(f.v) == false + + @test_throws ErrorException put!(f, :OK) # Cannot put! to a already set future + @test_throws MethodError take!(f) # take! is unsupported on a Future + + @test fetch(f) == :OK +end + +testf(id_me) +testf(id_other) + +# Distributed GC tests for Futures +function test_futures_dgc(id) + f = remotecall(myid, id) + fid = Base.remoteref_id(f) + + # remote value should be deleted after a fetch + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true + @test isnull(f.v) == true + @test fetch(f) == id + @test isnull(f.v) == false + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false + + + # if unfetched, it should be deleted after a finalize + f = remotecall(myid, id) + fid = Base.remoteref_id(f) + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true + @test isnull(f.v) == true + finalize(f) + Base.flush_gc_msgs() + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == false +end + +test_futures_dgc(id_me) +test_futures_dgc(id_other) + +# if sent to another worker, it should not be deleted till the other worker has fetched. +wid1 = workers()[1] +wid2 = workers()[2] +f = remotecall(myid, wid1) +fid = Base.remoteref_id(f) + +fstore = RemoteChannel(wid2) +put!(fstore, f) + +@test fetch(f) == wid1 +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true +remotecall_fetch(r->fetch(fetch(r)), wid2, fstore) +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false + +# put! should release remote reference since it would have been cached locally +f = Future(wid1) +fid = Base.remoteref_id(f) + +# should not be created remotely till accessed +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false +# create it remotely +isready(f) + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true +put!(f, :OK) +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false +@test fetch(f) == :OK + +# RemoteException should be thrown on a put! when another process has set the value +f = Future(wid1) +fid = Base.remoteref_id(f) + +fstore = RemoteChannel(wid2) +put!(fstore, f) # send f to wid2 +put!(f, :OK) # set value from master + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == true + +testval = remotecall_fetch(wid2, fstore) do x + try + put!(fetch(x), :OK) + return 0 + catch e + if isa(e, RemoteException) + return 1 + else + return 2 + end + end +end +@test testval == 1 + +# Distributed GC tests for RemoteChannels +function test_remoteref_dgc(id) + rr = RemoteChannel(id) + put!(rr, :OK) + rrid = Base.remoteref_id(rr) + + # remote value should be deleted after finalizing the ref + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true + @test fetch(rr) == :OK + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true + finalize(rr) + Base.flush_gc_msgs() + @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == false +end +test_remoteref_dgc(id_me) +test_remoteref_dgc(id_other) + +# if sent to another worker, it should not be deleted till the other worker has also finalized. +wid1 = workers()[1] +wid2 = workers()[2] +rr = RemoteChannel(wid1) +rrid = Base.remoteref_id(rr) + +fstore = RemoteChannel(wid2) +put!(fstore, rr) + +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true +finalize(rr); Base.flush_gc_msgs() # finalize locally +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true +remotecall_fetch(r->(finalize(take!(r)); Base.flush_gc_msgs(); nothing), wid2, fstore) # finalize remotely +sleep(0.5) # to ensure that wid2 messages have been executed on wid1 +@test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == false + @test fetch(@spawnat id_other myid()) == id_other @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -rr=RemoteRef() -@test typeof(rr) == RemoteRef{Channel{Any}} -a = rand(5,5) -put!(rr, a) -@test rr[2,3] == a[2,3] -@test rr[] == a +# test getindex on Futures and RemoteChannels +function test_indexing(rr) + a = rand(5,5) + put!(rr, a) + @test rr[2,3] == a[2,3] + @test rr[] == a +end -rr=RemoteRef(workers()[1]) -@test typeof(rr) == RemoteRef{Channel{Any}} -a = rand(5,5) -put!(rr, a) -@test rr[1,5] == a[1,5] -@test rr[] == a +test_indexing(Future()) +test_indexing(Future(id_other)) +test_indexing(RemoteChannel()) +test_indexing(RemoteChannel(id_other)) dims = (20,20,20) @@ -344,7 +472,7 @@ function test_channel(c) end test_channel(Channel(10)) -test_channel(RemoteRef(()->Channel(10))) +test_channel(RemoteChannel(()->Channel(10))) c=Channel{Int}(1) @test_throws MethodError put!(c, "Hello")