diff --git a/base/REPL.jl b/base/REPL.jl index 3519bc13f0a98..e99f9ac47d832 100644 --- a/base/REPL.jl +++ b/base/REPL.jl @@ -36,8 +36,8 @@ abstract AbstractREPL answer_color(::AbstractREPL) = "" type REPLBackend - repl_channel::RemoteRef - response_channel::RemoteRef + repl_channel::Channel + response_channel::Channel in_eval::Bool ans backend_task::Task @@ -75,7 +75,7 @@ function eval_user_input(ast::ANY, backend::REPLBackend) end end -function start_repl_backend(repl_channel::RemoteRef, response_channel::RemoteRef) +function start_repl_backend(repl_channel::Channel, response_channel::Channel) backend = REPLBackend(repl_channel, response_channel, false, nothing) backend.backend_task = @schedule begin # include looks at this to determine the relative include path @@ -154,13 +154,13 @@ end # A reference to a backend immutable REPLBackendRef - repl_channel::RemoteRef - response_channel::RemoteRef + repl_channel::Channel + response_channel::Channel end function run_repl(repl::AbstractREPL) - repl_channel = RemoteRef() - response_channel = RemoteRef() + repl_channel = Channel(1) + response_channel = Channel(1) backend = start_repl_backend(repl_channel, response_channel) run_frontend(repl, REPLBackendRef(repl_channel,response_channel)) backend @@ -665,7 +665,7 @@ function setup_interface(repl::LineEditREPL; hascolor = repl.hascolor, extra_rep # # Usage: # - # repl_channel,response_channel = RemoteRef(),RemoteRef() + # repl_channel,response_channel = Channel(),Channel() # start_repl_backend(repl_channel, response_channel) # setup_interface(REPLDisplay(t),repl_channel,response_channel) # @@ -894,8 +894,8 @@ input_color(r::StreamREPL) = r.input_color function run_repl(stream::AsyncStream) repl = @async begin - repl_channel = RemoteRef() - response_channel = RemoteRef() + repl_channel = Channel(1) + response_channel = Channel(1) start_repl_backend(repl_channel, response_channel) StreamREPL_frontend(repl, repl_channel, response_channel) end diff --git a/base/channels.jl b/base/channels.jl new file mode 100644 index 0000000000000..b23e13a325e65 --- /dev/null +++ b/base/channels.jl @@ -0,0 +1,137 @@ +# This file is a part of Julia. License is MIT: http://julialang.org/license + +abstract AbstractChannel{T} + +type Channel{T} <: AbstractChannel{T} + cond_take::Condition # waiting for data to become available + cond_put::Condition # waiting for a writeable slot + state::Symbol + + data::Array{T,1} + szp1::Int # current channel size plus one + sz_max::Int # maximum size of channel + take_pos::Int # read position + put_pos::Int # write position + + function Channel(sz) + sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz + szp1 = sz > 32 ? 33 : sz+1 + new(Condition(), Condition(), :open, + Array(T, szp1), szp1, sz_max, 1, 1) + end +end + +const DEF_CHANNEL_SZ=32 + +Channel() = Channel(DEF_CHANNEL_SZ) +Channel(sz::Int) = Channel{Any}(sz) + +closed_exception() = InvalidStateException("Channel is closed.", :closed) +function close(c::Channel) + c.state = :closed + notify_error(c::Channel, closed_exception()) + c +end +isopen(c::Channel) = (c.state == :open) + +type InvalidStateException <: Exception + msg::AbstractString + state +end +InvalidStateException() = InvalidStateException("") +InvalidStateException(msg) = InvalidStateException(msg, 0) + +function put!(c::Channel, v) + !isopen(c) && throw(closed_exception()) + d = c.take_pos - c.put_pos + if (d == 1) || (d == -(c.szp1-1)) + # grow the channel if possible + if (c.szp1 - 1) < c.sz_max + if ((c.szp1-1) * 2) > c.sz_max + c.szp1 = c.sz_max + 1 + else + c.szp1 = ((c.szp1-1) * 2) + 1 + end + newdata = Array(eltype(c), c.szp1) + if c.put_pos > c.take_pos + copy!(newdata, 1, c.data, c.take_pos, (c.put_pos - c.take_pos)) + c.put_pos = c.put_pos - c.take_pos + 1 + else + len_first_part = length(c.data) - c.take_pos + 1 + copy!(newdata, 1, c.data, c.take_pos, len_first_part) + copy!(newdata, len_first_part+1, c.data, 1, c.put_pos-1) + c.put_pos = len_first_part + c.put_pos + end + c.take_pos = 1 + c.data = newdata + else + wait(c.cond_put) + end + end + + c.data[c.put_pos] = v + c.put_pos = (c.put_pos == c.szp1 ? 1 : c.put_pos + 1) + notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call. + v +end + +function fetch(c::Channel) + wait(c) + c.data[c.take_pos] +end + +function take!(c::Channel) + !isopen(c) && !isready(c) && throw(closed_exception()) + while !isready(c) + wait(c.cond_take) + end + v = c.data[c.take_pos] + c.take_pos = (c.take_pos == c.szp1 ? 1 : c.take_pos + 1) + notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!. + v +end + +isready(c::Channel) = (c.take_pos == c.put_pos ? false : true) + +function wait(c::Channel) + while !isready(c) + wait(c.cond_take) + end + nothing +end + +function notify_error(c::Channel, err) + notify_error(c.cond_take, err) + notify_error(c.cond_put, err) +end + +eltype{T}(c::Channel{T}) = T + +function length(c::Channel) + if c.put_pos >= c.take_pos + return c.put_pos - c.take_pos + else + return c.szp1 - c.take_pos + c.put_pos + end +end + +size(c::Channel) = c.sz_max + +show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(size(c)),sz_curr:$(length(c)))") + +start{T}(c::Channel{T}) = Ref{Nullable{T}}(Nullable{T}()) +function done(c::Channel, state::Ref) + try + # we are waiting either for more data or channel to be closed + state.x = take!(c) + return false + catch e + if isa(e, InvalidStateException) && e.state==:closed + return true + else + rethrow(e) + end + end +end +next{T}(c::Channel{T}, state) = (get(state.x), Ref{Nullable{T}}(Nullable{T}())) + diff --git a/base/exports.jl b/base/exports.jl index 38426568f1ebe..8e9bd9633a4e2 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -40,6 +40,7 @@ export BufferStream, CartesianIndex, CartesianRange, + Channel, Cmd, Colon, Complex, @@ -157,6 +158,7 @@ export DimensionMismatch, EOFError, ErrorException, + InvalidStateException, KeyError, LoadError, MethodError, diff --git a/base/precompile.jl b/base/precompile.jl index 6d2d0f027590c..60c286ec4ca60 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -115,7 +115,7 @@ precompile(Base.ProcessGroup, (Int, Array{Any,1}, Array{Any,1})) precompile(Base.REPL.(:(==)), (Base.REPL.REPLDisplay{Base.REPL.LineEditREPL}, Base.REPL.REPLDisplay{Base.REPL.LineEditREPL})) precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal, Bool, ASCIIString, ASCIIString, ASCIIString, ASCIIString, ASCIIString, Bool, Bool, Bool, Bool)) precompile(Base.REPL.LineEditREPL, (Base.Terminals.TTYTerminal,)) -precompile(Base.REPL.REPLBackendRef, (RemoteRef, RemoteRef)) +precompile(Base.REPL.REPLBackendRef, (Channel, Channel)) precompile(Base.REPL.REPLDisplay, (Base.REPL.BasicREPL,)) precompile(Base.REPL.REPLDisplay, (Base.REPL.LineEditREPL,)) precompile(Base.REPL.add_history, (Base.REPL.REPLHistoryProvider, Base.LineEdit.PromptState)) @@ -135,9 +135,9 @@ precompile(Base.REPL.respond, (Function, Base.REPL.LineEditREPL, Base.LineEdit.P precompile(Base.REPL.return_callback, (Base.LineEdit.PromptState,)) precompile(Base.REPL.run_repl, (Base.REPL.LineEditREPL,)) precompile(Base.REPL.send_to_backend, (Expr, Base.REPL.REPLBackendRef)) -precompile(Base.REPL.send_to_backend, (Expr, RemoteRef, RemoteRef)) +precompile(Base.REPL.send_to_backend, (Expr, Channel, Channel)) precompile(Base.REPL.send_to_backend, (Symbol, Base.REPL.REPLBackendRef)) -precompile(Base.REPL.start_repl_backend, (RemoteRef, RemoteRef)) +precompile(Base.REPL.start_repl_backend, (Channel, Channel)) precompile(Base.REPLCompletions.complete_methods, (ASCIIString,)) precompile(Base.REPLCompletions.complete_symbol, (ASCIIString, Function)) precompile(Base.REPLCompletions.completions, (ASCIIString, Int)) diff --git a/base/sysimg.jl b/base/sysimg.jl index ea370521f1786..af2c760bc9107 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -204,6 +204,7 @@ importall .Enums # concurrency and parallelism include("serialize.jl") importall .Serializer +include("channels.jl") include("multi.jl") include("managers.jl") diff --git a/base/task.jl b/base/task.jl index 0fef24dd06c94..31489d3f25b9d 100644 --- a/base/task.jl +++ b/base/task.jl @@ -235,7 +235,8 @@ function wait(c::Condition) end end -function notify(c::Condition, arg::ANY=nothing; all=true, error=false) +notify(c::Condition, arg::ANY=nothing; all=true, error=false) = notify(c, arg, all, error) +function notify(c::Condition, arg, all, error) if all for t in c.waitq schedule(t, arg, error=error) diff --git a/doc/manual/control-flow.rst b/doc/manual/control-flow.rst index 292744b2f5ab8..655b469d74906 100644 --- a/doc/manual/control-flow.rst +++ b/doc/manual/control-flow.rst @@ -600,47 +600,49 @@ Built-in :exc:`Exception`\ s :exc:`Exception`\ s are thrown when an unexpected condition has occurred. The built-in :exc:`Exception`\ s listed below all interrupt the normal flow of control. -+---------------------------+ -| :exc:`Exception` | -+===========================+ -| :exc:`ArgumentError` | -+---------------------------+ -| :exc:`BoundsError` | -+---------------------------+ -| :exc:`DivideError` | -+---------------------------+ -| :exc:`DomainError` | -+---------------------------+ -| :exc:`EOFError` | -+---------------------------+ -| :exc:`ErrorException` | -+---------------------------+ -| :exc:`InexactError` | -+---------------------------+ -| :exc:`InterruptException` | -+---------------------------+ -| :exc:`KeyError` | -+---------------------------+ -| :exc:`LoadError` | -+---------------------------+ -| :exc:`OutOfMemoryError` | -+---------------------------+ -| :exc:`ReadOnlyMemoryError`| -+---------------------------+ -| :exc:`MethodError` | -+---------------------------+ -| :exc:`OverflowError` | -+---------------------------+ -| :exc:`ParseError` | -+---------------------------+ -| :exc:`SystemError` | -+---------------------------+ -| :exc:`TypeError` | -+---------------------------+ -| :exc:`UndefRefError` | -+---------------------------+ -| :exc:`UndefVarError` | -+---------------------------+ ++------------------------------+ +| :exc:`Exception` | ++==============================+ +| :exc:`ArgumentError` | ++------------------------------+ +| :exc:`BoundsError` | ++------------------------------+ +| :exc:`DivideError` | ++------------------------------+ +| :exc:`DomainError` | ++------------------------------+ +| :exc:`EOFError` | ++------------------------------+ +| :exc:`ErrorException` | ++------------------------------+ +| :exc:`InexactError` | ++------------------------------+ +| :exc:`InterruptException` | ++------------------------------+ +| :exc:`InvalidStateException` | ++------------------------------+ +| :exc:`KeyError` | ++------------------------------+ +| :exc:`LoadError` | ++------------------------------+ +| :exc:`OutOfMemoryError` | ++------------------------------+ +| :exc:`ReadOnlyMemoryError` | ++------------------------------+ +| :exc:`MethodError` | ++------------------------------+ +| :exc:`OverflowError` | ++------------------------------+ +| :exc:`ParseError` | ++------------------------------+ +| :exc:`SystemError` | ++------------------------------+ +| :exc:`TypeError` | ++------------------------------+ +| :exc:`UndefRefError` | ++------------------------------+ +| :exc:`UndefVarError` | ++------------------------------+ For example, the :func:`sqrt` function throws a :exc:`DomainError` if applied to a diff --git a/doc/manual/parallel-computing.rst b/doc/manual/parallel-computing.rst index fbab3fd99e8d6..a6b0007cb5845 100644 --- a/doc/manual/parallel-computing.rst +++ b/doc/manual/parallel-computing.rst @@ -445,6 +445,24 @@ preemptively. This means context switches only occur at well-defined points: in this case, when :func:`remotecall_fetch` is called. +Channels +-------- +Channels provide for a fast means of inter-task communication. A +``Channel(T::Type, n::Int)`` is a shared queue of maximum length ``n`` +holding objects of type ``T``. Multiple readers can read off the channel +via ``fetch`` and ``take!``. Multiple writers can add to the channel via +``put!``. ``isready`` tests for the prescence of any object in +the channel, while ``wait`` waits for an object to become available. +``close`` closes a Channel. On a closed channel, ``put!`` will fail, +while ``take!`` and ``fetch`` successfully return any existing values +till it is emptied. + +A Channel can be used as an iterable object in a ``for`` loop, in which +case the loop runs as long as the channel has data or is open. The loop +variable takes on all values added to the channel. An empty, closed channel +causes the ``for`` loop to terminate. + + Shared Arrays (Experimental) ----------------------------------------------- diff --git a/doc/stdlib/parallel.rst b/doc/stdlib/parallel.rst index cbac48bb48ac9..4b1d747233fd8 100644 --- a/doc/stdlib/parallel.rst +++ b/doc/stdlib/parallel.rst @@ -64,7 +64,7 @@ Tasks Edge triggering means that only tasks waiting at the time ``notify`` is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. - The ``RemoteRef`` type does this, and so can be used for level-triggered + The ``Channel`` type does this, and so can be used for level-triggered events. .. function:: notify(condition, val=nothing; all=true, error=false) @@ -114,6 +114,15 @@ Tasks Releases ownership of the lock by the current task. If the lock had been acquired before, it just decrements an internal counter and returns immediately. +.. function:: Channel{T}(sz::Int) + + Constructs a Channel that can hold a maximum of ``sz`` objects of type ``T``. ``put!`` calls + on a full channel block till an object is removed with ``take!``. + + Other constructors: + ``Channel()`` - equivalent to ``Channel{Any}(32)`` + ``Channel(sz::Int)`` equivalent to ``Channel{Any}(sz)`` + General Parallel Computing Support ---------------------------------- @@ -230,6 +239,8 @@ General Parallel Computing Support * ``RemoteRef``: Wait for a value to become available for the specified remote reference. + * ``Channel``: Wait for a value to be appended to the channel. + * ``Condition``: Wait for ``notify`` on a condition. * ``Process``: Wait for a process or process chain to exit. The ``exitcode`` field of a process can be used to determine success or failure. @@ -246,9 +257,13 @@ General Parallel Computing Support Often ``wait`` is called within a ``while`` loop to ensure a waited-for condition is met before proceeding. -.. function:: fetch(RemoteRef) +.. function:: fetch(x) - Wait for and get the value of a remote reference. + Waits and fetches a value from ``x`` depending on the type of ``x``. Does not remove the item fetched: + + * ``RemoteRef``: Wait for and get the value of a remote reference. + + * ``Channel`` : Wait for and get the first available item from the channel. .. function:: remotecall_wait(id, func, args...) @@ -262,10 +277,18 @@ General Parallel Computing Support Store a value to a remote reference. Implements "shared queue of length 1" semantics: if a value is already present, blocks until the value is removed with ``take!``. Returns its first argument. +.. function:: put!(Channel, value) + + Appends an item to the channel. Blocks if the channel is full. + .. function:: take!(RemoteRef) Fetch the value of a remote reference, removing it so that the reference is empty again. +.. function:: take!(Channel) + + Removes and returns a value from a ``Channel``. Blocks till data is available. + .. function:: isready(r::RemoteRef) Determine whether a ``RemoteRef`` has a value stored to it. Note that this function @@ -281,6 +304,14 @@ General Parallel Computing Support @async put!(rr, remotecall_fetch(p, long_computation)) isready(rr) # will not block +.. function:: close(Channel) + + Closes a channel. An exception is thrown by: + + * ``put!`` on a on a closed channel. + + * ``take!`` and ``fetch`` on an empty, closed channel. + .. function:: RemoteRef() Make an uninitialized remote reference on the local machine. diff --git a/test/file.jl b/test/file.jl index f3c25b3c8d9bc..f65f2b8e179f3 100644 --- a/test/file.jl +++ b/test/file.jl @@ -115,7 +115,7 @@ end function test_timeout(tval) tic() - channel = RemoteRef() + channel = Channel(1) @async test_file_poll(channel, 10, tval) tr = take!(channel) t_elapsed = toq() @@ -125,7 +125,7 @@ end function test_touch(slval) tval = slval*1.1 - channel = RemoteRef() + channel = Channel(1) @async test_file_poll(channel, tval/3, tval) sleep(tval/3) # one poll period f = open(file,"a") diff --git a/test/parallel.jl b/test/parallel.jl index 27e70247ecf75..8bf71a03dc7c1 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -150,11 +150,11 @@ workloads = hist(@parallel((a,b)->[a;b], for i=1:7; myid(); end), nprocs())[2] # @parallel reduction should work even with very short ranges @test @parallel(+, for i=1:2; i; end) == 3 -# Testing timedwait on multiple RemoteRefs +# Testing timedwait on multiple channels @sync begin - rr1 = RemoteRef() - rr2 = RemoteRef() - rr3 = RemoteRef() + rr1 = Channel() + rr2 = Channel() + rr3 = Channel() @async begin sleep(0.5); put!(rr1, :ok) end @async begin sleep(1.0); put!(rr2, :ok) end @@ -196,7 +196,7 @@ num_small_requests = 10000 # test parallel sends of large arrays from multiple tasks to the same remote worker ntasks = 10 -rr_list = [RemoteRef() for x in 1:ntasks] +rr_list = [Channel() for x in 1:ntasks] a=ones(2*10^5); for rr in rr_list @async let rr=rr @@ -214,6 +214,69 @@ end @test [fetch(rr) for rr in rr_list] == [:OK for x in 1:ntasks] +function test_channel(c) + put!(c, 1) + put!(c, "Hello") + put!(c, 5.0) + + @test isready(c) == true + @test fetch(c) == 1 + @test fetch(c) == 1 # Should not have been popped previously + @test take!(c) == 1 + @test take!(c) == "Hello" + @test fetch(c) == 5.0 + @test take!(c) == 5.0 + @test isready(c) == false + close(c) +end + +test_channel(Channel(10)) + +c=Channel{Int}(1) +@test_throws MethodError put!(c, "Hello") + +c=Channel(256) +# Test growth of channel +@test c.szp1 <= 33 +for x in 1:40 + put!(c, x) +end +@test c.szp1 <= 65 +for x in 1:39 + take!(c) +end +for x in 1:64 + put!(c, x) +end +@test (c.szp1 > 65) && (c.szp1 <= 129) +for x in 1:39 + take!(c) +end +@test fetch(c) == 39 +for x in 1:26 + take!(c) +end +@test isready(c) == false + +# test channel iterations +function test_iteration(in_c, out_c) + t=@schedule for v in in_c + put!(out_c, v) + end + + isa(in_c, Channel) && @test isopen(in_c) == true + put!(in_c, 1) + @test take!(out_c) == 1 + put!(in_c, "Hello") + close(in_c) + @test take!(out_c) == "Hello" + isa(in_c, Channel) && @test isopen(in_c) == false + @test_throws InvalidStateException put!(in_c, :foo) + yield() + @test istaskdone(t) == true +end + +test_iteration(Channel(10), Channel(10)) # The below block of tests are usually run only on local development systems, since: # - addprocs tests are memory intensive diff --git a/test/socket.jl b/test/socket.jl index 645d989321808..13cd5b64a6c44 100644 --- a/test/socket.jl +++ b/test/socket.jl @@ -64,7 +64,7 @@ end @test repr(ip"2001:db8:0:0:1:0:0:1") == "ip\"2001:db8::1:0:0:1\"" @test repr(ip"2001:0:0:1:0:0:0:1") == "ip\"2001:0:0:1::1\"" -port = RemoteRef() +port = Channel(1) c = Base.Condition() defaultport = rand(2000:4000) tsk = @async begin @@ -107,7 +107,7 @@ end @test_throws ArgumentError connect(ip"0:0:0:0:0:ffff:127.0.0.1", typemax(UInt16)+1) p, server = listenany(defaultport) -r = RemoteRef() +r = Channel(1) tsk = @async begin put!(r, :start) @test_throws Base.UVError accept(server) diff --git a/test/spawn.jl b/test/spawn.jl index 277a7143286c3..313d03ed10012 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -79,7 +79,7 @@ rm(file) # Stream Redirection @unix_only begin - r = RemoteRef() + r = Channel(1) @async begin port, server = listenany(2326) put!(r,port) @@ -125,7 +125,7 @@ rm(file) # issue #3373 # fixing up Conditions after interruptions -r = RemoteRef() +r = Channel(1) t = @async begin try wait(r) @@ -141,7 +141,7 @@ yield() # Test marking of AsyncStream -r = RemoteRef() +r = Channel(1) @async begin port, server = listenany(2327) put!(r, port)