From 66fbab6a0191af8d62c61035942ef65100c530be Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Wed, 11 Nov 2015 20:47:25 +0530 Subject: [PATCH] updated docs --- base/docs/helpdb.jl | 79 +++++++++++------ base/exports.jl | 4 +- base/multi.jl | 50 +++++------ base/precompile.jl | 24 +++--- base/require.jl | 2 +- base/sharedarray.jl | 7 +- doc/manual/parallel-computing.rst | 136 ++++++++++++++++++++---------- test/examples.jl | 4 +- test/parallel_exec.jl | 34 ++++---- 9 files changed, 209 insertions(+), 131 deletions(-) diff --git a/base/docs/helpdb.jl b/base/docs/helpdb.jl index e93a0095f2dd5..597e88f4798b3 100644 --- a/base/docs/helpdb.jl +++ b/base/docs/helpdb.jl @@ -1155,18 +1155,29 @@ Return a tuple `(I, J, V)` where `I` and `J` are the row and column indexes of t findnz doc""" - RemoteRef() + RemoteChannel() -Make an uninitialized remote reference on the local machine. +Make a reference to a `Channel{Any}(1)` on the local machine. """ -RemoteRef() +RemoteChannel() doc""" - RemoteRef(n) + RemoteChannel(n) -Make an uninitialized remote reference on process `n`. +Make a reference to a `Channel{Any}(1)` on process `n`. """ -RemoteRef(::Integer) +RemoteChannel(::Integer) + +doc""" + RemoteChannel(f::Function, pid) + +Create references to remote channels of a specific size and type. `f()` is a function that when +executed on `pid` must return an implementation of an `AbstractChannel`. + +For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a channel of type `Int` +and size 10 on `pid`. +""" +RemoteChannel(f::Function, pid) doc""" ```rst @@ -2468,7 +2479,7 @@ display doc""" @spawnat -Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `RemoteRef` to the result. +Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `Future` to the result.
""" :@spawnat @@ -4840,11 +4851,11 @@ Optional argument `msg` is a descriptive error string. DimensionMismatch doc""" - take!(RemoteRef) + take!(RemoteChannel) -Fetch the value of a remote reference, removing it so that the reference is empty again. +Fetch a value from a remote channel, also removing it in the process. """ -take!(::RemoteRef) +take!(::RemoteChannel) doc""" take!(Channel) @@ -6639,7 +6650,8 @@ doc""" Block the current task until some event occurs, depending on the type of the argument: -* `RemoteRef`: Wait for a value to become available for the specified remote reference. +* `RemoteChannel`: Wait for a value to become available on the specified remote channel. +* `Future`: Wait for a value to become available for the specified future. * `Channel`: Wait for a value to be appended to the channel. * `Condition`: Wait for `notify` on a condition. * `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process can be used to determine success or failure. @@ -7031,17 +7043,25 @@ Return the minimum of the arguments. Operates elementwise over arrays. min doc""" - isready(r::RemoteRef) + isready(r::RemoteChannel) -Determine whether a `RemoteRef` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. It is recommended that this function only be used on a `RemoteRef` that is assigned once. +Determine whether a `RemoteChannel` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. +However, it can be safely used on a `Future` since they are assigned only once. +""" +isready -If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer.
It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy: +doc""" + isready(r::Future) - rr = RemoteRef() - @async put!(rr, remotecall_fetch(long_computation, p)) - isready(rr) # will not block +Determine whether a `Future` has a value stored to it. + +If the argument `Future` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `Channel` as a proxy: + + c = Channel(1) + @async put!(c, remotecall_fetch(long_computation, p)) + isready(c) # will not block """ -isready + isready(r::Future) doc""" InexactError() @@ -8111,11 +8131,19 @@ An iterator that cycles through `iter` forever. cycle doc""" - put!(RemoteRef, value) + put!(RemoteChannel, value) + +Store a value to the remote channel. If the channel is full, blocks until space is available. Returns its first argument. +""" +put!(::RemoteChannel, value) + +doc""" + put!(Future, value) -Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with `take!`. Returns its first argument. +Store a value to a `Future`. `Future`s are write-once remote references. A `put!` on an already set `Future` throws an `Exception`. +All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion. """ -put!(::RemoteRef, value) +put!(::Future, value) doc""" put!(Channel, value) @@ -8862,7 +8890,7 @@ Base.(:(!=)) doc""" @spawn -Creates a closure around an expression and runs it on an automatically-chosen process, returning a `RemoteRef` to the result. +Creates a closure around an expression and runs it on an automatically-chosen process, returning a `Future` to the result. """ :@spawn @@ -9116,7 +9144,7 @@ readavailable doc""" remotecall(func, id, args...) -Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`.
+Call a function asynchronously on the given arguments on the specified process. Returns a `Future`. """ remotecall @@ -9512,7 +9540,10 @@ doc""" Waits and fetches a value from `x` depending on the type of `x`. Does not remove the item fetched: -* `RemoteRef`: Wait for and get the value of a remote reference. If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace. +* `Future`: Wait for and get the value of a `Future`. The fetched value is cached locally. Further calls to `fetch` on the same reference +return the cached value. +If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace. +* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are the same as for a `Future`. * `Channel` : Wait for and get the first available item from the channel. """ fetch diff --git a/base/exports.jl b/base/exports.jl index 73d37ab7cfed5..0cdd6e5908813 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -88,7 +88,7 @@ export Rational, Regex, RegexMatch, - RemoteRef, + RemoteChannel, RepString, RevString, RoundFromZero, @@ -1197,7 +1197,6 @@ export # multiprocessing addprocs, - channel_from_id, ClusterManager, fetch, init_worker, @@ -1214,7 +1213,6 @@ export remotecall, remotecall_fetch, remotecall_wait, - remoteref_id, rmprocs, take!, timedwait, diff --git a/base/multi.jl b/base/multi.jl index 7e008d5e43842..ecbb9ba02c47a 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -439,7 +439,7 @@ function deregister_worker(pg, pid) end push!(map_del_wrkr, pid) - # delete this worker from our RemoteRef client sets + # delete this worker from our remote reference client sets ids = [] tonotify = [] for (id,rv) in pg.refs @@ -477,12 +477,12 @@ type Future <: AbstractRemoteRef end finalize_future(f::Future) = (isnull(f.v) && send_del_client(f)) -type RemoteRef{T<:AbstractChannel} <: AbstractRemoteRef +type RemoteChannel{T<:AbstractChannel} <:
AbstractRemoteRef where::Int whence::Int id::Int - RemoteRef(w, wh, id) = (r = new(w,wh,id); test_existing_ref(r)) + RemoteChannel(w, wh, id) = (r = new(w,wh,id); test_existing_ref(r)) end function test_existing_ref(r::Future) @@ -500,7 +500,7 @@ function test_existing_ref(r::Future) r end -function test_existing_ref(r::RemoteRef) +function test_existing_ref(r::RemoteChannel) found = getkey(client_refs, r, false) !is(found,false) && return found client_refs[r] = true @@ -523,14 +523,14 @@ function Future(pid::Integer=myid()) Future(pid, rrid[1], rrid[2]) end -function RemoteRef(pid::Integer=myid()) +function RemoteChannel(pid::Integer=myid()) rrid = next_rrid_tuple() - RemoteRef{Channel{Any}}(pid, rrid[1], rrid[2]) + RemoteChannel{Channel{Any}}(pid, rrid[1], rrid[2]) end -function RemoteRef(f::Function, pid::Integer=myid()) +function RemoteChannel(f::Function, pid::Integer=myid()) remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid rv=lookup_ref(rrid, f) - RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2]) + RemoteChannel{typeof(rv.c)}(myid(), rrid[1], rrid[2]) end end @@ -541,7 +541,7 @@ remoteref_id(r::AbstractRemoteRef) = (r.whence, r.id) function channel_from_id(id) rv = get(PGRP.refs, id, false) if rv === false - throw(ErrorException("Local instance of remoteref not found")) + throw(ErrorException("Local instance of remote reference not found")) end rv.c end @@ -568,7 +568,7 @@ function isready(rr::Future) end end -function isready(rr::RemoteRef, args...) +function isready(rr::RemoteChannel, args...) rid = remoteref_id(rr) if rr.where == myid() isready(lookup_ref(rid).c, args...) 
@@ -648,10 +648,10 @@ function send_add_client(rr::AbstractRemoteRef, i) end end -channel_type{T}(rr::RemoteRef{T}) = T +channel_type{T}(rr::RemoteChannel{T}) = T serialize(s::SerializationState, f::Future) = serialize(s, f, isnull(f.v)) -serialize(s::SerializationState, rr::RemoteRef) = serialize(s, rr, true) +serialize(s::SerializationState, rr::RemoteChannel) = serialize(s, rr, true) function serialize(s::SerializationState, rr::AbstractRemoteRef, addclient) if addclient p = worker_id_from_socket(s.io) @@ -665,10 +665,10 @@ function deserialize{T<:Future}(s::SerializationState, t::Type{T}) Future(f.where, f.whence, f.id) # ctor adds to ref table end -function deserialize{T<:RemoteRef}(s::SerializationState, t::Type{T}) +function deserialize{T<:RemoteChannel}(s::SerializationState, t::Type{T}) rr = deserialize_rr(s,t) # call ctor to make sure this rr gets added to the client_refs table - RemoteRef{channel_type(rr)}(rr.where, rr.whence, rr.id) + RemoteChannel{channel_type(rr)}(rr.where, rr.whence, rr.id) end function deserialize_rr(s, t) @@ -681,7 +681,7 @@ function deserialize_rr(s, t) rr end -# data stored by the owner of a RemoteRef +# data stored by the owner of a remote reference def_rv_channel() = Channel(1) type RemoteValue c::AbstractChannel @@ -735,7 +735,7 @@ end #localize_ref(b::Box) = Box(localize_ref(b.contents)) -#function localize_ref(r::RemoteRef) +#function localize_ref(r::RemoteChannel) # if r.where == myid() # fetch(r) # else @@ -860,7 +860,7 @@ function wait_ref(rid, callee, args...) nothing end wait(r::Future) = (!isnull(r.v) && return r; call_on_owner(wait_ref, r, myid()); r) -wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, myid(), args...); r) +wait(r::RemoteChannel, args...) = (call_on_owner(wait_ref, r, myid(), args...); r) function fetch_future(rid, callee) rv = lookup_ref(rid); @@ -876,7 +876,7 @@ function fetch(r::Future) end fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...) -fetch(r::RemoteRef, args...) 
= call_on_owner(fetch_ref, r, args...) +fetch(r::RemoteChannel, args...) = call_on_owner(fetch_ref, r, args...) fetch(x::ANY) = x isready(rv::RemoteValue, args...) = isready(rv.c, args...) @@ -898,7 +898,7 @@ end put!(rv::RemoteValue, args...) = put!(rv.c, args...) put_ref(rid, args...) = (put!(lookup_ref(rid), args...); nothing) -put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr) +put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, args...); rr) # take! is not supported on Future @@ -908,12 +908,12 @@ function take_ref(rid, callee, args...) isa(v, RemoteException) && (myid() == callee) && throw(v) v end -take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, myid(), args...) +take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...) # close is not supported on Future close_ref(rid) = (close(lookup_ref(rid).c); nothing) -close(rr::RemoteRef) = call_on_owner(close_ref, rr) +close(rr::RemoteChannel) = call_on_owner(close_ref, rr) function deliver_result(sock::IO, msg, oid, value) @@ -1179,7 +1179,7 @@ function init_worker(manager::ClusterManager=DefaultClusterManager()) cluster_manager = manager disable_threaded_libs() - # Since our pid has yet to be set, ensure no RemoteRef / Future have been created or addprocs() called. + # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. assert(nprocs() <= 1) assert(isempty(PGRP.refs)) assert(isempty(client_refs)) @@ -1436,7 +1436,7 @@ let nextidx = 0 if isa(v,Box) v = v.contents end - if isa(v,RemoteRef) + if isa(v,AbstractRemoteRef) p = v.where; break end end @@ -1790,11 +1790,11 @@ function terminate_all_workers() end end -getindex(r::RemoteRef) = fetch(r) +getindex(r::RemoteChannel) = fetch(r) getindex(r::Future) = fetch(r) getindex(r::Future, args...) = getindex(fetch(r), args...) -function getindex(r::RemoteRef, args...) +function getindex(r::RemoteChannel, args...) 
if r.where == myid() return getindex(fetch(r), args...) end diff --git a/base/precompile.jl b/base/precompile.jl index 3dd98337004b1..337aa70c543c3 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -144,7 +144,7 @@ precompile(Base.REPLCompletions.completions, (ASCIIString, Int)) precompile(Base.Random.srand, ()) precompile(Base.Random.srand, (ASCIIString, Int)) precompile(Base.Random.srand, (UInt,)) -precompile(Base.RemoteRef, (Int, Int, Int)) +precompile(Base.RemoteChannel, (Int, Int, Int)) precompile(Base.RemoteValue, ()) precompile(Base.Set, ()) precompile(Base.SystemError, (ASCIIString,)) @@ -235,8 +235,8 @@ precompile(Base.getindex, (Type{ByteString}, ASCIIString, ASCIIString)) precompile(Base.getindex, (Type{Dict{Any, Any}}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any}, Dict{Any, Any})) precompile(Base.getpid, ()) precompile(Base.hash, (Int,)) -precompile(Base.hash, (RemoteRef, UInt)) -precompile(Base.hash, (RemoteRef,)) +precompile(Base.hash, (RemoteChannel, UInt)) +precompile(Base.hash, (RemoteChannel,)) precompile(Base.haskey, (Base.EnvHash, ASCIIString)) precompile(Base.haskey, (Dict{Symbol,Any}, Symbol)) precompile(Base.haskey, (ObjectIdDict, Symbol)) @@ -267,8 +267,8 @@ precompile(Base.isequal, (Base.LineEdit.Prompt, Base.LineEdit.Prompt)) precompile(Base.isequal, (Bool, Bool)) precompile(Base.isequal, (Char, ASCIIString)) precompile(Base.isequal, (Int,Int)) -precompile(Base.isequal, (RemoteRef, RemoteRef)) -precompile(Base.isequal, (RemoteRef, WeakRef)) +precompile(Base.isequal, (RemoteChannel, RemoteChannel)) +precompile(Base.isequal, (RemoteChannel, WeakRef)) precompile(Base.isequal, (Symbol, Symbol)) precompile(Base.isequal, (VersionNumber, VersionNumber)) precompile(Base.isequal, (Void, Void)) @@ -349,7 +349,7 @@ precompile(Base.reinit_stdio, ()) precompile(Base.repeat, (ASCIIString, Int)) precompile(Base.repl_cmd, (Cmd, Base.Terminals.TTYTerminal)) precompile(Base.require, (Symbol,)) -precompile(Base.remoteref_id, 
(RemoteRef,)) +precompile(Base.remoteref_id, (RemoteChannel,)) precompile(Base.rsearch, (ASCIIString, Char)) precompile(Base.rstrip, (ASCIIString,)) precompile(Base.run, (Cmd,)) @@ -405,7 +405,7 @@ precompile(Base.sync_begin, ()) precompile(Base.sync_end, ()) precompile(Base.systemerror, (Symbol, Bool)) precompile(Base.take!, (Base.RemoteValue,)) -precompile(Base.take!, (RemoteRef,)) +precompile(Base.take!, (RemoteChannel,)) precompile(Base.take_ref, (Tuple{Int,Int},)) precompile(Base.takebuf_string, (IOBuffer,)) precompile(Base.task_local_storage, ()) @@ -421,7 +421,7 @@ precompile(Base.uv_error, (ASCIIString, Bool)) precompile(Base.uvfinalize, (Base.TTY,)) precompile(Base.vcat, (Base.LineEdit.Prompt,)) precompile(Base.wait, ()) -precompile(Base.wait, (RemoteRef,)) +precompile(Base.wait, (RemoteChannel,)) precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString)) precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString)) precompile(Base.write, (IOBuffer, Vector{UInt8})) @@ -467,10 +467,10 @@ precompile(Base.schedule, (Array{Any, 1}, Task, Void)) precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any})) precompile(Base.convert, (Type{Union{ASCIIString, UTF8String}}, ASCIIString)) precompile(Base.LineEdit.keymap_fcn, (Function, Base.LineEdit.MIState, ASCIIString)) -precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteRef)) -precompile(==, (Base.RemoteRef, WeakRef)) -precompile(==, (Base.RemoteRef, Base.RemoteRef)) -precompile(Base.send_del_client, (Base.RemoteRef,)) +precompile(Base.weak_key_delete!, (Base.Dict{Any, Any}, Base.RemoteChannel)) +precompile(==, (Base.RemoteChannel, WeakRef)) +precompile(==, (Base.RemoteChannel, Base.RemoteChannel)) +precompile(Base.send_del_client, (Base.RemoteChannel,)) precompile(!=, (Base.SubString{UTF8String}, ASCIIString)) precompile(Base.print_joined, (Base.IOBuffer, Array{Base.SubString{UTF8String}, 1}, ASCIIString)) 
precompile(Base.call, (Array{Any, 1}, Type{Base.LineEdit.Prompt}, ASCIIString)) diff --git a/base/require.jl b/base/require.jl index d852239225e28..517b175ea085e 100644 --- a/base/require.jl +++ b/base/require.jl @@ -109,7 +109,7 @@ end function reload_path(path::AbstractString) had = haskey(package_list, path) if !had - package_locks[path] = RemoteRef() + package_locks[path] = RemoteChannel() end package_list[path] = time() tls = task_local_storage() diff --git a/base/sharedarray.jl b/base/sharedarray.jl index a07f072e9e6db..9bcf777e0cb9a 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -172,10 +172,15 @@ function initialize_shared_array(S, s, onlocalhost, init, pids) S end -function finalize_refs(S) +function finalize_refs{T,N}(S::SharedArray{T,N}) for r in S.refs finalize(r) end + empty!(S.pids) + empty!(S.refs) + init_loc_flds(S) + S.s = Array(T, ntuple(d->0,N)) + S end typealias SharedVector{T} SharedArray{T,1} diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index 90d085cf6e98f..4b3ced187453e 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -34,12 +34,18 @@ references* and *remote calls*. A remote reference is an object that can be used from any process to refer to an object stored on a particular process. A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process. -A remote call returns a remote reference to its result. Remote calls + +Remote references come in two flavors -``Future`` and ``RemoteChannel``. + +A remote call returns a ``Future`` to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else. You can -wait for a remote call to finish by calling :func:`wait` on its remote -reference, and you can obtain the full value of the result using -:func:`fetch`. 
You can store a value to a remote reference using :func:`put!`. +wait for a remote call to finish by calling :func:`wait` on the returned +``Future``, and you can obtain the full value of the result using +:func:`fetch`. + +On the other hand, ``RemoteChannel``\ s are rewritable. For example, multiple processes +can coordinate their processing by referencing the same remote ``Channel``\ . Let's try this out. Starting with ``julia -p n`` provides ``n`` worker processes on the local machine. Generally it makes sense for ``n`` to @@ -49,32 +55,29 @@ equal the number of CPU cores on the machine. $ ./julia -p 2 - julia> r = remotecall(2, rand, 2, 2) - RemoteRef(2,1,5) - - julia> fetch(r) - 2x2 Float64 Array: - 0.60401 0.501111 - 0.174572 0.157411 + julia> r = remotecall(rand, 2, 2, 2) + Future(2,1,3,Nullable{Any}()) julia> s = @spawnat 2 1 .+ fetch(r) - RemoteRef(2,1,7) + Future(2,1,6,Nullable{Any}()) julia> fetch(s) 2x2 Float64 Array: 1.60401 1.50111 1.17457 1.15741 -The first argument to :func:`remotecall` is the index of the process -that will do the work. Most parallel programming in Julia does not -reference specific processes or the number of processes available, -but :func:`remotecall` is considered a low-level interface providing -finer control. The second argument to :func:`remotecall` is the function -to call, and the remaining arguments will be passed to this -function. As you can see, in the first line we asked process 2 to +The first argument to :func:`remotecall` is the function to call. +Most parallel programming in Julia does not reference specific processes +or the number of processes available, but :func:`remotecall` is +considered a low-level interface providing finer control. The second +argument to :func:`remotecall` is the index of the process +that will do the work, and the remaining arguments will be passed +to the function being called.
+ +As you can see, in the first line we asked process 2 to construct a 2-by-2 random matrix, and in the second line we asked it to add 1 to it. The result of both calculations is available in the -two remote references, ``r`` and ``s``. The :obj:`@spawnat` macro +two futures, ``r`` and ``s``. The :obj:`@spawnat` macro evaluates the expression in the second argument on the process specified by the first argument. @@ -86,22 +89,21 @@ but is more efficient. :: - julia> remotecall_fetch(2, getindex, r, 1, 1) + julia> remotecall_fetch(getindex, 2, r, 1, 1) 0.10824216411304866 Remember that :func:`getindex(r,1,1) ` is :ref:`equivalent ` to -``r[1,1]``, so this call fetches the first element of the remote -reference ``r``. +``r[1,1]``, so this call fetches the first element of the future ``r``. The syntax of :func:`remotecall` is not especially convenient. The macro :obj:`@spawn` makes things easier. It operates on an expression rather than a function, and picks where to do the operation for you:: julia> r = @spawn rand(2,2) - RemoteRef(1,1,0) + Future(2,1,4,Nullable{Any}()) julia> s = @spawn 1 .+ fetch(r) - RemoteRef(1,1,1) + Future(3,1,5,Nullable{Any}()) julia> fetch(s) 1.10824216411304866 1.13798233877923116 @@ -117,6 +119,11 @@ process that owns ``r``, so the :func:`fetch` will be a no-op. as a :ref:`macro `. It is possible to define your own such constructs.) +An important thing to remember is that, once fetched, a `Future` will cache its value +locally. Further `fetch` calls do not entail a network hop. Once all referencing Futures +have fetched, the remote stored value is deleted. + + .. 
_man-parallel-computing-code-availability: Code Availability and Loading Packages @@ -135,10 +142,10 @@ type the following into the Julia prompt:: 1.15119 0.918912 julia> @spawn rand2(2,2) - RemoteRef(1,1,1) + Future(2,1,4,Nullable{Any}()) julia> @spawn rand2(2,2) - RemoteRef(2,1,2) + Future(3,1,5,Nullable{Any}()) julia> exception on 2: in anonymous: rand2 not defined @@ -169,7 +176,7 @@ Starting julia with ``julia -p 2``, you can use this to verify the following: - ``using DummyModule`` causes the module to be loaded on all processes; however, the module is brought into scope only on the one executing the statement. - As long as ``DummyModule`` is loaded on process 2, commands like :: - rr = RemoteRef(2) + rr = RemoteChannel(2) put!(rr, MyType(7)) allow you to store an object of type ``MyType`` on process 2 even if ``DummyModule`` is not in scope on process 2. @@ -179,7 +186,7 @@ For example, :obj:`@everywhere` can also be used to directly define a function o julia> @everywhere id = myid() - julia> remotecall_fetch(2, ()->id) + julia> remotecall_fetch(()->id, 2) 2 A file can also be preloaded on multiple processes at startup, and a driver script can be used to drive the computation:: @@ -349,9 +356,9 @@ vector ``a`` shared by all processes. As you could see, the reduction operator can be omitted if it is not needed. In that case, the loop executes asynchronously, i.e. it spawns independent -tasks on all available workers and returns an array of :class:`RemoteRef` +tasks on all available workers and returns an array of :class:`Future` immediately without waiting for completion. -The caller can wait for the :class:`RemoteRef` completions at a later +The caller can wait for the :class:`Future` completions at a later point by calling :func:`fetch` on them, or wait for completion at the end of the loop by prefixing it with :obj:`@sync`, like ``@sync @parallel for``. @@ -465,30 +472,67 @@ variable takes on all values added to the channel. 
An empty, closed channel causes the ``for`` loop to terminate. -RemoteRefs and AbstractChannels -------------------------------- +Remote references and AbstractChannels +-------------------------------------- -A ``RemoteRef`` is a proxy for an implementation of an ``AbstractChannel`` +Remote references always refer to an implementation of an ``AbstractChannel``. A concrete implementation of an ``AbstractChannel`` (like ``Channel``), is required -to implement ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait``. The remote object -referred to by a ``RemoteRef()`` or ``RemoteRef(pid)`` is stored in a ``Channel{Any}(1)``, -i.e., a channel of size 1 capable of holding objects of ``Any`` type. +to implement ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait``\ . The remote object +referred to by a ``Future`` is stored in a ``Channel{Any}(1)``\ , i.e., a channel of size 1 +capable of holding objects of ``Any`` type. -Methods ``put!``, ``take!``, ``fetch``, ``isready`` and ``wait`` on a ``RemoteRef`` are proxied onto -the backing store on the remote process. +``RemoteChannel``\ , which is rewritable, can point to any type and size of channels, or any other +implementation of an ``AbstractChannel``\ . -The constructor ``RemoteRef(f::Function, pid)`` allows us to construct references to channels holding +The constructor ``RemoteChannel(f::Function, pid)`` allows us to construct references to channels holding more than one value of a specific type. ``f()`` is a function executed on ``pid`` and it must return -an ``AbstractChannel``. +an ``AbstractChannel``\ . + +For example, ``RemoteChannel(()->Channel{Int}(10), pid)``\ , will return a reference to a channel of type ``Int`` +and size 10. The channel exists on worker ``pid``\ . -For example, ``RemoteRef(()->Channel{Int}(10), pid)``, will return a reference to a channel of type ``Int`` -and size 10.
+Methods ``put!``\ , ``take!``\ , ``fetch``\ , ``isready`` and ``wait`` on a ``RemoteChannel`` are proxied onto +the backing store on the remote process. -``RemoteRef`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple +``RemoteChannel`` can thus be used to refer to user implemented ``AbstractChannel`` objects. A simple example of this is provided in ``examples/dictchannel.jl`` which uses a dictionary as its remote store. +Remote References and Distributed Garbage Collection +---------------------------------------------------- + +Objects referred to by remote references can be freed only when *all* held references in the cluster +are deleted. + +The node where the value is stored keeps track of which of the workers have a reference to it. +Every time a ``RemoteChannel`` or an (unfetched) ``Future`` is serialized to a worker, the node pointed +to by the reference is notified. And every time a ``RemoteChannel`` or an (unfetched) ``Future`` +is garbage collected locally, the node owning the value is again notified. + +The notifications are done via sending of "tracking" messages - an "add reference" message when +a reference is serialized to a different process and a "delete reference" message when a reference +is locally garbage collected. + +Since ``Future``\ s are write-once and cached locally, the act of ``fetch``\ ing a ``Future`` also updates +reference tracking information on the node owning the value. + +The node which owns the value frees it once all references to it are cleared. + +With ``Future``\ s, serializing an already fetched ``Future`` to a different node also sends the value +since the original remote store may have collected the value by this time. + +It is important to note that *when* an object is locally garbage collected is dependent +upon the size of the object and the current memory pressure in the system.
+ +In case of remote references, the size of the local reference object is quite small, while the remote value +stored on the remote node may be quite large. Since the local object may not be collected immediately, it is +a good practice to explicitly call ``finalize`` on local instances of a ``RemoteChannel``, or on unfetched +``Future``\ s. Since calling ``fetch`` on a ``Future`` also removes its reference from the remote store, this +is not required on fetched ``Future``\ s. Explicitly calling ``finalize`` immediately sends a message to +the remote node to go ahead and remove its reference to the value. + + Shared Arrays ------------- @@ -568,7 +612,7 @@ be careful not to set up conflicts. For example:: @sync begin for p in procs(S) @async begin - remotecall_wait(p, fill!, S, p) + remotecall_wait(fill!, p, S, p) end end end @@ -637,7 +681,7 @@ and one that delegates in chunks:: function advection_shared!(q, u) @sync begin for p in procs(q) - @async remotecall_wait(p, advection_shared_chunk!, q, u) + @async remotecall_wait(advection_shared_chunk!, p, q, u) end end q diff --git a/test/examples.jl b/test/examples.jl index 8c6e2133c2403..573cde2cd9d1e 100644 --- a/test/examples.jl +++ b/test/examples.jl @@ -53,8 +53,8 @@ remotecall_fetch(1, dc_path) do f include(f) nothing end -dc=RemoteRef(()->DictChannel(), 1) -@test typeof(dc) == RemoteRef{DictChannel} +dc=RemoteChannel(()->DictChannel(), 1) +@test typeof(dc) == RemoteChannel{DictChannel} @test isready(dc) == false put!(dc, 1, 2) diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index eb1edf8e72375..ec71fc6ce7185 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -28,7 +28,7 @@ testf(id_other) # Distributed GC tests for Futures function test_futures_dgc(id) f = remotecall(myid, id) - fid = remoteref_id(f) + fid = Base.remoteref_id(f) # remote value should be deleted after a fetch @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true @@ -40,7 +40,7 @@ function
test_futures_dgc(id) # if unfetched, it should be deleted after a finalize f = remotecall(myid, id) - fid = remoteref_id(f) + fid = Base.remoteref_id(f) @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, fid) == true @test isnull(f.v) == true finalize(f) @@ -55,9 +55,9 @@ test_futures_dgc(id_other) wid1 = workers()[1] wid2 = workers()[2] f = remotecall(myid, wid1) -fid = remoteref_id(f) +fid = Base.remoteref_id(f) -fstore = RemoteRef(wid2) +fstore = RemoteChannel(wid2) put!(fstore, f) @test fetch(f) == wid1 @@ -67,7 +67,7 @@ remotecall_fetch(r->fetch(fetch(r)), wid2, fstore) # put! should release remote reference since it would have been cached locally f = Future(wid1) -fid = remoteref_id(f) +fid = Base.remoteref_id(f) # should not be created remotely till accessed @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, fid) == false @@ -81,9 +81,9 @@ put!(f, :OK) # RemoteException should be thrown on a put! when another process has set the value f = Future(wid1) -fid = remoteref_id(f) +fid = Base.remoteref_id(f) -fstore = RemoteRef(wid2) +fstore = RemoteChannel(wid2) put!(fstore, f) # send f to wid2 put!(f, :OK) # set value from master @@ -103,11 +103,11 @@ testval = remotecall_fetch(wid2, fstore) do x end @test testval == 1 -# Distributed GC tests for RemoteRefs +# Distributed GC tests for RemoteChannels function test_remoteref_dgc(id) - rr = RemoteRef(id) + rr = RemoteChannel(id) put!(rr, :OK) - rrid = remoteref_id(rr) + rrid = Base.remoteref_id(rr) # remote value should be deleted after finalizing the ref @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), id, rrid) == true @@ -123,10 +123,10 @@ test_remoteref_dgc(id_other) # if sent to another worker, it should not be deleted till the other worker has also finalized. 
wid1 = workers()[1] wid2 = workers()[2] -rr = RemoteRef(wid1) -rrid = remoteref_id(rr) +rr = RemoteChannel(wid1) +rrid = Base.remoteref_id(rr) -fstore = RemoteRef(wid2) +fstore = RemoteChannel(wid2) put!(fstore, rr) @test remotecall_fetch(k->haskey(Base.PGRP.refs, k), wid1, rrid) == true @@ -140,7 +140,7 @@ sleep(0.5) # to ensure that wid2 messages have been executed on wid1 @test @fetchfrom id_other begin myid() end == id_other @fetch begin myid() end -# test getindex on Futures and RemoteRefs +# test getindex on Futures and RemoteChannels function test_indexing(rr) a = rand(5,5) put!(rr, a) @@ -150,8 +150,8 @@ end test_indexing(Future()) test_indexing(Future(id_other)) -test_indexing(RemoteRef()) -test_indexing(RemoteRef(id_other)) +test_indexing(RemoteChannel()) +test_indexing(RemoteChannel(id_other)) dims = (20,20,20) @@ -465,7 +465,7 @@ function test_channel(c) end test_channel(Channel(10)) -test_channel(RemoteRef(()->Channel(10))) +test_channel(RemoteChannel(()->Channel(10))) c=Channel{Int}(1) @test_throws MethodError put!(c, "Hello")