Skip to content

Commit

Permalink
handle async termination better
Browse files Browse the repository at this point in the history
Fixes #55235
  • Loading branch information
vtjnash committed Aug 9, 2024
1 parent 86231ce commit e3017ef
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 27 deletions.
2 changes: 1 addition & 1 deletion base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ function start_profile_listener()
# this will prompt any ongoing or pending event to flush also
close(cond)
# error-propagation is not needed, since the errormonitor will handle printing that better
_wait(t)
t === current_task() || _wait(t)
end
finalizer(cond) do c
# if something goes south, still make sure we aren't keeping a reference in C to this
Expand Down
2 changes: 1 addition & 1 deletion base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ function wait(c::GenericCondition; first::Bool=false)
try
return wait()
catch
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
finally
relockall(c.lock, token)
Expand Down
5 changes: 5 additions & 0 deletions base/initdefs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ function atexit(f::Function)
end

function _atexit(exitcode::Cint)
# this current task shouldn't be scheduled anywhere, but if it was (because
# this exit came from a signal for example), then try to clear that state
# to minimize scheduler issues later
ct = current_task()
q = ct.queue; q === nothing || list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
# Don't hold the lock around the iteration, just in case any other thread executing in
# parallel tries to register a new atexit hook while this is running. We don't want to
# block that thread from proceeding, and we can allow it to register its hook which we
Expand Down
4 changes: 2 additions & 2 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ function closewrite(s::LibuvStream)
# try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
sigatomic_end()
iolock_begin()
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we won't get spurious notifications later
Expand Down Expand Up @@ -1076,7 +1076,7 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
# try-finally unwinds the sigatomic level, so need to repeat sigatomic_end
sigatomic_end()
iolock_begin()
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(uvw) != C_NULL
# uvw is still alive,
# so make sure we won't get spurious notifications later
Expand Down
34 changes: 19 additions & 15 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ end

# just wait for a task to be done, no error propagation
function _wait(t::Task)
t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
if !istaskdone(t)
donenotify = t.donenotify::ThreadSynchronizer
lock(donenotify)
Expand Down Expand Up @@ -374,7 +375,6 @@ in an error, thrown as a [`TaskFailedException`](@ref) which wraps the failed ta
Throws a `ConcurrencyViolationError` if `t` is the currently running task, to prevent deadlocks.
"""
function wait(t::Task; throw=true)
t === current_task() && Core.throw(ConcurrencyViolationError("deadlock detected: cannot wait on current task"))
_wait(t)
if throw && istaskfailed(t)
Core.throw(TaskFailedException(t))
Expand Down Expand Up @@ -813,12 +813,15 @@ macro sync_add(expr)
end
end

throwto_repl_task(@nospecialize val) = throwto(getfield(active_repl_backend, :backend_task)::Task, val)

function is_repl_running()
return isdefined(Base, :active_repl_backend) &&
(getfield(active_repl_backend, :backend_task)::Task)._state === task_state_runnable &&
getfield(active_repl_backend, :in_eval)
function repl_backend_task()
@isdefined(active_repl_backend) || return
backend = active_repl_backend
isdefined(backend, :backend_task) || return
backend_task = getfield(active_repl_backend, :backend_task)::Task
if backend_task._state === task_state_runnable && getfield(backend, :in_eval)
return backend_task
end
return
end

# runtime system hook called when a task finishes
Expand All @@ -842,8 +845,9 @@ function task_done_hook(t::Task)
end

if err && !handled && Threads.threadid() == 1
if isa(result, InterruptException) && isempty(Workqueue) && is_repl_running()
throwto_repl_task(result)
if isa(result, InterruptException) && isempty(Workqueue)
backend = repl_backend_task()
backend isa Task && throwto(backend, e)
end
end
# Clear sigatomic before waiting
Expand All @@ -854,11 +858,11 @@ function task_done_hook(t::Task)
# If an InterruptException happens while blocked in the event loop, try handing
# the exception to the REPL task since the current task is done.
# issue #19467
if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue) && is_repl_running()
throwto_repl_task(e)
else
rethrow()
if Threads.threadid() == 1 && isa(e, InterruptException) && isempty(Workqueue)
backend = repl_backend_task()
backend isa Task && throwto(backend, e)
end
rethrow() # this will terminate the program
end
end

Expand Down Expand Up @@ -1032,7 +1036,7 @@ function schedule(t::Task, @nospecialize(arg); error=false)
# schedule a task to be (re)started with the given value or exception
t._state === task_state_runnable || Base.error("schedule: Task not runnable")
if error
t.queue === nothing || Base.list_deletefirst!(t.queue::IntrusiveLinkedList{Task}, t)
q = t.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, t)
setfield!(t, :result, arg)
setfield!(t, :_isexception, true)
else
Expand All @@ -1056,7 +1060,7 @@ function yield()
try
wait()
catch
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
end
end
Expand Down
15 changes: 15 additions & 0 deletions src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT
return 0;
}

void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// equivalent to wake_thread, without the assert on wasrunning
int8_t state = jl_atomic_load_relaxed(&ptls->sleep_check_state);
if (state == sleeping) {
if (jl_atomic_cmpswap_relaxed(&ptls->sleep_check_state, &state, not_sleeping)) {
// this notification will never be consumed, so we may have now
// introduced some inaccuracy into the count, but that is
// unavoidable with any asynchronous interruption
jl_atomic_fetch_add_relaxed(&n_threads_running, 1);
}
}
}


static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
Expand Down
15 changes: 10 additions & 5 deletions src/signal-handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ void jl_show_sigill(void *_ctx)
#endif
}

void surprise_wakeup(jl_ptls_t ptls) JL_NOTSAFEPOINT;

// make it invalid for a task to return from this point to its stack
// this is generally quite an foolish operation, but does free you up to do
// arbitrary things on this stack now without worrying about corrupt state that
Expand All @@ -439,15 +441,17 @@ void jl_task_frame_noreturn(jl_task_t *ct) JL_NOTSAFEPOINT
ct->eh = NULL;
ct->world_age = 1;
// Force all locks to drop. Is this a good idea? Of course not. But the alternative would probably deadlock instead of crashing.
small_arraylist_t *locks = &ct->ptls->locks;
jl_ptls_t ptls = ct->ptls;
small_arraylist_t *locks = &ptls->locks;
for (size_t i = locks->len; i > 0; i--)
jl_mutex_unlock_nogc((jl_mutex_t*)locks->items[i - 1]);
locks->len = 0;
ct->ptls->in_pure_callback = 0;
ct->ptls->in_finalizer = 0;
ct->ptls->defer_signal = 0;
ptls->in_pure_callback = 0;
ptls->in_finalizer = 0;
ptls->defer_signal = 0;
// forcibly exit GC (if we were in it) or safe into unsafe, without the mandatory safepoint
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_UNSAFE);
jl_atomic_store_release(&ptls->gc_state, JL_GC_STATE_UNSAFE);
surprise_wakeup(ptls);
// allow continuing to use a Task that should have already died--unsafe necromancy!
jl_atomic_store_relaxed(&ct->_state, JL_TASK_STATE_RUNNABLE);
}
Expand All @@ -461,6 +465,7 @@ void jl_critical_error(int sig, int si_code, bt_context_t *context, jl_task_t *c
size_t i, n = ct ? *bt_size : 0;
if (sig) {
// kill this task, so that we cannot get back to it accidentally (via an untimely ^C or jlbacktrace in jl_exit)
// and also resets the state of ct and ptls so that some code can run on this task again
jl_task_frame_noreturn(ct);
#ifndef _OS_WINDOWS_
sigset_t sset;
Expand Down
2 changes: 1 addition & 1 deletion stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ function send(sock::UDPSocket, ipaddr::IPAddr, port::Integer, msg)
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(uvw) != C_NULL
# uvw is still alive,
# so make sure we won't get spurious notifications later
Expand Down
4 changes: 2 additions & 2 deletions stdlib/Sockets/src/addrinfo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ function getalladdrinfo(host::String)
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we don't get spurious notifications later
Expand Down Expand Up @@ -223,7 +223,7 @@ function getnameinfo(address::Union{IPv4, IPv6})
finally
Base.sigatomic_end()
iolock_begin()
ct.queue === nothing || Base.list_deletefirst!(ct.queue, ct)
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
if uv_req_data(req) != C_NULL
# req is still alive,
# so make sure we don't get spurious notifications later
Expand Down

0 comments on commit e3017ef

Please sign in to comment.