Skip to content

Commit

Permalink
updated docs
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Nov 12, 2015
1 parent 1a146df commit 66fbab6
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 131 deletions.
79 changes: 55 additions & 24 deletions base/docs/helpdb.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1155,18 +1155,29 @@ Return a tuple `(I, J, V)` where `I` and `J` are the row and column indexes of t
findnz

doc"""
RemoteRef()
RemoteChannel()
Make an uninitialized remote reference on the local machine.
Make an reference to a `Channel{Any}(1)` on the local machine.
"""
RemoteRef()
RemoteChannel()

doc"""
RemoteRef(n)
RemoteChannel(n)
Make an uninitialized remote reference on process `n`.
Make an reference to a `Channel{Any}(1)` on process `n`.
"""
RemoteRef(::Integer)
RemoteChannel(::Integer)

doc"""
RemoteChannel(f::Function, pid)
Create references to remote channels of a specific size and type. `f()` is a function that when
executed on `pid` must return an implementation of an `AbstractChannel`.
For example, `RemoteChannel(()->Channel{Int}(10), pid)`, will return a reference to a channel of type `Int`
and size 10 on `pid`.
"""
RemoteChannel(f::Function, pid)

doc"""
```rst
Expand Down Expand Up @@ -2468,7 +2479,7 @@ display
doc"""
@spawnat
Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `RemoteRef` to the result.
Accepts two arguments, `p` and an expression. A closure is created around the expression and run asynchronously on process `p`. Returns a `Future` to the result.
"""
:@spawnat

Expand Down Expand Up @@ -4840,11 +4851,11 @@ Optional argument `msg` is a descriptive error string.
DimensionMismatch

doc"""
take!(RemoteRef)
take!(RemoteChannel)
Fetch the value of a remote reference, removing it so that the reference is empty again.
Fetch a value from a remote channel, also removing it in the processs.
"""
take!(::RemoteRef)
take!(::RemoteChannel)

doc"""
take!(Channel)
Expand Down Expand Up @@ -6639,7 +6650,8 @@ doc"""
Block the current task until some event occurs, depending on the type
of the argument:
* `RemoteRef`: Wait for a value to become available for the specified remote reference.
* `RemoteChannel` : Wait for a value to become available on the specified remote channel.
* `Future` : Wait for a value to become available for the specified future.
* `Channel`: Wait for a value to be appended to the channel.
* `Condition`: Wait for `notify` on a condition.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process can be used to determine success or failure.
Expand Down Expand Up @@ -7031,17 +7043,25 @@ Return the minimum of the arguments. Operates elementwise over arrays.
min

doc"""
isready(r::RemoteRef)
isready(r::RemoteChannel)
Determine whether a `RemoteRef` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. It is recommended that this function only be used on a `RemoteRef` that is assigned once.
Determine whether a `RemoteChannel` has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true.
However, it can be safely used on a `Future` since they are assigned only once.
"""
isready

If the argument `RemoteRef` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `RemoteRef` as a proxy:
doc"""
isready(r::Future)
rr = RemoteRef()
@async put!(rr, remotecall_fetch(long_computation, p))
isready(rr) # will not block
Determine whether a `Future` has a value stored to it.
If the argument `Future` is owned by a different node, this call will block to wait for the answer. It is recommended to wait for `r` in a separate task instead, or to use a local `Channel` as a proxy:
c = Channel(1)
@async put!(c, remotecall_fetch(long_computation, p))
isready(c) # will not block
"""
isready
isready(r::Future)

doc"""
InexactError()
Expand Down Expand Up @@ -8111,11 +8131,19 @@ An iterator that cycles through `iter` forever.
cycle

doc"""
put!(RemoteRef, value)
put!(RemoteChannel, value)
Store a value to the remote channel. If the channel is full, blocks until space is available. Returns its first argument.
"""
put!(::RemoteChannel, value)

doc"""
put!(Future, value)
Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with `take!`. Returns its first argument.
Store a value to a future. Future's are write-once remote references. A `put!` on an already set `Future` throws an Exception.
All asynchronous remote calls return `Future`s and set the value to the return value of the call upon completion.
"""
put!(::RemoteRef, value)
put!(::Future, value)

doc"""
put!(Channel, value)
Expand Down Expand Up @@ -8862,7 +8890,7 @@ Base.(:(!=))
doc"""
@spawn
Creates a closure around an expression and runs it on an automatically-chosen process, returning a `RemoteRef` to the result.
Creates a closure around an expression and runs it on an automatically-chosen process, returning a `Future` to the result.
"""
:@spawn

Expand Down Expand Up @@ -9116,7 +9144,7 @@ readavailable
doc"""
remotecall(func, id, args...)
Call a function asynchronously on the given arguments on the specified process. Returns a `RemoteRef`.
Call a function asynchronously on the given arguments on the specified process. Returns a `Future`.
"""
remotecall

Expand Down Expand Up @@ -9512,7 +9540,10 @@ doc"""
Waits and fetches a value from `x` depending on the type of `x`. Does not remove the item fetched:
* `RemoteRef`: Wait for and get the value of a remote reference. If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
* `Future`: Wait for and get the value of a Future. The fetched value is cached locally. Further calls to `fetch` on the same reference
returns the cached value.
If the remote value is an exception, throws a `RemoteException` which captures the remote exception and backtrace.
* `RemoteChannel`: Wait for and get the value of a remote reference. Exceptions raised are same as for a `Future` .
* `Channel` : Wait for and get the first available item from the channel.
"""
fetch
Expand Down
4 changes: 1 addition & 3 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export
Rational,
Regex,
RegexMatch,
RemoteRef,
RemoteChannel,
RepString,
RevString,
RoundFromZero,
Expand Down Expand Up @@ -1197,7 +1197,6 @@ export

# multiprocessing
addprocs,
channel_from_id,
ClusterManager,
fetch,
init_worker,
Expand All @@ -1214,7 +1213,6 @@ export
remotecall,
remotecall_fetch,
remotecall_wait,
remoteref_id,
rmprocs,
take!,
timedwait,
Expand Down
50 changes: 25 additions & 25 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ function deregister_worker(pg, pid)
end
push!(map_del_wrkr, pid)

# delete this worker from our RemoteRef client sets
# delete this worker from our remote reference client sets
ids = []
tonotify = []
for (id,rv) in pg.refs
Expand Down Expand Up @@ -477,12 +477,12 @@ type Future <: AbstractRemoteRef
end
finalize_future(f::Future) = (isnull(f.v) && send_del_client(f))

type RemoteRef{T<:AbstractChannel} <: AbstractRemoteRef
type RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
where::Int
whence::Int
id::Int

RemoteRef(w, wh, id) = (r = new(w,wh,id); test_existing_ref(r))
RemoteChannel(w, wh, id) = (r = new(w,wh,id); test_existing_ref(r))
end

function test_existing_ref(r::Future)
Expand All @@ -500,7 +500,7 @@ function test_existing_ref(r::Future)
r
end

function test_existing_ref(r::RemoteRef)
function test_existing_ref(r::RemoteChannel)
found = getkey(client_refs, r, false)
!is(found,false) && return found
client_refs[r] = true
Expand All @@ -523,14 +523,14 @@ function Future(pid::Integer=myid())
Future(pid, rrid[1], rrid[2])
end

function RemoteRef(pid::Integer=myid())
function RemoteChannel(pid::Integer=myid())
rrid = next_rrid_tuple()
RemoteRef{Channel{Any}}(pid, rrid[1], rrid[2])
RemoteChannel{Channel{Any}}(pid, rrid[1], rrid[2])
end
function RemoteRef(f::Function, pid::Integer=myid())
function RemoteChannel(f::Function, pid::Integer=myid())
remotecall_fetch(pid, f, next_rrid_tuple()) do f, rrid
rv=lookup_ref(rrid, f)
RemoteRef{typeof(rv.c)}(myid(), rrid[1], rrid[2])
RemoteChannel{typeof(rv.c)}(myid(), rrid[1], rrid[2])
end
end

Expand All @@ -541,7 +541,7 @@ remoteref_id(r::AbstractRemoteRef) = (r.whence, r.id)
function channel_from_id(id)
rv = get(PGRP.refs, id, false)
if rv === false
throw(ErrorException("Local instance of remoteref not found"))
throw(ErrorException("Local instance of remote reference not found"))
end
rv.c
end
Expand All @@ -568,7 +568,7 @@ function isready(rr::Future)
end
end

function isready(rr::RemoteRef, args...)
function isready(rr::RemoteChannel, args...)
rid = remoteref_id(rr)
if rr.where == myid()
isready(lookup_ref(rid).c, args...)
Expand Down Expand Up @@ -648,10 +648,10 @@ function send_add_client(rr::AbstractRemoteRef, i)
end
end

channel_type{T}(rr::RemoteRef{T}) = T
channel_type{T}(rr::RemoteChannel{T}) = T

serialize(s::SerializationState, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::SerializationState, rr::RemoteRef) = serialize(s, rr, true)
serialize(s::SerializationState, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::SerializationState, rr::AbstractRemoteRef, addclient)
if addclient
p = worker_id_from_socket(s.io)
Expand All @@ -665,10 +665,10 @@ function deserialize{T<:Future}(s::SerializationState, t::Type{T})
Future(f.where, f.whence, f.id) # ctor adds to ref table
end

function deserialize{T<:RemoteRef}(s::SerializationState, t::Type{T})
function deserialize{T<:RemoteChannel}(s::SerializationState, t::Type{T})
rr = deserialize_rr(s,t)
# call ctor to make sure this rr gets added to the client_refs table
RemoteRef{channel_type(rr)}(rr.where, rr.whence, rr.id)
RemoteChannel{channel_type(rr)}(rr.where, rr.whence, rr.id)
end

function deserialize_rr(s, t)
Expand All @@ -681,7 +681,7 @@ function deserialize_rr(s, t)
rr
end

# data stored by the owner of a RemoteRef
# data stored by the owner of a remote reference
def_rv_channel() = Channel(1)
type RemoteValue
c::AbstractChannel
Expand Down Expand Up @@ -735,7 +735,7 @@ end

#localize_ref(b::Box) = Box(localize_ref(b.contents))

#function localize_ref(r::RemoteRef)
#function localize_ref(r::RemoteChannel)
# if r.where == myid()
# fetch(r)
# else
Expand Down Expand Up @@ -860,7 +860,7 @@ function wait_ref(rid, callee, args...)
nothing
end
wait(r::Future) = (!isnull(r.v) && return r; call_on_owner(wait_ref, r, myid()); r)
wait(r::RemoteRef, args...) = (call_on_owner(wait_ref, r, myid(), args...); r)
wait(r::RemoteChannel, args...) = (call_on_owner(wait_ref, r, myid(), args...); r)

function fetch_future(rid, callee)
rv = lookup_ref(rid);
Expand All @@ -876,7 +876,7 @@ function fetch(r::Future)
end

fetch_ref(rid, args...) = fetch(lookup_ref(rid).c, args...)
fetch(r::RemoteRef, args...) = call_on_owner(fetch_ref, r, args...)
fetch(r::RemoteChannel, args...) = call_on_owner(fetch_ref, r, args...)
fetch(x::ANY) = x

isready(rv::RemoteValue, args...) = isready(rv.c, args...)
Expand All @@ -898,7 +898,7 @@ end

put!(rv::RemoteValue, args...) = put!(rv.c, args...)
put_ref(rid, args...) = (put!(lookup_ref(rid), args...); nothing)
put!(rr::RemoteRef, args...) = (call_on_owner(put_ref, rr, args...); rr)
put!(rr::RemoteChannel, args...) = (call_on_owner(put_ref, rr, args...); rr)

# take! is not supported on Future

Expand All @@ -908,12 +908,12 @@ function take_ref(rid, callee, args...)
isa(v, RemoteException) && (myid() == callee) && throw(v)
v
end
take!(rr::RemoteRef, args...) = call_on_owner(take_ref, rr, myid(), args...)
take!(rr::RemoteChannel, args...) = call_on_owner(take_ref, rr, myid(), args...)

# close is not supported on Future

close_ref(rid) = (close(lookup_ref(rid).c); nothing)
close(rr::RemoteRef) = call_on_owner(close_ref, rr)
close(rr::RemoteChannel) = call_on_owner(close_ref, rr)


function deliver_result(sock::IO, msg, oid, value)
Expand Down Expand Up @@ -1179,7 +1179,7 @@ function init_worker(manager::ClusterManager=DefaultClusterManager())
cluster_manager = manager
disable_threaded_libs()

# Since our pid has yet to be set, ensure no RemoteRef / Future have been created or addprocs() called.
# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
assert(nprocs() <= 1)
assert(isempty(PGRP.refs))
assert(isempty(client_refs))
Expand Down Expand Up @@ -1436,7 +1436,7 @@ let nextidx = 0
if isa(v,Box)
v = v.contents
end
if isa(v,RemoteRef)
if isa(v,AbstractRemoteRef)
p = v.where; break
end
end
Expand Down Expand Up @@ -1790,11 +1790,11 @@ function terminate_all_workers()
end
end

getindex(r::RemoteRef) = fetch(r)
getindex(r::RemoteChannel) = fetch(r)
getindex(r::Future) = fetch(r)

getindex(r::Future, args...) = getindex(fetch(r), args...)
function getindex(r::RemoteRef, args...)
function getindex(r::RemoteChannel, args...)
if r.where == myid()
return getindex(fetch(r), args...)
end
Expand Down
Loading

0 comments on commit 66fbab6

Please sign in to comment.