Skip to content

Commit

Permalink
Reland "IO: tie lifetime of handle field to container (#43218)"
Browse files Browse the repository at this point in the history
Reverts a400a24 (#43924).

Fix lifetime issues with the original attempt. We do not need to prevent
it from being GC-finalized: only to make sure it does not conflict with
a concurrent uv_close callback.
  • Loading branch information
vtjnash committed Apr 12, 2022
1 parent f10ba9c commit 1b8f074
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 128 deletions.
28 changes: 14 additions & 14 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Use [`isopen`](@ref) to check whether it is still active.
This provides an implicit acquire & release memory ordering between the sending and waiting threads.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function AsyncCondition()
Expand Down Expand Up @@ -77,9 +77,9 @@ once. When the timer is closed (by [`close`](@ref)) waiting tasks are woken with
"""
mutable struct Timer
handle::Ptr{Cvoid}
@atomic handle::Ptr{Cvoid}
cond::ThreadSynchronizer
isopen::Bool
@atomic isopen::Bool
@atomic set::Bool

function Timer(timeout::Real; interval::Real = 0.0)
Expand Down Expand Up @@ -149,12 +149,12 @@ function wait(t::Union{Timer, AsyncCondition})
end


isopen(t::Union{Timer, AsyncCondition}) = t.isopen
isopen(t::Union{Timer, AsyncCondition}) = t.isopen && t.handle != C_NULL

function close(t::Union{Timer, AsyncCondition})
iolock_begin()
if t.handle != C_NULL && isopen(t)
t.isopen = false
if isopen(t)
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
end
iolock_end()
Expand All @@ -166,12 +166,12 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
if t.handle != C_NULL
disassociate_julia_struct(t.handle) # not going to call the usual close hooks
disassociate_julia_struct(t.handle) # not going to call the usual close hooks anymore
if t.isopen
t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t)
@atomic :monotonic t.isopen = false
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), t.handle)
end
t.handle = C_NULL
@atomic :monotonic t.handle = C_NULL
notify(t.cond, false)
end
finally
Expand All @@ -184,9 +184,9 @@ end
function _uv_hook_close(t::Union{Timer, AsyncCondition})
lock(t.cond)
try
t.isopen = false
t.handle = C_NULL
notify(t.cond, t.set)
@atomic :monotonic t.isopen = false
Libc.free(@atomicswap :monotonic t.handle = C_NULL)
notify(t.cond, false)
finally
unlock(t.cond)
end
Expand Down
7 changes: 5 additions & 2 deletions base/libuv.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ function preserve_handle(x)
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = uvhandles[x]::Int
if v == 1
v = get(uvhandles, x, 0)::Int
if v == 0
unlock(preserve_handle_lock)
error("unbalanced call to unpreserve_handle for $(typeof(x))")
elseif v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
Expand Down
8 changes: 4 additions & 4 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function uv_return_spawn(p::Ptr{Cvoid}, exit_status::Int64, termsignal::Int32)
proc = unsafe_pointer_to_objref(data)::Process
proc.exitcode = exit_status
proc.termsignal = termsignal
disassociate_julia_struct(proc) # ensure that data field is set to C_NULL
disassociate_julia_struct(proc.handle) # ensure that data field is set to C_NULL
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), proc.handle)
proc.handle = C_NULL
lock(proc.exitnotify)
Expand All @@ -70,7 +70,7 @@ end

# called when the libuv handle is destroyed
function _uv_hook_close(proc::Process)
proc.handle = C_NULL
Libc.free(@atomicswap :not_atomic proc.handle = C_NULL)
nothing
end

Expand Down Expand Up @@ -607,10 +607,10 @@ Get the child process ID, if it still exists.
This function requires at least Julia 1.1.
"""
function Libc.getpid(p::Process)
# TODO: due to threading, this method is no longer synchronized with the user application
# TODO: due to threading, this method is only weakly synchronized with the user application
iolock_begin()
ppid = Int32(0)
if p.handle != C_NULL
if p.handle != C_NULL # e.g. process_running
ppid = ccall(:jl_uv_process_pid, Int32, (Ptr{Cvoid},), p.handle)
end
iolock_end()
Expand Down
24 changes: 13 additions & 11 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ if OS_HANDLE != RawFD
end

function isopen(x::Union{LibuvStream, LibuvServer})
if x.status == StatusUninit || x.status == StatusInit
if x.status == StatusUninit || x.status == StatusInit || x.handle === C_NULL
throw(ArgumentError("$x is not initialized"))
end
return x.status != StatusClosed
Expand Down Expand Up @@ -496,34 +496,37 @@ end

function close(stream::Union{LibuvStream, LibuvServer})
iolock_begin()
should_wait = false
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
elseif isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
end
iolock_end()
should_wait && wait_close(stream)
wait_close(stream)
nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
uv.handle == C_NULL && return
iolock_begin()
if uv.handle != C_NULL
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks
if uv.status != StatusUninit
close(uv)
else
disassociate_julia_struct(uv.handle) # not going to call the usual close hooks (so preserve_handle is not needed)
if uv.status == StatusUninit
Libc.free(uv.handle)
elseif uv.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
elseif isopen(uv)
if uv.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), uv.handle)
end
elseif uv.status == StatusClosed
Libc.free(uv.handle)
end
uv.status = StatusClosed
uv.handle = C_NULL
uv.status = StatusClosed
end
iolock_end()
nothing
Expand Down Expand Up @@ -713,7 +716,6 @@ end
function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
lock(uv.cond)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.cond)
Expand Down
3 changes: 1 addition & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ static void jl_close_item_atexit(uv_handle_t *handle)
switch(handle->type) {
case UV_PROCESS:
// cause Julia to forget about the Process object
if (handle->data)
jl_uv_call_close_callback((jl_value_t*)handle->data);
handle->data = NULL;
// and make libuv think it is already dead
((uv_process_t*)handle)->pid = 0;
// fall-through
Expand Down
55 changes: 27 additions & 28 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,16 @@ JL_DLLEXPORT void jl_iolock_end(void)
}


void jl_uv_call_close_callback(jl_value_t *val)
static void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
jl_value_t **args;
JL_GC_PUSHARGS(args, 2); // val is "rooted" in the finalizer list only right now
args[0] = jl_get_global(jl_base_relative_to(((jl_datatype_t*)jl_typeof(val))->name->module),
jl_symbol("_uv_hook_close")); // topmod(typeof(val))._uv_hook_close
args[1] = val;
assert(args[0]);
jl_apply(args, 2); // TODO: wrap in try-catch?
JL_GC_POP();
}

static void jl_uv_closeHandle(uv_handle_t *handle)
Expand All @@ -105,6 +107,7 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = jl_atomic_load_acquire(&jl_world_counter);
jl_uv_call_close_callback((jl_value_t*)handle->data);
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async)
return;
Expand All @@ -125,6 +128,10 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
free(req);
return;
}
if (uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
free(req);
return;
}
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
// new data was written, wait for it to flush too
uv_buf_t buf;
Expand All @@ -134,12 +141,9 @@ static void jl_uv_flush_close_callback(uv_write_t *req, int status)
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
return; // success
}
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}
free(req);
if (stream->type == UV_TTY)
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
}

static void uv_flush_callback(uv_write_t *req, int status)
Expand Down Expand Up @@ -224,47 +228,42 @@ static void jl_proc_exit_cleanup_cb(uv_process_t *process, int64_t exit_status,

JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
{
JL_UV_LOCK();
if (handle->type == UV_PROCESS && ((uv_process_t*)handle)->pid != 0) {
// take ownership of this handle,
// so we can waitpid for the resource to exit and avoid leaving zombies
assert(handle->data == NULL); // make sure Julia has forgotten about it already
((uv_process_t*)handle)->exit_cb = jl_proc_exit_cleanup_cb;
return;
uv_unref(handle);
}
JL_UV_LOCK();
if (handle->type == UV_FILE) {
else if (handle->type == UV_FILE) {
uv_fs_t req;
jl_uv_file_t *fd = (jl_uv_file_t*)handle;
if ((ssize_t)fd->file != -1) {
uv_fs_close(handle->loop, &req, fd->file, NULL);
fd->file = (uv_os_fd_t)(ssize_t)-1;
}
jl_uv_closeHandle(handle); // synchronous (ok since the callback is known to not interact with any global state)
JL_UV_UNLOCK();
return;
}

if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
JL_UV_UNLOCK();
return;
}

// avoid double-closing the stream
if (!uv_is_closing(handle)) {
uv_close(handle, &jl_uv_closeHandle);
else if (!uv_is_closing(handle)) { // avoid double-closing the stream
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
// flush the stream write-queue first
uv_write_t *req = (uv_write_t*)malloc_s(sizeof(uv_write_t));
req->handle = (uv_stream_t*)handle;
jl_uv_flush_close_callback(req, 0);
}
else {
uv_close(handle, &jl_uv_closeHandle);
}
}
JL_UV_UNLOCK();
}

JL_DLLEXPORT void jl_forceclose_uv(uv_handle_t *handle)
{
// avoid double-closing the stream
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // avoid double-closing the stream
JL_UV_LOCK();
if (!uv_is_closing(handle)) {
if (!uv_is_closing(handle)) { // double-check
uv_close(handle, &jl_uv_closeHandle);
}
JL_UV_UNLOCK();
Expand Down
1 change: 0 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ JL_DLLEXPORT jl_fptr_args_t jl_get_builtin_fptr(jl_value_t *b);

extern uv_loop_t *jl_io_loop;
void jl_uv_flush(uv_stream_t *stream);
void jl_uv_call_close_callback(jl_value_t *val);

typedef struct jl_typeenv_t {
jl_tvar_t *var;
Expand Down
Loading

0 comments on commit 1b8f074

Please sign in to comment.