Skip to content

Commit

Permalink
implement #6563 to merge close #4401
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash committed Jun 16, 2014
1 parent 2eb615e commit c091975
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 128 deletions.
106 changes: 33 additions & 73 deletions base/poll.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,11 @@ type FileMonitor
this = new(handle,cb,false,Condition())
associate_julia_struct(handle,this)
finalizer(this,uvfinalize)
this
this
end
FileMonitor(file) = FileMonitor(false,file)
end

function close(t::FileMonitor)
if t.handle != C_NULL
ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle)
end
end

immutable FileEvent
renamed::Bool
changed::Bool
Expand Down Expand Up @@ -82,11 +76,11 @@ type PollingFileWatcher <: UVPollingWatcher
associate_julia_struct(handle,this)
finalizer(this,uvfinalize)
this
end
end
PollingFileWatcher(file) = PollingFileWatcher(false,file)
end

@unix_only typealias FDW_FD RawFD
@unix_only typealias FDW_FD RawFD
@windows_only typealias FDW_FD WindowsRawSocket

@unix_only _get_osfhandle(fd::RawFD) = fd
Expand Down Expand Up @@ -141,7 +135,7 @@ function fdw_wait_cb(fdw::FDWatcher, events::FDEvent, status)
end
end

function _wait(fdw::FDWatcher,readable,writable)
function wait(fdw::FDWatcher; readable=false,writable=false)
if !readable && !writable
error("must watch for at least one event")
end
Expand All @@ -165,46 +159,27 @@ function _wait(fdw::FDWatcher,readable,writable)
events
end

# On Unix we can only have one watcher per FD, so we need to keep an explicit
# list of them. On Windows, I think it is techincally possible to have more than one
# watcher per FD, but in order to keep compatibility, we do the same on windows as we do
# on unix

let
global fdwatcher_init, wait
@unix_only begin
local fdwatcher_array
function fdwatcher_init()
fdwatcher_array = Array(FDWatcher,0)
@unix_only begin
function wait(fd::RawFD; readable=false, writable=false)
fdw = FDWatcher(fd)
try
return wait(fdw,readable=readable,writable=writable)
finally
close(fdw)
end

function wait(fd::RawFD; readable=false, writable=false)
old_length = length(fdwatcher_array)
if fd.fd+1 > old_length
resize!(fdwatcher_array,fd.fd+1)
end
if !isdefined(fdwatcher_array,fd.fd+1)
fdwatcher_array[fd.fd+1] = FDWatcher(fd)
end
_wait(fdwatcher_array[fd.fd+1],readable,writable)
end
end
@windows_only begin
local fdwatcher_array
function fdwatcher_init()
fdwatcher_array = Dict{WindowsRawSocket,FDWatcher}()
end

function wait(fd::RawFD; readable=false, writable=false)
wait(_get_osfhandle(fd); readable=readable, writable=writable)
end
@windows_only begin
function wait(fd::RawFD; readable=false, writable=false)
wait(_get_osfhandle(fd); readable=readable, writable=writable)
end
function wait(socket::WindowsRawSocket; readable=false, writable=false)
fdw = FDWatcher(fd)
try
return wait(fdw,readable=readable,writable=writable)
finally
close(fdw)
end

function wait(socket::WindowsRawSocket; readable=false, writable=false)
if !haskey(fdwatcher_array,socket.handle)
fdwatcher_array[socket] = FDWatcher(socket)
end
_wait(fdwatcher_array[socket],readable,writable)
end
end
end

Expand Down Expand Up @@ -232,8 +207,13 @@ function wait(m::FileMonitor)
filename, events
end


close(t::UVPollingWatcher) = ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle)
function close(t::Union(FileMonitor,UVPollingWatcher))
if t.handle != C_NULL
ccall(:jl_close_uv,Void,(Ptr{Void},),t.handle)
disassociate_julia_struct(t)
t.handle = C_NULL
end
end

function start_watching(t::FDWatcher, events::FDEvent)
associate_julia_struct(t.handle, t)
Expand Down Expand Up @@ -293,30 +273,10 @@ end

_uv_hook_close(uv::FileMonitor) = (uv.handle = 0; nothing)
_uv_hook_close(uv::UVPollingWatcher) = (uv.handle = 0; nothing)

function poll_fd(s, seconds::Real; readable=false, writable=false)
wt = Condition()

@schedule (args = wait(s; readable=readable, writable=writable); notify(wt,(:poll,args)))
@schedule (sleep(seconds); notify(wt,(:timeout,fdtimeout())))

_, ret = wait(wt)

return ret
end

function poll_file(s, interval_seconds::Real, seconds::Real)
wt = Condition()
pfw = PollingFileWatcher(s)

@schedule (wait(pfw;interval=interval_seconds); notify(wt,(:poll)))
@schedule (sleep(seconds); notify(wt,(:timeout)))

result = wait(wt)
if result == :timeout
stop_watching(pfw)
end
result == :poll
t1 = () -> wait(s; readable=readable, writable=writable)
t2 = () -> (sleep(seconds); fdtimeout())
wait(t2, t1)
end

watch_file(s; poll=false) = watch_file(false, s, poll=poll)
Expand All @@ -325,7 +285,7 @@ function watch_file(cb, s; poll=false)
pfw = PollingFileWatcher(cb,s)
start_watching(pfw)
return pfw
else
else
return FileMonitor(cb,s)
end
end
18 changes: 14 additions & 4 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr{Void}, len::
(nb_available(stream.buffer) == stream.buffer.maxsize)
stop_reading(stream)
end
nothing
end
##########################################
# Async Workers
Expand Down Expand Up @@ -454,6 +455,7 @@ function _uv_hook_close(uv::Union(AsyncStream,UVServer))
notify(uv.closenotify)
try notify(uv.readnotify) end
try notify(uv.connectnotify) end
nothing
end
_uv_hook_close(uv::AsyncWork) = (uv.handle = C_NULL; nothing)

Expand Down Expand Up @@ -490,11 +492,9 @@ function stop_timer(timer::Timer)
unpreserve_handle(timer)
end

function sleep(sec::Real)
function wait(sec::Real)
w = Condition()
timer = Timer(function (tmr)
notify(w)
end)
timer = Timer( tmr->notify(w) )
start_timer(timer, float(sec), 0)
try
stream_wait(timer,w)
Expand All @@ -503,6 +503,15 @@ function sleep(sec::Real)
end
nothing
end
sleep(sec::Real) = wait(sec)
function waitq(sec::Real, killq)
c = Condition()
t = Timer( (tmr)->notify(c) )
start_timer(t, float(sec), 0)
push!(killq, t)
return c
end
waitkill(t::Timer,ct::Task) = (stop_timer(t); nothing)

## event loop ##
eventloop() = global uv_eventloop::Ptr{Void}
Expand Down Expand Up @@ -793,6 +802,7 @@ function _uv_hook_writecb_task(s::AsyncStream,req::Ptr{Void},status::Int32)
elseif d != C_NULL
schedule(unsafe_pointer_to_objref(d)::Task)
end
nothing
end

## Libuv error handling ##
Expand Down
1 change: 0 additions & 1 deletion base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ function __init__()
# Base library init
reinit_stdio()
Multimedia.reinit_displays() # since Multimedia.displays uses STDOUT as fallback
fdwatcher_init()
end

include("precompile.jl")
Expand Down
81 changes: 81 additions & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## basic task functions and TLS

wait(f::Callable) = f()

show(io::IO, t::Task) = print(io, "Task ($(t.state)) @0x$(hex(unsigned(pointer_from_objref(t)), WORD_SIZE>>2))")

macro task(ex)
Expand Down Expand Up @@ -214,6 +216,85 @@ notify1(c::Condition, arg=nothing) = notify(c, arg, all=false)
notify_error(c::Condition, err) = notify(c, err, error=true)
notify1_error(c::Condition, err) = notify(c, err, error=true, all=false)

waitq(c, killq) = waitq(()->wait(c), killq)
function waitq(c::Callable, killq)
t = schedule(Task(c))
push!(killq, t)
return waitq(t, killq)
end
waitq(c::Condition, killq) = c
function waitq(t::Task, killq)
if istaskdone(t)
if t.state == :failed
throw(t.exception)
end
end
if is(t.donenotify, nothing)
t.donenotify = Condition()
end
return t.donenotify
end

waitresult(c) = (false, nothing)
function waitresult(t::Task)
if istaskdone(t)
if t.state == :failed
throw(t.exception)
end
return (true, t.result)
end
return (false, nothing)
end

waitcleanup(c,ct::Task) = nothing
waitcleanup(c::Condition,ct::Task) = (filter!(x->x!==ct, c.waitq); nothing)
waitcleanup(t::Task, ct::Task) = waitcleanup(t.donenotify, ct)

waitkill(c,ct::Task) = nothing
function waitkill(t::Task,ct::Task)
waitcleanup(t,ct)
if !istaskdone(t)
filter!(x->x!==t, Workqueue)
istaskstarted(t) && schedule(t, EOFError(), error=true)
end
nothing
end

function wait(cs...)
ct = current_task()
ct.state = :waiting

killq = Any[]
for c in cs
c = waitq(c, killq)::Condition
push!(c.waitq, ct)
end

try
result = wait()
for c in cs
hasresult, res = waitresult(c)
if hasresult
result = res
end
end
return result
catch e
if ct.state == :waiting
ct.state = :runnable
end
rethrow(e)
finally
filter!(x->x!==ct, Workqueue) # in case we got more that one trigger
for c in cs
waitcleanup(c,ct)
end
for c in killq
waitkill(c,ct)
end
end
end


## scheduler and work queue

Expand Down
Loading

0 comments on commit c091975

Please sign in to comment.