From c0919754f80692e6f33e58fa9d38e143350ad89e Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 16 Jun 2014 01:24:42 -0400 Subject: [PATCH] implement #6563 to merge close #4401 --- base/poll.jl | 106 +++++++++++++++---------------------------------- base/stream.jl | 18 +++++++-- base/sysimg.jl | 1 - base/task.jl | 81 +++++++++++++++++++++++++++++++++++++ test/pollfd.jl | 102 ++++++++++++++++++++++++----------------------- 5 files changed, 180 insertions(+), 128 deletions(-) diff --git a/base/poll.jl b/base/poll.jl index f81fa24073b2e..0907dc9bd2bf6 100644 --- a/base/poll.jl +++ b/base/poll.jl @@ -13,17 +13,11 @@ type FileMonitor this = new(handle,cb,false,Condition()) associate_julia_struct(handle,this) finalizer(this,uvfinalize) - this + this end FileMonitor(file) = FileMonitor(false,file) end -function close(t::FileMonitor) - if t.handle != C_NULL - ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) - end -end - immutable FileEvent renamed::Bool changed::Bool @@ -82,11 +76,11 @@ type PollingFileWatcher <: UVPollingWatcher associate_julia_struct(handle,this) finalizer(this,uvfinalize) this - end + end PollingFileWatcher(file) = PollingFileWatcher(false,file) end -@unix_only typealias FDW_FD RawFD +@unix_only typealias FDW_FD RawFD @windows_only typealias FDW_FD WindowsRawSocket @unix_only _get_osfhandle(fd::RawFD) = fd @@ -141,7 +135,7 @@ function fdw_wait_cb(fdw::FDWatcher, events::FDEvent, status) end end -function _wait(fdw::FDWatcher,readable,writable) +function wait(fdw::FDWatcher; readable=false,writable=false) if !readable && !writable error("must watch for at least one event") end @@ -165,46 +159,27 @@ function _wait(fdw::FDWatcher,readable,writable) events end -# On Unix we can only have one watcher per FD, so we need to keep an explicit -# list of them. On Windows, I think it is techincally possible to have more than one -# watcher per FD, but in order to keep compatibility, we do the same on windows as we do -# on unix - -let - global fdwatcher_init, wait - @unix_only begin - local fdwatcher_array - function fdwatcher_init() - fdwatcher_array = Array(FDWatcher,0) +@unix_only begin + function wait(fd::RawFD; readable=false, writable=false) + fdw = FDWatcher(fd) + try + return wait(fdw,readable=readable,writable=writable) + finally + close(fdw) end - - function wait(fd::RawFD; readable=false, writable=false) - old_length = length(fdwatcher_array) - if fd.fd+1 > old_length - resize!(fdwatcher_array,fd.fd+1) - end - if !isdefined(fdwatcher_array,fd.fd+1) - fdwatcher_array[fd.fd+1] = FDWatcher(fd) - end - _wait(fdwatcher_array[fd.fd+1],readable,writable) - end end - @windows_only begin - local fdwatcher_array - function fdwatcher_init() - fdwatcher_array = Dict{WindowsRawSocket,FDWatcher}() - end - - function wait(fd::RawFD; readable=false, writable=false) - wait(_get_osfhandle(fd); readable=readable, writable=writable) +end +@windows_only begin + function wait(fd::RawFD; readable=false, writable=false) + wait(_get_osfhandle(fd); readable=readable, writable=writable) + end + function wait(socket::WindowsRawSocket; readable=false, writable=false) + fdw = FDWatcher(fd) + try + return wait(fdw,readable=readable,writable=writable) + finally + close(fdw) end - - function wait(socket::WindowsRawSocket; readable=false, writable=false) - if !haskey(fdwatcher_array,socket.handle) - fdwatcher_array[socket] = FDWatcher(socket) - end - _wait(fdwatcher_array[socket],readable,writable) - end end end @@ -232,8 +207,13 @@ function wait(m::FileMonitor) filename, events end - -close(t::UVPollingWatcher) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) +function close(t::Union(FileMonitor,UVPollingWatcher)) + if t.handle != C_NULL + ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle) + disassociate_julia_struct(t) + t.handle = C_NULL + end +end function start_watching(t::FDWatcher, events::FDEvent) associate_julia_struct(t.handle, t) @@ -293,30 +273,10 @@ end _uv_hook_close(uv::FileMonitor) = (uv.handle = 0; nothing) _uv_hook_close(uv::UVPollingWatcher) = (uv.handle = 0; nothing) - function poll_fd(s, seconds::Real; readable=false, writable=false) - wt = Condition() - - @schedule (args = wait(s; readable=readable, writable=writable); notify(wt,(:poll,args))) - @schedule (sleep(seconds); notify(wt,(:timeout,fdtimeout()))) - - _, ret = wait(wt) - - return ret -end - -function poll_file(s, interval_seconds::Real, seconds::Real) - wt = Condition() - pfw = PollingFileWatcher(s) - - @schedule (wait(pfw;interval=interval_seconds); notify(wt,(:poll))) - @schedule (sleep(seconds); notify(wt,(:timeout))) - - result = wait(wt) - if result == :timeout - stop_watching(pfw) - end - result == :poll + t1 = () -> wait(s; readable=readable, writable=writable) + t2 = () -> (sleep(seconds); fdtimeout()) + wait(t2, t1) end watch_file(s; poll=false) = watch_file(false, s, poll=poll) @@ -325,7 +285,7 @@ function watch_file(cb, s; poll=false) pfw = PollingFileWatcher(cb,s) start_watching(pfw) return pfw - else + else return FileMonitor(cb,s) end end diff --git a/base/stream.jl b/base/stream.jl index 966685f61ff01..45a86a8063e69 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -404,6 +404,7 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len:: (nb_available(stream.buffer) == stream.buffer.maxsize) stop_reading(stream) end + nothing end ########################################## # Async Workers @@ -454,6 +455,7 @@ function _uv_hook_close(uv::Union(AsyncStream,UVServer)) notify(uv.closenotify) try notify(uv.readnotify) end try notify(uv.connectnotify) end + nothing end _uv_hook_close(uv::AsyncWork) = (uv.handle = C_NULL; nothing) @@ -490,11 +492,9 @@ function stop_timer(timer::Timer) unpreserve_handle(timer) end -function sleep(sec::Real) +function wait(sec::Real) w = Condition() - timer = Timer(function (tmr) - notify(w) - end) + timer = Timer( tmr->notify(w) ) start_timer(timer, float(sec), 0) try stream_wait(timer,w) @@ -503,6 +503,15 @@ function sleep(sec::Real) end nothing end +sleep(sec::Real) = wait(sec) +function waitq(sec::Real, killq) + c = Condition() + t = Timer( (tmr)->notify(c) ) + start_timer(t, float(sec), 0) + push!(killq, t) + return c +end +waitkill(t::Timer,ct::Task) = (stop_timer(t); nothing) ## event loop ## eventloop() = global uv_eventloop::Ptr{Void} @@ -793,6 +802,7 @@ function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32) elseif d != C_NULL schedule(unsafe_pointer_to_objref(d)::Task) end + nothing end ## Libuv error handling ## diff --git a/base/sysimg.jl b/base/sysimg.jl index e443c3056419b..370e87102fbb3 100644 --- a/base/sysimg.jl +++ b/base/sysimg.jl @@ -267,7 +267,6 @@ function __init__() # Base library init reinit_stdio() Multimedia.reinit_displays() # since Multimedia.displays uses STDOUT as fallback - fdwatcher_init() end include("precompile.jl") diff --git a/base/task.jl b/base/task.jl index f628a248eb814..fd619c9d4f6dc 100644 --- a/base/task.jl +++ b/base/task.jl @@ -1,5 +1,7 @@ ## basic task functions and TLS +wait(f::Callable) = f() + show(io::IO, t::Task) = print(io, "Task ($(t.state)) @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))") macro task(ex) @@ -214,6 +216,85 @@ notify1(c::Condition, arg=nothing) = notify(c, arg, all=false) notify_error(c::Condition, err) = notify(c, err, error=true) notify1_error(c::Condition, err) = notify(c, err, error=true, all=false) +waitq(c, killq) = waitq(()->wait(c), killq) +function waitq(c::Callable, killq) + t = schedule(Task(c)) + push!(killq, t) + return waitq(t, killq) +end +waitq(c::Condition, killq) = c +function waitq(t::Task, killq) + if istaskdone(t) + if t.state == :failed + throw(t.exception) + end + end + if is(t.donenotify, nothing) + t.donenotify = Condition() + end + return t.donenotify +end + +waitresult(c) = (false, nothing) +function waitresult(t::Task) + if istaskdone(t) + if t.state == :failed + throw(t.exception) + end + return (true, t.result) + end + return (false, nothing) +end + +waitcleanup(c,ct::Task) = nothing +waitcleanup(c::Condition,ct::Task) = (filter!(x->x!==ct, c.waitq); nothing) +waitcleanup(t::Task, ct::Task) = waitcleanup(t.donenotify, ct) + +waitkill(c,ct::Task) = nothing +function waitkill(t::Task,ct::Task) + waitcleanup(t,ct) + if !istaskdone(t) + filter!(x->x!==t, Workqueue) + istaskstarted(t) && schedule(t, EOFError(), error=true) + end + nothing +end + +function wait(cs...) + ct = current_task() + ct.state = :waiting + + killq = Any[] + for c in cs + c = waitq(c, killq)::Condition + push!(c.waitq, ct) + end + + try + result = wait() + for c in cs + hasresult, res = waitresult(c) + if hasresult + result = res + end + end + return result + catch e + if ct.state == :waiting + ct.state = :runnable + end + rethrow(e) + finally + filter!(x->x!==ct, Workqueue) # in case we got more that one trigger + for c in cs + waitcleanup(c,ct) + end + for c in killq + waitkill(c,ct) + end + end +end + ## scheduler and work queue diff --git a/test/pollfd.jl b/test/pollfd.jl index cf24339db05ef..4d6004d332701 100644 --- a/test/pollfd.jl +++ b/test/pollfd.jl @@ -1,16 +1,13 @@ -@unix_only begin -require("testdefs.jl") - # This script does the following # Sets up n unix pipes -# For the odd pipes, a byte is written to the write end at intervals specified in intvls +# For the odd pipes, a byte is written to the write end at intervals specified in intvls # Nothing is written into the even numbered pipes # Odd numbered pipes are tested for reads # Even numbered pipes are tested for timeouts # Writable ends are always tested for writability before a write n = 20 -intvls = [0.1, 0.1, 1.0, 2.0] # NOTE: The first interval is just used to let the readers/writers sort of synchnronize. +intvls = [2, .2, .1, .002] pipe_fds = cell(n) for i in 1:n @@ -19,61 +16,69 @@ for i in 1:n end -function pfd_tst_reads(idx) - for (i, intvl) in enumerate(intvls) - tic() - evt = poll_fd(RawFD(pipe_fds[idx][1]), intvl * 10.0; readable=true, writable=true) - t_elapsed = toq() - @test evt.readable && !(evt.writable) && !(evt.timedout) - - # ignore the first one, everyone is just getting setup and synchronized - if i > 1 -# println("i ", i, ", Expected ", intvl, ", actual ", t_elapsed, ", diff ", t_elapsed - intvl) - # Assuming that a 200 millisecond buffer is good enough on a modern system - @test t_elapsed <= (intvl + 0.2) - end - - - dout = Array(Uint8, 1) - @test 1 == ccall(:read, Csize_t, (Cint, Ptr{Uint8},Csize_t), pipe_fds[idx][1], dout, 1) - @test dout[1] == int8('A') - end +function pfd_tst_reads(idx, intvl) + global ready += 1 + wait(ready_c) + tic() + evt = poll_fd(RawFD(pipe_fds[idx][1]), intvl; readable=true, writable=true) + t_elapsed = toq() + @test !evt.timedout + @test evt.readable + @test !evt.writable + + # println("Expected ", intvl, ", actual ", t_elapsed, ", diff ", t_elapsed - intvl) + # Assuming that a 2 second buffer is good enough on a modern system + @test t_elapsed <= (intvl + 1) + + dout = Array(Uint8, 1) + @test 1 == ccall(:read, Csize_t, (Cint, Ptr{Uint8},Csize_t), pipe_fds[idx][1], dout, 1) + @test dout[1] == int8('A') end -function pfd_tst_timeout(idx) - for intvl in intvls - tic() - evt = poll_fd(RawFD(pipe_fds[idx][1]), intvl; readable=true, writable=true) - @test !(evt.readable) && !(evt.writable) && evt.timedout - t_elapsed = toq() - - @test (intvl <= t_elapsed) && (t_elapsed <= (intvl + 0.2)) - end +function pfd_tst_timeout(idx, intvl) + global ready += 1 + wait(ready_c) + tic() + evt = poll_fd(RawFD(pipe_fds[idx][1]), intvl; readable=true, writable=false) + @test evt.timedout + @test !evt.readable + @test !evt.writable + t_elapsed = toq() + + @test (intvl <= t_elapsed) && (t_elapsed <= (intvl + 1)) end # Odd numbers trigger reads, even numbers timeout -@sync begin - for i in 1:n - if isodd(i) - @async pfd_tst_reads(i) - else - @async pfd_tst_timeout(i) +for (i, intvl) in enumerate(intvls) + @sync begin + global ready = 0 + global ready_c = Condition() + for i in 1:n + if isodd(i) + @async pfd_tst_reads(i, intvl) + else + @async pfd_tst_timeout(i, intvl) + end end - end - - for (i, intvl) in enumerate(intvls) - sleep(intvl) + + while ready < n + sleep(0.1) + end + ready = 0 # tickle only the odd ones, but test for writablity for everyone for idx in 1:n - evt = poll_fd(RawFD(pipe_fds[idx][2]), 0.0; readable=true, writable=true) - @test !(evt.readable) && evt.writable && !(evt.timedout) - + evt = poll_fd(RawFD(pipe_fds[idx][2]), 0.001; readable=true, writable=true) + @test !evt.timedout + @test !evt.readable + @test evt.writable + if isodd(idx) @test 1 == ccall(:write, Csize_t, (Cint, Ptr{Uint8},Csize_t), pipe_fds[idx][2], bytestring("A"), 1) end - end + end + notify(ready_c, all=true) end end @@ -82,6 +87,3 @@ for i in 1:n ccall(:close, Cint, (Cint,), pipe_fds[i][1]) ccall(:close, Cint, (Cint,), pipe_fds[i][2]) end - - -end