Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support threads in precompile and jl_task_wait_empty #52445

Merged
merged 1 commit into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ SRCS := \
jltypes gf typemap smallintset ast builtins module interpreter symbol \
dlload sys init task array genericmemory staticdata toplevel jl_uv datatype \
simplevector runtime_intrinsics precompile jloptions mtarraylist \
threading partr stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
threading scheduler stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
jlapi signal-handling safepoint timing subtype rtutils gc-heap-snapshot \
crc32c APInt-C processor ircode opaque_closure codegen-stubs coverage runtime_ccall

Expand Down
26 changes: 17 additions & 9 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,26 @@ static void wait_empty_func(uv_timer_t *t)
void jl_wait_empty_begin(void)
{
JL_UV_LOCK();
if (wait_empty_worker.type != UV_TIMER && jl_io_loop) {
// try to purge anything that is just waiting for cleanup
jl_io_loop->stop_flag = 0;
uv_run(jl_io_loop, UV_RUN_NOWAIT);
uv_timer_init(jl_io_loop, &wait_empty_worker);
if (jl_io_loop) {
if (wait_empty_worker.type != UV_TIMER) {
// try to purge anything that is just waiting for cleanup
jl_io_loop->stop_flag = 0;
uv_run(jl_io_loop, UV_RUN_NOWAIT);
uv_timer_init(jl_io_loop, &wait_empty_worker);
uv_unref((uv_handle_t*)&wait_empty_worker);
}
// make sure this is running
uv_update_time(jl_io_loop);
uv_timer_start(&wait_empty_worker, wait_empty_func, 10, 15000);
uv_unref((uv_handle_t*)&wait_empty_worker);
}
JL_UV_UNLOCK();
}
void jl_wait_empty_end(void)
{
// n.b. caller must be holding jl_uv_mutex
uv_close((uv_handle_t*)&wait_empty_worker, NULL);
if (wait_empty_worker.type == UV_TIMER)
// make sure this timer is stopped, but not destroyed in case the user calls jl_wait_empty_begin again
uv_timer_stop(&wait_empty_worker);
}


Expand Down Expand Up @@ -174,9 +179,12 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async || handle == (uv_handle_t*)&wait_empty_worker)
if (handle == (uv_handle_t*)&wait_empty_worker)
handle->type = UV_UNKNOWN_HANDLE;
else if (handle == (uv_handle_t*)&signal_async)
return;
free(handle);
else
free(handle);
}

static void jl_uv_flush_close_callback(uv_write_t *req, int status)
Expand Down
4 changes: 2 additions & 2 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,11 @@ void jl_gc_safe_leave(jl_ptls_t ptls, int8_t state) JL_NOTSAFEPOINT_LEAVE; // th
#endif

JL_DLLEXPORT void jl_gc_enable_finalizers(struct _jl_task_t *ct, int on);
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void);
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void) JL_NOTSAFEPOINT;
JL_DLLEXPORT void jl_gc_enable_finalizers_internal(void);
JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct);
extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers;
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void);
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void) JL_NOTSAFEPOINT;

JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);

Expand Down
20 changes: 0 additions & 20 deletions src/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,26 +143,6 @@
#define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
#define DEFAULT_MACHINE_EXCLUSIVE 0

// partr -- parallel tasks runtime options ------------------------------------

// multiq
// number of heaps = MULTIQ_HEAP_C * nthreads
#define MULTIQ_HEAP_C 4
// how many in each heap
#define MULTIQ_TASKS_PER_HEAP 129

// parfor
// tasks = niters / (GRAIN_K * nthreads)
#define GRAIN_K 4

// synchronization
// narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1
// limit for number of recursive parfors
#define ARRIVERS_P 2
// nreducers = narrivers * REDUCERS_FRAC
#define REDUCERS_FRAC 1


// sanitizer defaults ---------------------------------------------------------

// Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers
Expand Down
8 changes: 7 additions & 1 deletion src/precompile.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
return;
}

jl_task_wait_empty();
jl_task_wait_empty(); // wait for most work to finish (except possibly finalizers)
jl_gc_collect(JL_GC_FULL);
jl_gc_collect(JL_GC_INCREMENTAL); // sweep finalizers
jl_task_t *ct = jl_current_task;
jl_gc_enable_finalizers(ct, 0); // now disable finalizers, as they could schedule more work or make other unexpected changes to reachability
jl_task_wait_empty(); // then make sure we are the only thread alive that could be running user code past here

if (!jl_module_init_order) {
jl_printf(JL_STDERR, "WARNING: --output requested, but no modules defined during run\n");
Expand Down Expand Up @@ -184,6 +189,7 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
}
}
JL_GC_POP();
jl_gc_enable_finalizers(ct, 1);
}

#ifdef __cplusplus
Expand Down
127 changes: 95 additions & 32 deletions src/partr.c → src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ static const int16_t sleeping = 1;
// this thread is dead.
static const int16_t sleeping_like_the_dead JL_UNUSED = 2;

// a running count of how many threads are currently not_sleeping
// plus a running count of the number of in-flight wake-ups
// n.b. this may temporarily exceed jl_n_threads
static _Atomic(int) nrunning = 1;

// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
Expand Down Expand Up @@ -64,7 +69,7 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT
if (was == tid)
return 1;
if (was == -1)
return jl_atomic_cmpswap(&task->tid, &was, tid);
return jl_atomic_cmpswap(&task->tid, &was, tid) || was == tid;
return 0;
}

Expand Down Expand Up @@ -180,6 +185,8 @@ void jl_threadfun(void *arg)
jl_init_stack_limits(0, &stack_lo, &stack_hi);
// warning: this changes `jl_current_task`, so be careful not to call that from this function
jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(wasrunning); (void)wasrunning;
JL_GC_PROMISE_ROOTED(ct);

// wait for all threads
Expand Down Expand Up @@ -220,7 +227,7 @@ int jl_running_under_rr(int recheck)


// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
static int sleep_check_after_threshold(uint64_t *start_cycles)
static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT
{
JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder
/**
Expand All @@ -243,18 +250,31 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
return 0;
}

// Ensure this thread's sleep_check_state is not_sleeping and report who made the transition.
//
// Returns 1 if this call itself changed the state from some other value to not_sleeping.
// Returns 0 if the state was already not_sleeping (seen either by the initial load or
// by the exchange); in that case nrunning is decremented by one.
// NOTE(review): the 0 case presumably means another thread already woke us and counted
// that wake-up in nrunning (as wake_thread does) -- confirm against the callers.
static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// check with a plain load first; the exchange is only attempted when the state is not already not_sleeping
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
// the exchange yields the previous value, so anything other than not_sleeping means we made the change
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
return 1;
}
}
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1); // consume in-flight wakeup
// the counter is expected to have been above 1 before the decrement
// NOTE(review): presumably because both this thread and the in-flight wake-up are still counted here -- confirm
// the (void) cast keeps wasrunning "used" when assert is compiled out
assert(wasrunning > 1); (void)wasrunning;
return 0;
}

static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
{
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
int8_t state = sleeping;

if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];

if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
int8_t state = sleeping;
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1); // increment in-flight wakeup count
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
vtjnash marked this conversation as resolved.
Show resolved Hide resolved
return 1;
}
}
Expand All @@ -280,10 +300,14 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
// and that nrunning is updated if this is now considered in-flight
jl_ptls_t ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
}
}
if (uvlock == ct)
uv_stop(jl_global_event_loop());
Expand Down Expand Up @@ -360,8 +384,10 @@ void jl_task_wait_empty(void)
// we are back from jl_task_get_next now
ct->world_age = lastage;
wait_empty = NULL;
// TODO: move this lock acquire-release pair to the caller, so that we ensure new work
// (from uv_unref objects) didn't unexpectedly get scheduled and start running behind our back
// TODO: move this lock acquire to before the wait_empty return and the
// unlock to the caller, so that we ensure new work (from uv_unref
// objects) didn't unexpectedly get scheduled and start running behind
// our back during the function return
JL_UV_LOCK();
jl_wait_empty_end();
JL_UV_UNLOCK();
Expand All @@ -378,6 +404,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}


extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
Expand Down Expand Up @@ -405,8 +432,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
jl_fence(); // [^store_buffering_1]
JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
if (!check_empty(checkempty)) { // uses relaxed loads
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls);
}
continue;
Expand All @@ -415,23 +441,20 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
if (task)
return task;
continue;
}
if (task) {
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
return task;
}


// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
Expand Down Expand Up @@ -485,41 +508,64 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue;
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue;
}
}

// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
if (ptls->tid != 0) {
uv_mutex_lock(&ptls->sleep_lock);
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
uv_mutex_unlock(&ptls->sleep_lock);
}
}

// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
if (ptls->tid == 0 && wait_empty) {
task = wait_empty;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
task = wait_empty;
if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
}
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
Expand All @@ -533,6 +579,23 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
}

// Remove the thread described by `ptls` from the scheduler's bookkeeping:
// mark its sleep_check_state as sleeping_like_the_dead, adjust nrunning if the
// thread was not recorded as sleeping, and finally wake thread 0.
void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// the exchange yields the prior state; nrunning is only decremented when that prior state was not `sleeping`
// NOTE(review): presumably a thread that was `sleeping` had already removed itself from nrunning before going to sleep -- confirm
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, sleeping_like_the_dead) != sleeping) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
// wasrunning is the value before the decrement, so 1 means the count has just reached zero
if (wasrunning == 1) {
// index 0 of jl_all_tls_states is used here as thread 0's state
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0];
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// the signal is sent while holding thread 0's sleep_lock
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
}
}
// fence placed between the state/counter updates above and the wake-up call below
jl_fence();
jl_wakeup_thread(0); // force thread 0 to see that we do not have the IO lock (and are dead)
}

#ifdef __cplusplus
}
#endif
2 changes: 1 addition & 1 deletion src/staticdata.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ static int jl_needs_serialization(jl_serializer_state *s, jl_value_t *v) JL_NOTS
else if (jl_typetagis(v, jl_uint8_tag << 4)) {
return 0;
}
else if (jl_typetagis(v, jl_task_tag << 4)) {
else if (v == (jl_value_t*)s->ptls->root_task) {
return 0;
}

Expand Down
Loading