Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread that runs libuv loop continuously #50880

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
.vscode/*
*.heapsnapshot
.cache
*.lit_test_times.txt
# Buildkite: Ignore the entire .buildkite directory
/.buildkite

Expand Down
5 changes: 5 additions & 0 deletions base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,12 @@ function __init__()
@static if !Sys.iswindows()
# triggering a profile via signals is not implemented on windows
cond = Base.AsyncCondition()
Base.iolock_begin() # uv_unref needs lock
Base.uv_unref(cond.handle)
Base.iolock_end()
PROFILE_PRINT_COND[] = cond
ccall(:jl_set_peek_cond, Cvoid, (Ptr{Cvoid},), PROFILE_PRINT_COND[].handle)
errormonitor(Threads.@spawn(profile_printing_listener()))
t = errormonitor(Threads.@spawn(profile_printing_listener(cond)))
atexit() do
# destroy this callback when exiting
Expand Down
6 changes: 3 additions & 3 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,10 @@ This provides an acquire & release memory ordering on notify/wait.
The `autoreset` functionality and memory ordering guarantee requires at least Julia 1.8.
"""
mutable struct Event
const notify::ThreadSynchronizer
const autoreset::Bool
notify::Threads.Condition
autoreset::Bool
@atomic set::Bool
Event(autoreset::Bool=false) = new(ThreadSynchronizer(), autoreset, false)
Event(autoreset::Bool=false) = new(Threads.Condition(), autoreset, false)
end

function wait(e::Event)
Expand Down
1 change: 0 additions & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,6 @@ function wait()
W = workqueue_for(Threads.threadid())
poptask(W)
result = try_yieldto(ensure_rescheduled)
process_events()
# return when we come out of the queue
return result
end
Expand Down
24 changes: 21 additions & 3 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,26 @@ This includes both mark threads and concurrent sweep threads.
"""
ngcthreads() = Int(unsafe_load(cglobal(:jl_n_gcthreads, Cint))) + 1

# Atomic is not available here and soft-deprecated
mutable struct ThreadedRegion
@atomic counter::UInt
end
const THREADED_REGION = ThreadedRegion(UInt(0))

in_threaded_region() = THREADED_REGION.counter != 0

function enter_threaded_region()
@atomic THREADED_REGION.counter += 1
return nothing
end

function exit_threaded_region()
@atomic THREADED_REGION.counter -= 1
return nothing
end

function threading_run(fun, static)
ccall(:jl_enter_threaded_region, Cvoid, ())
enter_threaded_region()
n = threadpoolsize()
tid_offset = threadpoolsize(:interactive)
tasks = Vector{Task}(undef, n)
Expand All @@ -165,7 +183,7 @@ function threading_run(fun, static)
for i = 1:n
Base._wait(tasks[i])
end
ccall(:jl_exit_threaded_region, Cvoid, ())
exit_threaded_region()
failed_tasks = filter!(istaskfailed, tasks)
if !isempty(failed_tasks)
throw(CompositeException(map(TaskFailedException, failed_tasks)))
Expand Down Expand Up @@ -217,7 +235,7 @@ function _threadsfor(iter, lbody, schedule)
end
if $(schedule === :dynamic || schedule === :default)
threading_run(threadsfor_fun, false)
elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 # :static
elseif in_threaded_region() # :static
error("`@threads :static` cannot be used concurrently or nested")
else # :static
threading_run(threadsfor_fun, true)
Expand Down
3 changes: 1 addition & 2 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ void jl_gc_wait_for_the_world(jl_ptls_t* gc_all_tls_states, int gc_n_threads)
TracyCZoneColor(ctx, 0x696969);
#endif
assert(gc_n_threads);
if (gc_n_threads > 1)
jl_wake_libuv();
jl_wake_libuv();
for (int i = 0; i < gc_n_threads; i++) {
jl_ptls_t ptls2 = gc_all_tls_states[i];
if (ptls2 != NULL) {
Expand Down
4 changes: 2 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static void jl_uv_exitcleanup_add(uv_handle_t *handle, struct uv_shutdown_queue
queue->last = item;
}

static void jl_uv_exitcleanup_walk(uv_handle_t *handle, void *arg)
static void jl_uv_cb_exitcleanup_walk(uv_handle_t *handle, void *arg)
{
jl_uv_exitcleanup_add(handle, (struct uv_shutdown_queue*)arg);
}
Expand Down Expand Up @@ -302,7 +302,7 @@ JL_DLLEXPORT void jl_atexit_hook(int exitcode) JL_NOTSAFEPOINT_ENTER
if (loop != NULL) {
struct uv_shutdown_queue queue = {NULL, NULL};
JL_UV_LOCK();
uv_walk(loop, jl_uv_exitcleanup_walk, &queue);
uv_walk(loop, jl_uv_cb_exitcleanup_walk, &queue);
struct uv_shutdown_queue_item *item = queue.first;
if (ct) {
while (item) {
Expand Down
3 changes: 0 additions & 3 deletions src/jl_exported_funcs.inc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
XX(jl_egal__bitstag) \
XX(jl_eh_restore_state) \
XX(jl_enter_handler) \
XX(jl_enter_threaded_region) \
XX(jl_environ) \
XX(jl_eof_error) \
XX(jl_eqtable_get) \
Expand All @@ -124,7 +123,6 @@
XX(jl_excstack_state) \
XX(jl_exit) \
XX(jl_exit_on_sigint) \
XX(jl_exit_threaded_region) \
XX(jl_expand) \
XX(jl_expand_and_resolve) \
XX(jl_expand_stmt) \
Expand Down Expand Up @@ -259,7 +257,6 @@
XX(jl_intrinsic_name) \
XX(jl_invoke) \
XX(jl_invoke_api) \
XX(jl_in_threaded_region) \
XX(jl_iolock_begin) \
XX(jl_iolock_end) \
XX(jl_ios_buffer_n) \
Expand Down
Loading