diff --git a/base/asyncevent.jl b/base/asyncevent.jl
index 0736bd463111f..db2343ed77c35 100644
--- a/base/asyncevent.jl
+++ b/base/asyncevent.jl
@@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
 This provides an implicit acquire & release memory ordering between the sending and waiting threads.
 """
 mutable struct AsyncCondition
-    handle::Ptr{Cvoid}
+    @atomic handle::Ptr{Cvoid}
     cond::ThreadSynchronizer
-    isopen::Bool
+    @atomic isopen::Bool
     @atomic set::Bool

     function AsyncCondition()
@@ -77,9 +77,9 @@ once. When the timer is closed (by [`close`](@ref)) waiting tasks are woken with
 """
 mutable struct Timer
-    handle::Ptr{Cvoid}
+    @atomic handle::Ptr{Cvoid}
     cond::ThreadSynchronizer
-    isopen::Bool
+    @atomic isopen::Bool
     @atomic set::Bool

     function Timer(timeout::Real; interval::Real = 0.0)
@@ -149,12 +149,12 @@ function wait(t::Union{Timer, AsyncCondition})
     end
 end

-isopen(t::Union{Timer, AsyncCondition}) = t.isopen
+isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL

 function close(t::Union{Timer, AsyncCondition})
     iolock_begin()
-    if t.handle != C_NULL && isopen(t)
-        t.isopen = false
+    if isopen(t)
+        @atomic :monotonic t.isopen = false
         ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
     end
     iolock_end()
@@ -166,12 +166,12 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
     lock(t.cond)
     try
         if t.handle != C_NULL
-            disassociate_julia_struct(t.handle) # not going to call the usual close hooks
+            disassociate_julia_struct(t.handle) # not going to call the usual close hooks anymore
             if t.isopen
-                t.isopen = false
-                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
+                @atomic :monotonic t.isopen = false
+                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
             end
-            t.handle = C_NULL
+            @atomic :monotonic t.handle = C_NULL
             notify(t.cond, false)
         end
     finally
@@ -184,9 +184,9 @@ end
 function _uv_hook_close(t::Union{Timer, AsyncCondition})
     lock(t.cond)
     try
-        t.isopen = false
-        t.handle = C_NULL
-        notify(t.cond, t.set)
+        @atomic :monotonic t.isopen = false
+        Libc.free(@atomicswap :monotonic t.handle = C_NULL)
+        notify(t.cond, false)
     finally
         unlock(t.cond)
     end
diff --git a/base/libuv.jl b/base/libuv.jl
index 53870188e75d9..64b228c6500e7 100644
--- a/base/libuv.jl
+++ b/base/libuv.jl
@@ -61,8 +61,11 @@ function preserve_handle(x)
 end
 function unpreserve_handle(x)
     lock(preserve_handle_lock)
-    v = uvhandles[x]::Int
-    if v == 1
+    v = get(uvhandles, x, 0)::Int
+    if v == 0
+        unlock(preserve_handle_lock)
+        error("unbalanced call to unpreserve_handle for $(typeof(x))")
+    elseif v == 1
         pop!(uvhandles, x)
     else
         uvhandles[x] = v - 1
diff --git a/base/process.jl b/base/process.jl
index 57c4e0ebd874a..aa378e72b2dce 100644
--- a/base/process.jl
+++ b/base/process.jl
@@ -56,7 +56,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
     proc = unsafe_pointer_to_objref(data)::Process
     proc.exitcode = exit_status
     proc.termsignal = termsignal
-    disassociate_julia_struct(proc) # ensure that data field is set to C_NULL
+    disassociate_julia_struct(proc.handle) # ensure that data field is set to C_NULL
     ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
     proc.handle = C_NULL
     lock(proc.exitnotify)
@@ -70,7 +70,7 @@ end

 # called when the libuv handle is destroyed
 function _uv_hook_close(proc::Process)
-    proc.handle = C_NULL
+    Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
     nothing
 end

@@ -607,10 +607,10 @@ Get the child process ID, if it still exists.
     This function requires at least Julia 1.1.
 """
 function Libc.getpid(p::Process)
-    # TODO: due to threading, this method is no longer synchronized with the user application
+    # TODO: due to threading, this method is only weakly synchronized with the user application
     iolock_begin()
     ppid = Int32(0)
-    if p.handle != C_NULL
+    if p.handle != C_NULL # e.g. process_running
         ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
     end
     iolock_end()
diff --git a/base/stream.jl b/base/stream.jl
index 0d28cf19274b8..948c12ad604b4 100644
--- a/base/stream.jl
+++ b/base/stream.jl
@@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
 end

 function isopen(x::Union{LibuvStream, LibuvServer})
-    if x.status == StatusUninit || x.status == StatusInit
+    if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
         throw(ArgumentError("$x is not initialized"))
     end
     return x.status != StatusClosed
@@ -496,34 +496,37 @@ end

 function close(stream::Union{LibuvStream, LibuvServer})
     iolock_begin()
-    should_wait = false
     if stream.status == StatusInit
         ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
         stream.status = StatusClosing
     elseif isopen(stream)
-        should_wait = uv_handle_data(stream) != C_NULL
         if stream.status != StatusClosing
             ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
             stream.status = StatusClosing
         end
     end
     iolock_end()
-    should_wait && wait_close(stream)
+    wait_close(stream)
     nothing
 end

 function uvfinalize(uv::Union{LibuvStream, LibuvServer})
-    uv.handle == C_NULL && return
     iolock_begin()
     if uv.handle != C_NULL
-        disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
-        if uv.status != StatusUninit
-            close(uv)
-        else
+        disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
+        if uv.status == StatusUninit
+            Libc.free(uv.handle)
+        elseif uv.status == StatusInit
+            ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
+        elseif isopen(uv)
+            if uv.status != StatusClosing
+                ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
+            end
+        elseif uv.status == StatusClosed
             Libc.free(uv.handle)
         end
-        uv.status = StatusClosed
         uv.handle = C_NULL
+        uv.status = StatusClosed
     end
     iolock_end()
     nothing
@@ -713,7 +716,6 @@ end
 function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
     lock(uv.cond)
     try
-        uv.handle = C_NULL
         uv.status = StatusClosed
         # notify any listeners that exist on this libuv stream type
         notify(uv.cond)
diff --git a/src/init.c b/src/init.c
index 277a49d6df07e..228222b3658fc 100644
--- a/src/init.c
+++ b/src/init.c
@@ -165,8 +165,7 @@ static void jl_close_item_atexit(uv_handle_t *handle)
     switch(handle->type) {
     case UV_PROCESS:
         // cause Julia to forget about the Process object
-        if (handle->data)
-            jl_uv_call_close_callback((jl_value_t*)handle->data);
+        handle->data = NULL;
         // and make libuv think it is already dead
         ((uv_process_t*)handle)->pid = 0;
         // fall-through
diff --git a/src/jl_uv.c b/src/jl_uv.c
index 9a9b012dfb2f5..463c5e2acd952 100644
--- a/src/jl_uv.c
+++ b/src/jl_uv.c
@@ -77,14 +77,16 @@ JL_DLLEXPORT void jl_iolock_end(void)
 }


-void jl_uv_call_close_callback(jl_value_t *val)
+static void jl_uv_call_close_callback(jl_value_t *val)
 {
-    jl_value_t *args[2];
+    jl_value_t **args;
+    JL_GC_PUSHARGS(args, 2); // val is "rooted" in the finalizer list only right now
     args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
                             jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
     args[1] = val;
     assert(args[0]);
     jl_apply(args, 2); // TODO: wrap in try-catch?
+    JL_GC_POP();
 }

 static void jl_uv_closeHandle(uv_handle_t *handle)
@@ -105,6 +107,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
         ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
         jl_uv_call_close_callback((jl_value_t*)handle->data);
         ct->world_age = last_age;
+        return;
     }
     if (handle == (uv_handle_t*)&signal_async)
         return;
@@ -125,6 +128,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
         free(req);
         return;
     }
+    if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
+        free(req);
+        return;
+    }
     if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
         // new data was written, wait for it to flush too
         uv_buf_t buf;
@@ -134,12 +141,9 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
         if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
             return; // success
     }
-    if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
-        if (stream->type == UV_TTY)
-            uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
-        uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
-    }
-    free(req);
+    if (stream->type == UV_TTY)
+        uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
+    uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
 }

 static void uv_flush_callback(uv_write_t *req, int status)
@@ -224,15 +228,15 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

 JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
 {
+    JL_UV_LOCK();
     if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
         // take ownership of this handle,
         // so we can waitpid for the resource to exit and avoid leaving zombies
         assert(handle->data == NULL); // make sure Julia has forgotten about it already
         ((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
-        return;
+        uv_unref(handle);
     }
-    JL_UV_LOCK();
-    if (handle->type == UV_FILE) {
+    else if (handle->type == UV_FILE) {
         uv_fs_t req;
         jl_uv_file_t *fd = (jl_uv_file_t*)handle;
         if ((ssize_t)fd->file != -1) {
@@ -240,31 +244,26 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
             fd->file = (uv_os_fd_t)(ssize_t)-1;
         }
         jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
-        JL_UV_UNLOCK();
-        return;
-    }
-
-    if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
-        uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
-        req->handle = (uv_stream_t*)handle;
-        jl_uv_flush_close_callback(req, 0);
-        JL_UV_UNLOCK();
-        return;
-    }
-
-    // avoid double-closing the stream
-    if (!uv_is_closing(handle)) {
-        uv_close(handle, &jl_uv_closeHandle);
+    }
+    else if (!uv_is_closing(handle)) { // avoid double-closing the stream
+        if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
+            // flush the stream write-queue first
+            uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
+            req->handle = (uv_stream_t*)handle;
+            jl_uv_flush_close_callback(req, 0);
+        }
+        else {
+            uv_close(handle, &jl_uv_closeHandle);
+        }
     }
     JL_UV_UNLOCK();
 }

 JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
 {
-    // avoid double-closing the stream
-    if (!uv_is_closing(handle)) {
+    if (!uv_is_closing(handle)) { // avoid double-closing the stream
         JL_UV_LOCK();
-        if (!uv_is_closing(handle)) {
+        if (!uv_is_closing(handle)) { // double-check
             uv_close(handle, &jl_uv_closeHandle);
         }
         JL_UV_UNLOCK();
diff --git a/src/julia_internal.h b/src/julia_internal.h
index 1edc015a21cc7..451e07eb9e3df 100644
--- a/src/julia_internal.h
+++ b/src/julia_internal.h
@@ -568,7 +568,6 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

 extern uv_loop_t *jl_io_loop;
 void jl_uv_flush(uv_stream_t *stream);
-void jl_uv_call_close_callback(jl_value_t *val);

 typedef struct jl_typeenv_t {
     jl_tvar_t *var;
diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl
index 04b39f1c5d067..e266bff7ec7d1 100644
--- a/stdlib/FileWatching/src/FileWatching.jl
+++ b/stdlib/FileWatching/src/FileWatching.jl
@@ -78,7 +78,7 @@ iswritable(f::FDEvent) = f.writable
 |(a::FDEvent, b::FDEvent) = FDEvent(getfield(a, :events) | getfield(b, :events))

 mutable struct FileMonitor
-    handle::Ptr{Cvoid}
+    @atomic handle::Ptr{Cvoid}
     file::String
     notify::Base.ThreadSynchronizer
     events::Int32
@@ -101,12 +101,14 @@ mutable struct FileMonitor
 end

 mutable struct FolderMonitor
-    handle::Ptr{Cvoid}
-    notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError}
+    @atomic handle::Ptr{Cvoid}
+    # notify::Channel{Any} # eltype = Union{Pair{String, FileEvent}, IOError}
+    notify::Base.ThreadSynchronizer
+    channel::Vector{Any} # eltype = Pair{String, FileEvent}
     FolderMonitor(folder::AbstractString) = FolderMonitor(String(folder))
     function FolderMonitor(folder::String)
         handle = Libc.malloc(_sizeof_uv_fs_event)
-        this = new(handle, Channel(Inf))
+        this = new(handle, Base.ThreadSynchronizer(), [])
         associate_julia_struct(handle, this)
         iolock_begin()
         err = ccall(:uv_fs_event_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), handle)
@@ -124,7 +126,7 @@ mutable struct FolderMonitor
 end

 mutable struct PollingFileWatcher
-    handle::Ptr{Cvoid}
+    @atomic handle::Ptr{Cvoid}
     file::String
     interval::UInt32
     notify::Base.ThreadSynchronizer
@@ -149,14 +151,14 @@ mutable struct PollingFileWatcher
 end

 mutable struct _FDWatcher
-    handle::Ptr{Cvoid}
+    @atomic handle::Ptr{Cvoid}
     fdnum::Int # this is NOT the file descriptor
     refcount::Tuple{Int, Int}
     notify::Base.ThreadSynchronizer
     events::Int32
     active::Tuple{Bool, Bool}

-    let FDWatchers = Vector{Any}() # XXX: this structure and refcount need thread-safety locks
+    let FDWatchers = Vector{Any}() # n.b.: this structure and the refcount are protected by the iolock
         global _FDWatcher, uvfinalize
         @static if Sys.isunix()
             _FDWatcher(fd::RawFD, mask::FDEvent) = _FDWatcher(fd, mask.readable, mask.writable)
@@ -208,7 +210,7 @@ mutable struct _FDWatcher
             if t.handle != C_NULL
                 disassociate_julia_struct(t)
                 ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
-                t.handle = C_NULL
+                @atomic :monotonic t.handle = C_NULL
             end
             t.refcount = (0, 0)
             t.active = (false, false)
@@ -315,19 +317,25 @@ function close(t::FDWatcher)
 end

 function uvfinalize(uv::Union{FileMonitor, FolderMonitor, PollingFileWatcher})
-    disassociate_julia_struct(uv)
-    close(uv)
+    iolock_begin()
+    if uv.handle != C_NULL
+        disassociate_julia_struct(uv) # close (and free) without notify
+        ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
+    end
+    iolock_end()
 end

 function close(t::Union{FileMonitor, FolderMonitor, PollingFileWatcher})
+    iolock_begin()
     if t.handle != C_NULL
         ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
     end
+    iolock_end()
 end

 function _uv_hook_close(uv::_FDWatcher)
     # fyi: jl_atexit_hook can cause this to get called too
-    uv.handle = C_NULL
+    Libc.free(@atomicswap :monotonic uv.handle = C_NULL)
     uvfinalize(uv)
     nothing
 end
@@ -335,8 +343,8 @@ end
 function _uv_hook_close(uv::PollingFileWatcher)
     lock(uv.notify)
     try
-        uv.handle = C_NULL
         uv.active = false
+        Libc.free(@atomicswap :monotonic uv.handle = C_NULL)
         notify(uv.notify, StatStruct())
     finally
         unlock(uv.notify)
@@ -347,8 +355,8 @@ end
 function _uv_hook_close(uv::FileMonitor)
     lock(uv.notify)
     try
-        uv.handle = C_NULL
         uv.active = false
+        Libc.free(@atomicswap :monotonic uv.handle = C_NULL)
         notify(uv.notify, FileEvent())
     finally
         unlock(uv.notify)
@@ -357,8 +365,8 @@ function _uv_hook_close(uv::FolderMonitor)
-    uv.handle = C_NULL
-    close(uv.notify)
+    lock(uv.notify)
+    try
+        Libc.free(@atomicswap :monotonic uv.handle = C_NULL)
+        notify_error(uv.notify, EOFError())
+    finally
+        unlock(uv.notify)
+    end
     nothing
 end

@@ -386,11 +399,17 @@ end

 function uv_fseventscb_folder(handle::Ptr{Cvoid}, filename::Ptr, events::Int32, status::Int32)
     t = @handle_as handle FolderMonitor
-    if status != 0
-        put!(t.notify, _UVError("FolderMonitor", status))
-    else
-        fname = (filename == C_NULL) ? "" : unsafe_string(convert(Cstring, filename))
-        put!(t.notify, fname => FileEvent(events))
+    lock(t.notify)
+    try
+        if status != 0
+            notify_error(t.notify, _UVError("FolderMonitor", status))
+        else
+            fname = (filename == C_NULL) ? "" : unsafe_string(convert(Cstring, filename))
+            push!(t.channel, fname => FileEvent(events))
+            notify(t.notify)
+        end
+    finally
+        unlock(t.notify)
     end
     nothing
 end
@@ -448,7 +467,7 @@ end

 function start_watching(t::_FDWatcher)
     iolock_begin()
-    t.handle == C_NULL && return throw(ArgumentError("FDWatcher is closed"))
+    t.handle == C_NULL && throw(ArgumentError("FDWatcher is closed"))
     readable = t.refcount[1] > 0
     writable = t.refcount[2] > 0
     if t.active[1] != readable || t.active[2] != writable
@@ -466,7 +485,7 @@ end

 function start_watching(t::PollingFileWatcher)
     iolock_begin()
-    t.handle == C_NULL && return throw(ArgumentError("PollingFileWatcher is closed"))
+    t.handle == C_NULL && throw(ArgumentError("PollingFileWatcher is closed"))
     if !t.active
         uv_error("PollingFileWatcher (start)",
                  ccall(:uv_fs_poll_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, UInt32),
@@ -495,7 +514,7 @@ end

 function start_watching(t::FileMonitor)
     iolock_begin()
-    t.handle == C_NULL && return throw(ArgumentError("FileMonitor is closed"))
+    t.handle == C_NULL && throw(ArgumentError("FileMonitor is closed"))
     if !t.active
         uv_error("FileMonitor (start)",
                  ccall(:uv_fs_event_start, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Cstring, Int32),
@@ -649,26 +668,20 @@ function wait(m::FileMonitor)
 end

 function wait(m::FolderMonitor)
-    m.handle == C_NULL && return throw(ArgumentError("FolderMonitor is closed"))
-    if isready(m.notify)
-        evt = take!(m.notify) # non-blocking fast-path
-    else
-        preserve_handle(m)
-        evt = try
-            take!(m.notify)
-        catch ex
-            unpreserve_handle(m)
-            if ex isa InvalidStateException && ex.state === :closed
-                rethrow(EOFError()) # `wait(::Channel)` throws the wrong exception
-            end
-            rethrow()
+    m.handle == C_NULL && throw(EOFError())
+    preserve_handle(m)
+    lock(m.notify)
+    evt = try
+        m.handle == C_NULL && throw(EOFError())
+        while isempty(m.channel)
+            wait(m.notify)
         end
-    end
-    if evt isa Pair{String, FileEvent}
-        return evt
-    else
-        throw(evt)
-    end
+        popfirst!(m.channel)
+    finally
+        unlock(m.notify)
+        unpreserve_handle(m)
+    end
+    return evt::Pair{String, FileEvent}
 end
@@ -689,6 +702,7 @@ function poll_fd(s::Union{RawFD, Sys.iswindows() ? WindowsRawSocket : Union{}},
     mask.timedout && return mask
     fdw = _FDWatcher(s, mask)
     local timer
+    # we need this flag to explicitly track whether we call `close` already, to update the internal refcount correctly
     timedout = false # TODO: make this atomic
     try
         if timeout_s >= 0
@@ -776,37 +790,39 @@ function watch_folder(s::String, timeout_s::Real=-1)
     fm = get!(watched_folders, s) do
         return FolderMonitor(s)
     end
-    if timeout_s >= 0 && !isready(fm.notify)
+    local timer
+    if timeout_s >= 0
+        @lock fm.notify isempty(fm.channel) || return popfirst!(fm.channel)
         if timeout_s <= 0.010
             # for very small timeouts, we can just sleep for the whole timeout-interval
             (timeout_s == 0) ? yield() : sleep(timeout_s)
-            if !isready(fm.notify)
-                return "" => FileEvent() # timeout
-            end
-            # fall-through to a guaranteed non-blocking fast-path call to wait
+            @lock fm.notify isempty(fm.channel) || return popfirst!(fm.channel)
+            return "" => FileEvent() # timeout
         else
-            # If we may need to be able to cancel via a timeout,
-            # create a second monitor object just for that purpose.
-            # We still take the events from the primary stream.
-            fm2 = FileMonitor(s)
             timer = Timer(timeout_s) do t
-                close(fm2)
+                @lock fm.notify notify(fm.notify)
             end
-            try
-                while isopen(fm.notify) && !isready(fm.notify)
-                    fm2.handle == C_NULL && return "" => FileEvent() # timeout
-                    wait(fm2)
+        end
+    end
+    # inline a copy of `wait` with added support for checking timer
+    fm.handle == C_NULL && throw(EOFError())
+    preserve_handle(fm)
+    lock(fm.notify)
+    evt = try
+        fm.handle == C_NULL && throw(EOFError())
+        while isempty(fm.channel)
+            if @isdefined(timer)
+                isopen(timer) || return "" => FileEvent() # timeout
            end
-            finally
-                close(fm2)
-                close(timer)
+            wait(fm.notify)
         end
-            # guaranteed that next call to `wait(fm)` is non-blocking
-            # since we haven't entered the libuv event loop yet
-            # or the Base scheduler workqueue since last testing `isready`
+        popfirst!(fm.channel)
+    finally
+        unlock(fm.notify)
+        unpreserve_handle(fm)
+        @isdefined(timer) && close(timer)
     end
-    end
-    return wait(fm)
+    return evt::Pair{String, FileEvent}
 end
diff --git a/stdlib/Sockets/src/Sockets.jl b/stdlib/Sockets/src/Sockets.jl
index 4b5518a1fde61..82dedb72e6ecc 100644
--- a/stdlib/Sockets/src/Sockets.jl
+++ b/stdlib/Sockets/src/Sockets.jl
@@ -200,7 +200,6 @@ end
 show(io::IO, stream::UDPSocket) = print(io, typeof(stream), "(", uv_status_string(stream), ")")

 function _uv_hook_close(sock::UDPSocket)
-    sock.handle = C_NULL
     lock(sock.cond)
     try
         sock.status = StatusClosed
diff --git a/test/testhelpers/FakePTYs.jl b/test/testhelpers/FakePTYs.jl
index 03610665142e2..17dd270cd2424 100644
--- a/test/testhelpers/FakePTYs.jl
+++ b/test/testhelpers/FakePTYs.jl
@@ -41,8 +41,8 @@ function open_fake_pty()
     fds = ccall(:open, Cint, (Ptr{UInt8}, Cint),
         ccall(:ptsname, Ptr{UInt8}, (Cint,), fdm), O_RDWR | O_NOCTTY)
+    pts = RawFD(fds)

-    pts = RawFD(fds)
     # pts = fdio(fds, true)
     # pts = Base.Filesystem.File(RawFD(fds))
     # pts = Base.TTY(RawFD(fds); readable = false)