Skip to content

Commit

Permalink
support threads in precompile and jl_task_wait_empty
Browse files Browse the repository at this point in the history
Add a `nrunning` counter which identifies (when zero) when there is
nothing running anymore. Allowing us to gate all tasks on all threads on
reaching a quiescent state, not just thread 0. This should let us better
support running precompile with threads (since we will be ensured that
all of them are asleep in a consistent state before serialization tries
to inspect the process state). We could additionally stop them
afterwards to make sure there is no way for them to begin running, even
if we forgot about some other event source, but that seems unnecessary
paranoia for now.
  • Loading branch information
vtjnash committed Dec 8, 2023
1 parent 9723de5 commit d952c80
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ SRCS := \
jltypes gf typemap smallintset ast builtins module interpreter symbol \
dlload sys init task array genericmemory staticdata toplevel jl_uv datatype \
simplevector runtime_intrinsics precompile jloptions mtarraylist \
threading partr stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
threading scheduler stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
jlapi signal-handling safepoint timing subtype rtutils gc-heap-snapshot \
crc32c APInt-C processor ircode opaque_closure codegen-stubs coverage runtime_ccall

Expand Down
26 changes: 17 additions & 9 deletions src/jl_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,26 @@ static void wait_empty_func(uv_timer_t *t)
void jl_wait_empty_begin(void)
{
JL_UV_LOCK();
if (wait_empty_worker.type != UV_TIMER && jl_io_loop) {
// try to purge anything that is just waiting for cleanup
jl_io_loop->stop_flag = 0;
uv_run(jl_io_loop, UV_RUN_NOWAIT);
uv_timer_init(jl_io_loop, &wait_empty_worker);
if (jl_io_loop) {
if (wait_empty_worker.type != UV_TIMER) {
// try to purge anything that is just waiting for cleanup
jl_io_loop->stop_flag = 0;
uv_run(jl_io_loop, UV_RUN_NOWAIT);
uv_timer_init(jl_io_loop, &wait_empty_worker);
uv_unref((uv_handle_t*)&wait_empty_worker);
}
// make sure this is running
uv_update_time(jl_io_loop);
uv_timer_start(&wait_empty_worker, wait_empty_func, 10, 15000);
uv_unref((uv_handle_t*)&wait_empty_worker);
}
JL_UV_UNLOCK();
}
void jl_wait_empty_end(void)
{
// n.b. caller must be holding jl_uv_mutex
uv_close((uv_handle_t*)&wait_empty_worker, NULL);
if (wait_empty_worker.type == UV_TIMER)
// make sure this timer is stopped, but not destroyed in case the user calls jl_wait_empty_begin again
uv_timer_stop(&wait_empty_worker);
}


Expand Down Expand Up @@ -174,9 +179,12 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
ct->world_age = last_age;
return;
}
if (handle == (uv_handle_t*)&signal_async || handle == (uv_handle_t*)&wait_empty_worker)
if (handle == (uv_handle_t*)&wait_empty_worker)
handle->type = UV_UNKNOWN_HANDLE;
else if (handle == (uv_handle_t*)&signal_async)
return;
free(handle);
else
free(handle);
}

static void jl_uv_flush_close_callback(uv_write_t *req, int status)
Expand Down
4 changes: 2 additions & 2 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,11 @@ void jl_gc_safe_leave(jl_ptls_t ptls, int8_t state) JL_NOTSAFEPOINT_LEAVE; // th
#endif

JL_DLLEXPORT void jl_gc_enable_finalizers(struct _jl_task_t *ct, int on);
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void);
JL_DLLEXPORT void jl_gc_disable_finalizers_internal(void) JL_NOTSAFEPOINT;
JL_DLLEXPORT void jl_gc_enable_finalizers_internal(void);
JL_DLLEXPORT void jl_gc_run_pending_finalizers(struct _jl_task_t *ct);
extern JL_DLLEXPORT _Atomic(int) jl_gc_have_pending_finalizers;
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void);
JL_DLLEXPORT int8_t jl_gc_is_in_finalizer(void) JL_NOTSAFEPOINT;

JL_DLLEXPORT void jl_wakeup_thread(int16_t tid);

Expand Down
20 changes: 0 additions & 20 deletions src/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,26 +143,6 @@
#define MACHINE_EXCLUSIVE_NAME "JULIA_EXCLUSIVE"
#define DEFAULT_MACHINE_EXCLUSIVE 0

// partr -- parallel tasks runtime options ------------------------------------

// multiq
// number of heaps = MULTIQ_HEAP_C * nthreads
#define MULTIQ_HEAP_C 4
// how many in each heap
#define MULTIQ_TASKS_PER_HEAP 129

// parfor
// tasks = niters / (GRAIN_K * nthreads)
#define GRAIN_K 4

// synchronization
// narrivers = ((GRAIN_K * nthreads) ^ ARRIVERS_P) + 1
// limit for number of recursive parfors
#define ARRIVERS_P 2
// nreducers = narrivers * REDUCERS_FRAC
#define REDUCERS_FRAC 1


// sanitizer defaults ---------------------------------------------------------

// Automatically enable MEMDEBUG and KEEP_BODIES for the sanitizers
Expand Down
8 changes: 7 additions & 1 deletion src/precompile.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
return;
}

jl_task_wait_empty();
jl_task_wait_empty(); // wait for most work to finish (except possibly finalizers)
jl_gc_collect(JL_GC_FULL);
jl_gc_collect(JL_GC_INCREMENTAL); // sweep finalizers
jl_task_t *ct = jl_current_task;
jl_gc_enable_finalizers(ct, 0); // now disable finalizers, as they could schedule more work or make other unexpected changes to reachability
jl_task_wait_empty(); // then make sure we are the only thread alive that could be running user code past here

if (!jl_module_init_order) {
jl_printf(JL_STDERR, "WARNING: --output requested, but no modules defined during run\n");
Expand Down Expand Up @@ -184,6 +189,7 @@ JL_DLLEXPORT void jl_write_compiler_output(void)
}
}
JL_GC_POP();
jl_gc_enable_finalizers(ct, 1);
}

#ifdef __cplusplus
Expand Down
127 changes: 95 additions & 32 deletions src/partr.c → src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ static const int16_t sleeping = 1;
// this thread is dead.
static const int16_t sleeping_like_the_dead JL_UNUSED = 2;

// a running count of how many threads are currently not_sleeping
// plus a running count of the number of in-flight wake-ups
// n.b. this may temporarily exceed jl_n_threads
static _Atomic(int) nrunning = 1;

// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
Expand Down Expand Up @@ -64,7 +69,7 @@ JL_DLLEXPORT int jl_set_task_tid(jl_task_t *task, int16_t tid) JL_NOTSAFEPOINT
if (was == tid)
return 1;
if (was == -1)
return jl_atomic_cmpswap(&task->tid, &was, tid);
return jl_atomic_cmpswap(&task->tid, &was, tid) || was == tid;
return 0;
}

Expand Down Expand Up @@ -180,6 +185,8 @@ void jl_threadfun(void *arg)
jl_init_stack_limits(0, &stack_lo, &stack_hi);
// warning: this changes `jl_current_task`, so be careful not to call that from this function
jl_task_t *ct = jl_init_root_task(ptls, stack_lo, stack_hi);
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(wasrunning); (void)wasrunning;
JL_GC_PROMISE_ROOTED(ct);

// wait for all threads
Expand Down Expand Up @@ -220,7 +227,7 @@ int jl_running_under_rr(int recheck)


// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
static int sleep_check_after_threshold(uint64_t *start_cycles)
static int sleep_check_after_threshold(uint64_t *start_cycles) JL_NOTSAFEPOINT
{
JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder
/**
Expand All @@ -243,18 +250,31 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
return 0;
}

static int set_not_sleeping(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
return 1;
}
}
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1); // consume in-flight wakeup
assert(wasrunning > 1); (void)wasrunning;
return 0;
}

static int wake_thread(int16_t tid) JL_NOTSAFEPOINT
{
jl_ptls_t other = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
int8_t state = sleeping;

if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
JL_PROBE_RT_SLEEP_CHECK_WAKE(other, state);
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];

if (jl_atomic_load_relaxed(&ptls2->sleep_check_state) != not_sleeping) {
int8_t state = sleeping;
if (jl_atomic_cmpswap_relaxed(&ptls2->sleep_check_state, &state, not_sleeping)) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1); // increment in-flight wakeup count
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKE(ptls2, state);
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
return 1;
}
}
Expand All @@ -280,10 +300,14 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) JL_NOTSAFEPOINT
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
// and that nrunning is updated if this is now considered in-flight
jl_ptls_t ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, not_sleeping) != not_sleeping) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(wasrunning); (void)wasrunning;
JL_PROBE_RT_SLEEP_CHECK_WAKEUP(ptls);
}
}
if (uvlock == ct)
uv_stop(jl_global_event_loop());
Expand Down Expand Up @@ -360,8 +384,10 @@ void jl_task_wait_empty(void)
// we are back from jl_task_get_next now
ct->world_age = lastage;
wait_empty = NULL;
// TODO: move this lock acquire-release pair to the caller, so that we ensure new work
// (from uv_unref objects) didn't unexpectedly get scheduled and start running behind our back
// TODO: move this lock acquire to before the wait_empty return and the
// unlock to the caller, so that we ensure new work (from uv_unref
// objects) didn't unexpectedly get scheduled and start running behind
// our back during the function return
JL_UV_LOCK();
jl_wait_empty_end();
JL_UV_UNLOCK();
Expand All @@ -378,6 +404,7 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}


extern _Atomic(unsigned) _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q, jl_value_t *checkempty)
Expand Down Expand Up @@ -405,8 +432,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
jl_fence(); // [^store_buffering_1]
JL_PROBE_RT_SLEEP_CHECK_SLEEP(ptls);
if (!check_empty(checkempty)) { // uses relaxed loads
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASKQ_WAKE(ptls);
}
continue;
Expand All @@ -415,23 +441,20 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
if (task)
return task;
continue;
}
if (task) {
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
return task;
}


// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
Expand Down Expand Up @@ -485,41 +508,64 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (active || !may_sleep(ptls)) {
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue;
}
if (!enter_eventloop && !jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive, just spin-looping if necessary
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (set_not_sleeping(ptls)) {
JL_PROBE_RT_SLEEP_CHECK_UV_WAKE(ptls);
}
start_cycles = 0;
continue;
}
}

// any thread which wants us running again will have to observe
// sleep_check_state==sleeping and increment nrunning for us
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
assert(wasrunning);
if (wasrunning == 1) {
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
// TODO: this also might be a good time to check again that
// libuv's queue is truly empty, instead of during delete_thread
if (ptls->tid != 0) {
uv_mutex_lock(&ptls->sleep_lock);
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
uv_mutex_unlock(&ptls->sleep_lock);
}
}

// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
if (ptls->tid == 0 && wait_empty) {
task = wait_empty;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) {
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
}
task = wait_empty;
if (ptls->tid == 0 && task && jl_atomic_load_relaxed(&nrunning) == 0) {
wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, 1);
assert(!wasrunning);
wasrunning = !set_not_sleeping(ptls);
assert(!wasrunning);
JL_PROBE_RT_SLEEP_CHECK_TASK_WAKE(ptls);
if (!ptls->finalizers_inhibited)
ptls->finalizers_inhibited++; // this annoyingly is rather sticky (we should like to reset it at the end of jl_task_wait_empty)
break;
}
// else should we warn the user of certain deadlock here if tid == 0 && nrunning == 0?
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
assert(jl_atomic_load_relaxed(&nrunning));
start_cycles = 0;
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = cycleclock() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
if (task) {
assert(task == wait_empty);
wait_empty = NULL;
Expand All @@ -533,6 +579,23 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q,
}
}

void scheduler_delete_thread(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
if (jl_atomic_exchange_relaxed(&ptls->sleep_check_state, sleeping_like_the_dead) != sleeping) {
int wasrunning = jl_atomic_fetch_add_relaxed(&nrunning, -1);
if (wasrunning == 1) {
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[0];
// This was the last running thread, and there is no thread with !may_sleep
// so make sure tid 0 is notified to check wait_empty
uv_mutex_lock(&ptls2->sleep_lock);
uv_cond_signal(&ptls2->wake_signal);
uv_mutex_unlock(&ptls2->sleep_lock);
}
}
jl_fence();
jl_wakeup_thread(0); // force thread 0 to see that we do not have the IO lock (and am dead)
}

#ifdef __cplusplus
}
#endif
2 changes: 1 addition & 1 deletion src/staticdata.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ static int jl_needs_serialization(jl_serializer_state *s, jl_value_t *v) JL_NOTS
else if (jl_typetagis(v, jl_uint8_tag << 4)) {
return 0;
}
else if (jl_typetagis(v, jl_task_tag << 4)) {
else if (v == (jl_value_t*)s->ptls->root_task) {
return 0;
}

Expand Down
Loading

0 comments on commit d952c80

Please sign in to comment.