
Commit 05e0cb9

Merge pull request #43418 from JuliaLang/improve-threads
Improve thread scheduler memory ordering
tkf authored Jan 27, 2022
2 parents fcd34ae + 0eaf35a commit 05e0cb9
Showing 7 changed files with 138 additions and 101 deletions.
6 changes: 3 additions & 3 deletions base/threadingconstructs.jl
@@ -86,11 +86,11 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
if ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
:(error("`@threads :static` can only be used from thread 1 and not nested"))
:(error("`@threads :static` cannot be used concurrently or nested"))
else
# only use threads when called from thread 1, outside @threads
# only use threads when called from outside @threads
:(Base.invokelatest(threadsfor_fun, true))
end)
else
6 changes: 3 additions & 3 deletions src/jl_uv.c
@@ -59,10 +59,10 @@ void JL_UV_LOCK(void)
if (jl_mutex_trylock(&jl_uv_mutex)) {
}
else {
jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, 1);
jl_wake_libuv();
JL_LOCK(&jl_uv_mutex);
jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, -1);
}
}

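Note on the relaxed fetch-adds above: jl_uv_n_waiters only serves as a hint to whichever thread is currently running the event loop, while the mutex acquire/release provides the synchronization for the protected data, so relaxed ordering on the counter is enough. A minimal sketch of the same handoff protocol in portable C11/pthreads; wake_event_loop and run_loop_once are hypothetical stand-ins for jl_wake_libuv and uv_run, not Julia's API:

    #include <stdatomic.h>
    #include <pthread.h>

    extern void wake_event_loop(void);   /* stand-in for jl_wake_libuv() */
    extern int  run_loop_once(void);     /* stand-in for uv_run(loop, UV_RUN_NOWAIT) */

    static atomic_int n_waiters;         /* how many threads are queued for the lock */
    static pthread_mutex_t loop_lock = PTHREAD_MUTEX_INITIALIZER;

    /* A thread that needs the lock: advertise itself, kick the loop out of its
     * blocking wait, then block on the mutex. The mutex is what orders the
     * protected data; the counter is only a hint, so relaxed suffices. */
    void lock_loop(void)
    {
        if (pthread_mutex_trylock(&loop_lock) == 0)
            return;
        atomic_fetch_add_explicit(&n_waiters, 1, memory_order_relaxed);
        wake_event_loop();
        pthread_mutex_lock(&loop_lock);
        atomic_fetch_add_explicit(&n_waiters, -1, memory_order_relaxed);
    }

    /* The polling side: only run the loop when nobody is waiting for the lock. */
    void poll_loop(void)
    {
        if (atomic_load_explicit(&n_waiters, memory_order_relaxed) == 0 &&
            pthread_mutex_trylock(&loop_lock) == 0) {
            run_loop_once();
            pthread_mutex_unlock(&loop_lock);
        }
    }
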
@@ -204,7 +204,7 @@ JL_DLLEXPORT int jl_process_events(void)
uv_loop_t *loop = jl_io_loop;
jl_gc_safepoint_(ct->ptls);
if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == 0)) {
if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
loop->stop_flag = 0;
int r = uv_run(loop, UV_RUN_NOWAIT);
JL_UV_UNLOCK();
2 changes: 1 addition & 1 deletion src/julia.h
@@ -911,7 +911,7 @@ STATIC_INLINE jl_value_t *jl_svecset(
// TODO: while svec is supposedly immutable, in practice we sometimes publish it first
// and set the values lazily. Those users should be using jl_atomic_store_release here.
jl_svec_data(t)[i] = (jl_value_t*)x;
if (x) jl_gc_wb(t, x);
jl_gc_wb(t, x);
return (jl_value_t*)x;
}
#endif
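The TODO above describes the usual publication rule: if an svec can become visible to other threads before its elements are filled in, each late element store needs release ordering (and readers need matching acquire loads); otherwise all elements must be written before the object is published. A minimal C11 illustration of the two patterns, using illustrative names rather than Julia's types:

    #include <stdatomic.h>

    typedef struct { _Atomic(void *) slot; } cell_t;

    /* Pattern A: fill first, publish once. The release store that publishes the
     * container orders the earlier initializing store for any acquire reader. */
    void publish_filled(_Atomic(cell_t *) *where, cell_t *c, void *value)
    {
        atomic_store_explicit(&c->slot, value, memory_order_relaxed);
        atomic_store_explicit(where, c, memory_order_release);
    }

    /* Pattern B: publish the container empty and fill it lazily. Now every late
     * store must itself be a release store, which is what the TODO asks for. */
    void fill_after_publish(cell_t *c, void *value)
    {
        atomic_store_explicit(&c->slot, value, memory_order_release);
    }

    void *read_slot(cell_t *c)
    {
        return atomic_load_explicit(&c->slot, memory_order_acquire);
    }
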
164 changes: 100 additions & 64 deletions src/partr.c
@@ -31,6 +31,16 @@ static const int16_t sleeping = 1;
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
// * Dequeuer:
// * 1a: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)`
// * 1b: `multiq_check_empty` returns true
// * Enqueuer:
// * 2a: `multiq_insert`
// * 2b: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping`
// i.e., the dequeuer misses the enqueue and enqueuer misses the sleep state transition.


JULIA_DEBUG_SLEEPWAKE(
uint64_t wakeup_enter;
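As a concrete illustration of [^store_buffering_1], here is the same fence pattern written with portable C11 atomics (the names are illustrative, not Julia's): each side does a relaxed store, a sequentially-consistent fence, then a relaxed load, and the two fences forbid the outcome where both loads miss the other side's store.

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_bool sleep_check_state; /* dequeuer's flag: stored at 1a, loaded at 2b */
    static atomic_bool queue_has_work;    /* stands in for the multiqueue: 2a / 1b */

    /* Dequeuer: 1a then 1b. Returns true if it believes it may go to sleep. */
    bool dequeuer_may_sleep(void)
    {
        atomic_store_explicit(&sleep_check_state, true, memory_order_relaxed); /* 1a */
        atomic_thread_fence(memory_order_seq_cst);
        return !atomic_load_explicit(&queue_has_work, memory_order_relaxed);   /* 1b */
    }

    /* Enqueuer: 2a then 2b. Returns true if it must wake the dequeuer. */
    bool enqueuer_must_wake(void)
    {
        atomic_store_explicit(&queue_has_work, true, memory_order_relaxed);    /* 2a */
        atomic_thread_fence(memory_order_seq_cst);
        return atomic_load_explicit(&sleep_check_state, memory_order_relaxed); /* 2b */
    }

    /* With both fences present, at least one of the two relaxed loads must see
     * the other thread's store, so the dequeuer cannot park on an item whose
     * enqueuer concluded that no wakeup was needed. */
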
@@ -348,16 +358,20 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
}


static void wake_thread(int16_t tid)
static int wake_thread(int16_t tid)
{
jl_ptls_t other = jl_all_tls_states[tid];
int8_t state = sleeping;
jl_atomic_cmpswap(&other->sleep_check_state, &state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&sleep_locks[tid]);
uv_cond_signal(&wake_signals[tid]);
uv_mutex_unlock(&sleep_locks[tid]);

if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
uv_mutex_lock(&sleep_locks[tid]);
uv_cond_signal(&wake_signals[tid]);
uv_mutex_unlock(&sleep_locks[tid]);
return 1;
}
}
return 0;
}


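wake_thread now signals the condition variable only when it is the caller that actually performs the sleeping -> not_sleeping transition, and it reports that to jl_wakeup_thread so the extra fence and libuv checks can be skipped when nobody was asleep. A sketch of the sleeper/waker handshake it relies on, in portable C11/pthreads with illustrative names:

    #include <stdatomic.h>
    #include <pthread.h>

    static atomic_int sleep_state;      /* 0 = not_sleeping, 1 = sleeping */
    static pthread_mutex_t sleep_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  wake_signal = PTHREAD_COND_INITIALIZER;

    /* Waker: claim the transition with a CAS; only the winner signals, and it
     * signals while holding the sleeper's lock so the wakeup cannot fall
     * between the sleeper's final check and its wait. */
    int wake_one(void)
    {
        int expected = 1;
        if (atomic_load_explicit(&sleep_state, memory_order_relaxed) != 1)
            return 0;   /* already awake: nothing to do */
        if (!atomic_compare_exchange_strong_explicit(&sleep_state, &expected, 0,
                memory_order_relaxed, memory_order_relaxed))
            return 0;   /* somebody else performed the transition first */
        pthread_mutex_lock(&sleep_lock);
        pthread_cond_signal(&wake_signal);
        pthread_mutex_unlock(&sleep_lock);
        return 1;
    }

    /* Sleeper: it has already stored sleep_state = 1 (as jl_task_get_next does);
     * it re-checks the flag under the lock before every wait. */
    void sleep_until_woken(void)
    {
        pthread_mutex_lock(&sleep_lock);
        while (atomic_load_explicit(&sleep_state, memory_order_relaxed) == 1)
            pthread_cond_wait(&wake_signal, &sleep_lock);
        pthread_mutex_unlock(&sleep_lock);
    }
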
@@ -372,37 +386,48 @@ static void wake_libuv(void)
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
{
jl_task_t *ct = jl_current_task;
jl_ptls_t ptls = ct->ptls;
jl_task_t *uvlock = jl_atomic_load(&jl_uv_mutex.owner);
int16_t self = jl_atomic_load_relaxed(&ct->tid);
if (tid != self)
jl_fence(); // [^store_buffering_1]
jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
jl_ptls_t ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
if (uvlock == ct)
uv_stop(jl_global_event_loop());
}
else {
// something added to the sticky-queue: notify that thread
wake_thread(tid);
// check if we need to notify uv_run too
jl_task_t *system_tid = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) == system_tid)
wake_libuv();
if (wake_thread(tid)) {
// check if we need to notify uv_run too
jl_fence();
jl_task_t *tid_task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
// now that we have changed the thread to not-sleeping, ensure that
// either it has not yet acquired the libuv lock, or that it will
// observe the change of state to not_sleeping
if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
wake_libuv();
}
}
// check if the other threads might be sleeping
if (tid == -1) {
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
int anysleep = 0;
for (tid = 0; tid < jl_n_threads; tid++) {
if (tid != self)
wake_thread(tid);
anysleep |= wake_thread(tid);
}
// check if we need to notify uv_run too
if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) != NULL)
wake_libuv();
if (uvlock != ct && anysleep) {
jl_fence();
if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
wake_libuv();
}
}
JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() );
}
@@ -426,7 +451,9 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
{
// sleep_check_state is only transitioned from not_sleeping to sleeping
// by the thread itself. As a result, if this returns false, it will
// continue returning false. If it returns true, there are no guarantees.
// continue returning false. If it returns true, we know the total
// modification order of the fences.
jl_fence(); // [^store_buffering_1]
return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
}

@@ -452,26 +479,45 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
jl_cpu_pause();
jl_ptls_t ptls = ct->ptls;
if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
if (!multiq_check_empty()) {
// acquire sleep-check lock
jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
jl_fence(); // [^store_buffering_1]
if (!multiq_check_empty()) { // uses relaxed loads
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
continue;
}
task = get_next_task(trypoptask, q); // note: this should not yield
if (ptls != ct->ptls) {
// sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
ptls = ct->ptls;
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
if (task)
return task;
continue;
}
task = get_next_task(trypoptask, q); // WARNING: this should not yield
if (ptls != ct->ptls)
continue; // oops, get_next_task did yield--start over
if (task) {
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
return task;
}

// one thread should win this race and watch the event loop
// inside a threaded region, any thread can listen for IO messages,
// although none are allowed to create new ones
// outside of threaded regions, all IO is permitted,
// but only on thread 1

// IO is always permitted, but outside a threaded region, only
// thread 0 will process messages.
// Inside a threaded region, any thread can listen for IO messages,
// and one thread should win this race and watch the event loop,
// but we bias away from idle threads getting parked here.
//
// The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
// - After decrementing _threadedregion, the thread is required to
// call jl_wakeup_thread(0), that will kick out any thread who is
// already there, and then eventually thread 0 will get here.
// - Inside a _threadedregion, there must exist at least one
// thread that has a happens-before relationship on the libuv lock
// before reaching this decision point in the code who will see
// the lock as unlocked and thus must win this race here.
int uvlock = 0;
if (jl_atomic_load_relaxed(&_threadedregion)) {
uvlock = jl_mutex_trylock(&jl_uv_mutex);
@@ -482,50 +528,40 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
}
if (uvlock) {
int active = 1;
if (jl_atomic_load(&jl_uv_n_waiters) != 0) {
// but if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
JL_UV_UNLOCK();
}
else {
// otherwise, we may block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
// otherwise, we block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
while (active && may_sleep(ptls)) {
if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
// but if we won the race against someone who actually needs
// the lock to do real work, we need to let them have it instead
break;
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
jl_gc_safepoint();
if (may_sleep(ptls)) {
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
start_cycles = 0;
continue;
}
// otherwise, we got a spurious wakeup since some other
// thread that just wanted to steal libuv from us,
// just go right back to sleep on the other wake signal
// to let them take it from us without conflict
// TODO: this relinquishes responsibility for all event
// to the last thread to do an explicit operation,
// which may starve other threads of critical work
if (jl_atomic_load(&jl_uv_n_waiters) == 0) {
continue;
}
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do.
// Otherwise we got a spurious wakeup since some other thread
// that just wanted to steal libuv from us. We will just go
// right back to sleep on the individual wake signal to let
// them take it from us without conflict.
if (!may_sleep(ptls)) {
start_cycles = 0;
continue;
}
if (!jl_atomic_load_relaxed(&_threadedregion) && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive
// so it needs to stay alive, just spin-looping if necessary
if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
}
}

// the other threads will just wait for on signal to resume
// the other threads will just wait for an individual wake signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&sleep_locks[ptls->tid]);
21 changes: 11 additions & 10 deletions src/signals-unix.c
@@ -442,17 +442,18 @@ static void jl_exit_thread0(int state, jl_bt_element_t *bt_data, size_t bt_size)
if (thread0_exit_count <= 1) {
unw_context_t *signal_context;
jl_thread_suspend_and_get_state(0, &signal_context);
thread0_exit_state = state;
ptls2->bt_size = bt_size; // <= JL_MAX_BT_SIZE
memcpy(ptls2->bt_data, bt_data, ptls2->bt_size * sizeof(bt_data[0]));
jl_thread_resume(0, -1);
}
else {
thread0_exit_state = state;
jl_atomic_store_release(&ptls2->signal_request, 3);
// This also makes sure `sleep` is aborted.
pthread_kill(ptls2->system_id, SIGUSR2);
if (signal_context != NULL) {
thread0_exit_state = state;
ptls2->bt_size = bt_size; // <= JL_MAX_BT_SIZE
memcpy(ptls2->bt_data, bt_data, ptls2->bt_size * sizeof(bt_data[0]));
jl_thread_resume(0, -1);
return;
}
}
thread0_exit_state = state;
jl_atomic_store_release(&ptls2->signal_request, 3);
// This also makes sure `sleep` is aborted.
pthread_kill(ptls2->system_id, SIGUSR2);
}

// request:
22 changes: 10 additions & 12 deletions src/simplevector.c
@@ -13,7 +13,7 @@ JL_DLLEXPORT jl_svec_t *(ijl_svec)(size_t n, ...)
if (n == 0) return jl_emptysvec;
va_start(args, n);
jl_svec_t *jv = jl_alloc_svec_uninit(n);
for(size_t i=0; i < n; i++)
for (size_t i = 0; i < n; i++)
jl_svecset(jv, i, va_arg(args, jl_value_t*));
va_end(args);
return jv;
@@ -38,7 +38,7 @@ JL_DLLEXPORT jl_svec_t *jl_svec1(void *a)
jl_svec_t *v = (jl_svec_t*)jl_gc_alloc(ct->ptls, sizeof(void*) * 2,
jl_simplevector_type);
jl_svec_set_len_unsafe(v, 1);
jl_svecset(v, 0, a);
jl_svec_data(v)[0] = (jl_value_t*)a;
return v;
}

@@ -48,8 +48,8 @@ JL_DLLEXPORT jl_svec_t *jl_svec2(void *a, void *b)
jl_svec_t *v = (jl_svec_t*)jl_gc_alloc(ct->ptls, sizeof(void*) * 3,
jl_simplevector_type);
jl_svec_set_len_unsafe(v, 2);
jl_svecset(v, 0, a);
jl_svecset(v, 1, b);
jl_svec_data(v)[0] = (jl_value_t*)a;
jl_svec_data(v)[1] = (jl_value_t*)b;
return v;
}

@@ -67,26 +67,24 @@ JL_DLLEXPORT jl_svec_t *jl_alloc_svec(size_t n)
{
if (n == 0) return jl_emptysvec;
jl_svec_t *jv = jl_alloc_svec_uninit(n);
for(size_t i=0; i < n; i++)
jl_svecset(jv, i, NULL);
memset(jl_assume_aligned(jl_svec_data(jv), sizeof(void*)), 0, n * sizeof(void*));
return jv;
}

JL_DLLEXPORT jl_svec_t *jl_svec_copy(jl_svec_t *a)
{
size_t i, n=jl_svec_len(a);
size_t n = jl_svec_len(a);
jl_svec_t *c = jl_alloc_svec_uninit(n);
for(i=0; i < n; i++)
jl_svecset(c, i, jl_svecref(a,i));
memmove_refs((void**)jl_svec_data(c), (void**)jl_svec_data(a), n);
return c;
}

JL_DLLEXPORT jl_svec_t *jl_svec_fill(size_t n, jl_value_t *x)
{
if (n==0) return jl_emptysvec;
if (n == 0) return jl_emptysvec;
jl_svec_t *v = jl_alloc_svec_uninit(n);
for(size_t i=0; i < n; i++)
jl_svecset(v, i, x);
for (size_t i = 0; i < n; i++)
jl_svec_data(v)[i] = x;
return v;
}

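These constructors now write into the freshly allocated vector directly (or memset/memmove its data) rather than going through jl_svecset: an object that was allocated moments ago is still young and still private to the allocating thread, so the per-element GC write barrier buys nothing during construction. A toy generational write barrier makes that reasoning visible; this is an illustration under those assumptions, not Julia's GC:

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct obj { bool is_old; struct obj **fields; } obj;

    extern void remember(obj *parent);  /* hypothetical remembered-set insert */

    /* Generational write barrier: only old-parent -> young-child edges need to
     * be recorded so the next young collection can find them. */
    static inline void write_field(obj *parent, size_t i, obj *child)
    {
        parent->fields[i] = child;
        if (child != NULL && parent->is_old && !child->is_old)
            remember(parent);
    }

    /* During construction the parent is young (is_old == false) and unpublished,
     * so plain stores suffice and the barrier branch above can never fire. */
    static void init_fields(obj *parent, obj **values, size_t n)
    {
        for (size_t i = 0; i < n; i++)
            parent->fields[i] = values[i];
    }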