Skip to content

Commit

Permalink
IO: tie lifetime of handle field to container (JuliaLang#43218)
Browse files Browse the repository at this point in the history
Rather than freeing this memory as soon as possible, ensure that the
lifetime of the handle is always >= the container object. This lets us
examine some (limited) aspects of the handle without holding a lock.

And we also examine and fix numerous other thread-safety and
synchronization bugs too.
  • Loading branch information
vtjnash authored and LilithHafner committed Feb 22, 2022
1 parent a4c3d19 commit 729ae51
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 125 deletions.
30 changes: 15 additions & 15 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -71,9 +71,9 @@ Note: `interval` is subject to accumulating time skew. If you need precise event
absolute time, create a new timer at each expiration with the difference to the next time computed.
"""
mutable struct Timer
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -143,12 +143,13 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen
isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if t.handle != C_NULL && isopen(t)
t.isopen = false
if isopen(t)
@atomic :monotonic t.isopen = false
preserve_handle(t)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -159,13 +160,11 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
iolock_begin()
lock(t.cond)
try
if t.handle != C_NULL
if isopen(t)
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
t.handle = C_NULL
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
@atomic :monotonic t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -178,8 +177,9 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
t.isopen = false
t.handle = C_NULL
@atomic :monotonic t.isopen = false
unpreserve_handle(t)
@atomic :monotonic t.handle = C_NULL
notify(t.cond, t.set)
finally
unlock(t.cond)
Expand Down
7 changes: 5 additions & 2 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = uvhandles[x]::Int
if v == 1
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
7 changes: 4 additions & 3 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc.handle)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -65,7 +66,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
proc.handle = C_NULL
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
nothing
end

Expand Down Expand Up @@ -587,10 +588,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is no longer synchronized with the user application
# TODO: due to threading, this method is only weakly synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL
if p.handle != C_NULL # e.g. process_running
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
27 changes: 17 additions & 10 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,34 +496,39 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
preserve_handle(stream)
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
should_wait && wait_close(stream)
wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit
close(uv)
else
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
Libc.free(uv.handle)
end
uv.status = StatusClosed
uv.handle = C_NULL
uv.status = StatusClosed
end
iolock_end()
nothing
Expand Down Expand Up @@ -667,13 +672,15 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
notify(stream.cond)
else
# underlying stream is no longer useful: begin finalization
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
else
stream.readerror = _UVError("read", nread)
# This is a fatal connection error
preserve_handle(stream)
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
Expand Down Expand Up @@ -711,9 +718,9 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
unpreserve_handle(uv)
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 1 addition & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
handle->data = NULL;
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
50 changes: 23 additions & 27 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


void jl_uv_call_close_callback(jl_value_t *val)
static void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
Expand Down Expand Up @@ -105,6 +105,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -125,6 +126,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -134,12 +139,9 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return;
}
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -222,47 +224,41 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
return;
}
JL_UV_LOCK();
if (handle->type == UV_FILE) {
else if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // avoid double-closing the stream
JL_UV_LOCK();
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // double-check
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 0 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading

0 comments on commit 729ae51

Please sign in to comment.