diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl
index 5e64af806b31a..cd9abc67979e6 100644
--- a/base/threadingconstructs.jl
+++ b/base/threadingconstructs.jl
@@ -86,11 +86,11 @@ function _threadsfor(iter, lbody, schedule)
                 end
             end
         end
-        if threadid() != 1 || ccall(:jl_in_threaded_region, Cint, ()) != 0
+        if ccall(:jl_in_threaded_region, Cint, ()) != 0
             $(if schedule === :static
-                  :(error("`@threads :static` can only be used from thread 1 and not nested"))
+                  :(error("`@threads :static` cannot be used concurrently or nested"))
               else
-                  # only use threads when called from thread 1, outside @threads
+                  # only use threads when called from outside @threads
                   :(Base.invokelatest(threadsfor_fun, true))
               end)
         else
diff --git a/src/jl_uv.c b/src/jl_uv.c
index 0f616cdebba36..e803e6263142d 100644
--- a/src/jl_uv.c
+++ b/src/jl_uv.c
@@ -59,10 +59,10 @@ void JL_UV_LOCK(void)
     if (jl_mutex_trylock(&jl_uv_mutex)) {
     }
     else {
-        jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
+        jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, 1);
         jl_wake_libuv();
         JL_LOCK(&jl_uv_mutex);
-        jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
+        jl_atomic_fetch_add_relaxed(&jl_uv_n_waiters, -1);
     }
 }
@@ -204,7 +204,7 @@ JL_DLLEXPORT int jl_process_events(void)
     uv_loop_t *loop = jl_io_loop;
     jl_gc_safepoint_(ct->ptls);
     if (loop && (jl_atomic_load_relaxed(&_threadedregion) || jl_atomic_load_relaxed(&ct->tid) == 0)) {
-        if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
+        if (jl_atomic_load_relaxed(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
             loop->stop_flag = 0;
             int r = uv_run(loop, UV_RUN_NOWAIT);
             JL_UV_UNLOCK();
diff --git a/src/julia.h b/src/julia.h
index 9b7e93dda7119..5acf070f7a25b 100644
--- a/src/julia.h
+++ b/src/julia.h
@@ -911,7 +911,7 @@ STATIC_INLINE jl_value_t *jl_svecset(
     // TODO: while svec is supposedly immutable, in practice we sometimes publish it first
     // and set the values lazily. Those users should be using jl_atomic_store_release here.
     jl_svec_data(t)[i] = (jl_value_t*)x;
-    if (x) jl_gc_wb(t, x);
+    jl_gc_wb(t, x);
     return (jl_value_t*)x;
 }
 #endif
diff --git a/src/partr.c b/src/partr.c
index 048a841158153..22c9c15605f21 100644
--- a/src/partr.c
+++ b/src/partr.c
@@ -31,6 +31,16 @@ static const int16_t sleeping = 1;
 // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
 // information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
 // information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
+// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
+// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
+// * Dequeuer:
+//   * 1a: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)`
+//   * 1b: `multiq_check_empty` returns true
+// * Enqueuer:
+//   * 2a: `multiq_insert`
+//   * 2b: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping`
+// i.e., the dequeuer misses the enqueue and enqueuer misses the sleep state transition.
+
 
 JULIA_DEBUG_SLEEPWAKE(
 uint64_t wakeup_enter;
@@ -348,16 +358,20 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
 }
 
-static void wake_thread(int16_t tid)
+static int wake_thread(int16_t tid)
 {
     jl_ptls_t other = jl_all_tls_states[tid];
     int8_t state = sleeping;
-    jl_atomic_cmpswap(&other->sleep_check_state, &state, not_sleeping);
-    if (state == sleeping) {
-        uv_mutex_lock(&sleep_locks[tid]);
-        uv_cond_signal(&wake_signals[tid]);
-        uv_mutex_unlock(&sleep_locks[tid]);
+
+    if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
+        if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
+            uv_mutex_lock(&sleep_locks[tid]);
+            uv_cond_signal(&wake_signals[tid]);
+            uv_mutex_unlock(&sleep_locks[tid]);
+            return 1;
+        }
     }
+    return 0;
 }
@@ -372,37 +386,48 @@ static void wake_libuv(void)
 JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
 {
     jl_task_t *ct = jl_current_task;
-    jl_ptls_t ptls = ct->ptls;
-    jl_task_t *uvlock = jl_atomic_load(&jl_uv_mutex.owner);
     int16_t self = jl_atomic_load_relaxed(&ct->tid);
+    if (tid != self)
+        jl_fence(); // [^store_buffering_1]
+    jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
     JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
     if (tid == self || tid == -1) {
         // we're already awake, but make sure we'll exit uv_run
+        jl_ptls_t ptls = ct->ptls;
         if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping)
-            jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
         if (uvlock == ct)
             uv_stop(jl_global_event_loop());
     }
     else {
         // something added to the sticky-queue: notify that thread
-        wake_thread(tid);
-        // check if we need to notify uv_run too
-        jl_task_t *system_tid = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) == system_tid)
-            wake_libuv();
+        if (wake_thread(tid)) {
+            // check if we need to notify uv_run too
+            jl_fence();
+            jl_task_t *tid_task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
+            // now that we have changed the thread to not-sleeping, ensure that
+            // either it has not yet acquired the libuv lock, or that it will
+            // observe the change of state to not_sleeping
+            if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
+                wake_libuv();
+        }
     }
     // check if the other threads might be sleeping
     if (tid == -1) {
         // something added to the multi-queue: notify all threads
         // in the future, we might want to instead wake some fraction of threads,
         // and let each of those wake additional threads if they find work
+        int anysleep = 0;
         for (tid = 0; tid < jl_n_threads; tid++) {
             if (tid != self)
-                wake_thread(tid);
+                anysleep |= wake_thread(tid);
         }
         // check if we need to notify uv_run too
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) != NULL)
-            wake_libuv();
+        if (uvlock != ct && anysleep) {
+            jl_fence();
+            if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
+                wake_libuv();
+        }
     }
     JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() );
 }
@@ -426,7 +451,9 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
 {
     // sleep_check_state is only transitioned from not_sleeping to sleeping
     // by the thread itself. As a result, if this returns false, it will
-    // continue returning false. If it returns true, there are no guarantees.
+    // continue returning false. If it returns true, we know the total
+    // modification order of the fences.
+    jl_fence(); // [^store_buffering_1]
     return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
 }
@@ -452,26 +479,45 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
         jl_cpu_pause();
         jl_ptls_t ptls = ct->ptls;
         if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
-            jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
-            if (!multiq_check_empty()) {
+            // acquire sleep-check lock
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
+            jl_fence(); // [^store_buffering_1]
+            if (!multiq_check_empty()) { // uses relaxed loads
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                continue;
+            }
+            task = get_next_task(trypoptask, q); // note: this should not yield
+            if (ptls != ct->ptls) {
+                // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
+                ptls = ct->ptls;
+                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                if (task)
+                    return task;
                 continue;
             }
-            task = get_next_task(trypoptask, q); // WARNING: this should not yield
-            if (ptls != ct->ptls)
-                continue; // oops, get_next_task did yield--start over
             if (task) {
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 return task;
             }
-            // one thread should win this race and watch the event loop
-            // inside a threaded region, any thread can listen for IO messages,
-            // although none are allowed to create new ones
-            // outside of threaded regions, all IO is permitted,
-            // but only on thread 1
+
+            // IO is always permitted, but outside a threaded region, only
+            // thread 0 will process messages.
+            // Inside a threaded region, any thread can listen for IO messages,
+            // and one thread should win this race and watch the event loop,
+            // but we bias away from idle threads getting parked here.
+            //
+            // The reason this works is somewhat convoluted, and closely tied to [^store_buffering_1]:
+            //  - After decrementing _threadedregion, the thread is required to
+            //    call jl_wakeup_thread(0), which will kick out any thread that is
+            //    already there, and then eventually thread 0 will get here.
+            //  - Inside a _threadedregion, there must exist at least one
+            //    thread that has a happens-before relationship on the libuv lock
+            //    before reaching this decision point in the code; that thread will
+            //    see the lock as unlocked and thus must win this race here.
             int uvlock = 0;
             if (jl_atomic_load_relaxed(&_threadedregion)) {
                 uvlock = jl_mutex_trylock(&jl_uv_mutex);
@@ -482,50 +528,40 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
             }
             if (uvlock) {
                 int active = 1;
-                if (jl_atomic_load(&jl_uv_n_waiters) != 0) {
-                    // but if we won the race against someone who actually needs
-                    // the lock to do real work, we need to let them have it instead
-                    JL_UV_UNLOCK();
-                }
-                else {
-                    // otherwise, we may block until someone asks us for the lock
-                    uv_loop_t *loop = jl_global_event_loop();
+                // otherwise, we block until someone asks us for the lock
+                uv_loop_t *loop = jl_global_event_loop();
+                while (active && may_sleep(ptls)) {
+                    if (jl_atomic_load_relaxed(&jl_uv_n_waiters) != 0)
+                        // but if we won the race against someone who actually needs
+                        // the lock to do real work, we need to let them have it instead
+                        break;
+                    loop->stop_flag = 0;
+                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
+                    active = uv_run(loop, UV_RUN_ONCE);
+                    JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
                     jl_gc_safepoint();
-                    if (may_sleep(ptls)) {
-                        loop->stop_flag = 0;
-                        JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = cycleclock() );
-                        active = uv_run(loop, UV_RUN_ONCE);
-                        JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = cycleclock() );
-                    }
-                    JL_UV_UNLOCK();
-                    // optimization: check again first if we may have work to do
-                    if (!may_sleep(ptls)) {
-                        assert(jl_atomic_load_relaxed(&ptls->sleep_check_state) == not_sleeping);
-                        start_cycles = 0;
-                        continue;
-                    }
-                    // otherwise, we got a spurious wakeup since some other
-                    // thread that just wanted to steal libuv from us,
-                    // just go right back to sleep on the other wake signal
-                    // to let them take it from us without conflict
-                    // TODO: this relinquishes responsibility for all event
-                    // to the last thread to do an explicit operation,
-                    // which may starve other threads of critical work
-                    if (jl_atomic_load(&jl_uv_n_waiters) == 0) {
-                        continue;
-                    }
+                }
+                JL_UV_UNLOCK();
+                // optimization: check again first if we may have work to do.
+                // Otherwise we got a spurious wakeup from some other thread
+                // that just wanted to steal libuv from us. We will just go
+                // right back to sleep on the individual wake signal to let
+                // them take it from us without conflict.
+                if (!may_sleep(ptls)) {
+                    start_cycles = 0;
+                    continue;
                 }
                 if (!jl_atomic_load_relaxed(&_threadedregion) && active && ptls->tid == 0) {
                     // thread 0 is the only thread permitted to run the event loop
-                    // so it needs to stay alive
+                    // so it needs to stay alive, just spin-looping if necessary
                     if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                        jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                        jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                     start_cycles = 0;
                     continue;
                 }
             }
-            // the other threads will just wait for on signal to resume
+            // the other threads will just wait for an individual wake signal to resume
             JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = cycleclock() );
             int8_t gc_state = jl_gc_safe_enter(ptls);
             uv_mutex_lock(&sleep_locks[ptls->tid]);
diff --git a/src/signals-unix.c b/src/signals-unix.c
index 9142da0c03ada..67af5c0b1358d 100644
--- a/src/signals-unix.c
+++ b/src/signals-unix.c
@@ -442,17 +442,18 @@ static void jl_exit_thread0(int state, jl_bt_element_t *bt_data, size_t bt_size)
     if (thread0_exit_count <= 1) {
         unw_context_t *signal_context;
         jl_thread_suspend_and_get_state(0, &signal_context);
-        thread0_exit_state = state;
-        ptls2->bt_size = bt_size; // <= JL_MAX_BT_SIZE
-        memcpy(ptls2->bt_data, bt_data, ptls2->bt_size * sizeof(bt_data[0]));
-        jl_thread_resume(0, -1);
-    }
-    else {
-        thread0_exit_state = state;
-        jl_atomic_store_release(&ptls2->signal_request, 3);
-        // This also makes sure `sleep` is aborted.
-        pthread_kill(ptls2->system_id, SIGUSR2);
+        if (signal_context != NULL) {
+            thread0_exit_state = state;
+            ptls2->bt_size = bt_size; // <= JL_MAX_BT_SIZE
+            memcpy(ptls2->bt_data, bt_data, ptls2->bt_size * sizeof(bt_data[0]));
+            jl_thread_resume(0, -1);
+            return;
+        }
     }
+    thread0_exit_state = state;
+    jl_atomic_store_release(&ptls2->signal_request, 3);
+    // This also makes sure `sleep` is aborted.
+    pthread_kill(ptls2->system_id, SIGUSR2);
 }
 
 // request:
diff --git a/src/simplevector.c b/src/simplevector.c
index fa21330b23ab4..988cf18ccc9b6 100644
--- a/src/simplevector.c
+++ b/src/simplevector.c
@@ -13,7 +13,7 @@ JL_DLLEXPORT jl_svec_t *(ijl_svec)(size_t n, ...)
     if (n == 0) return jl_emptysvec;
     va_start(args, n);
     jl_svec_t *jv = jl_alloc_svec_uninit(n);
-    for(size_t i=0; i < n; i++)
+    for (size_t i = 0; i < n; i++)
         jl_svecset(jv, i, va_arg(args, jl_value_t*));
     va_end(args);
     return jv;
@@ -38,7 +38,7 @@ JL_DLLEXPORT jl_svec_t *jl_svec1(void *a)
     jl_svec_t *v = (jl_svec_t*)jl_gc_alloc(ct->ptls, sizeof(void*) * 2,
                                            jl_simplevector_type);
     jl_svec_set_len_unsafe(v, 1);
-    jl_svecset(v, 0, a);
+    jl_svec_data(v)[0] = (jl_value_t*)a;
     return v;
 }
@@ -48,8 +48,8 @@ JL_DLLEXPORT jl_svec_t *jl_svec2(void *a, void *b)
     jl_svec_t *v = (jl_svec_t*)jl_gc_alloc(ct->ptls, sizeof(void*) * 3,
                                            jl_simplevector_type);
     jl_svec_set_len_unsafe(v, 2);
-    jl_svecset(v, 0, a);
-    jl_svecset(v, 1, b);
+    jl_svec_data(v)[0] = (jl_value_t*)a;
+    jl_svec_data(v)[1] = (jl_value_t*)b;
     return v;
 }
@@ -67,26 +67,24 @@ JL_DLLEXPORT jl_svec_t *jl_alloc_svec(size_t n)
 {
     if (n == 0) return jl_emptysvec;
     jl_svec_t *jv = jl_alloc_svec_uninit(n);
-    for(size_t i=0; i < n; i++)
-        jl_svecset(jv, i, NULL);
+    memset(jl_assume_aligned(jl_svec_data(jv), sizeof(void*)), 0, n * sizeof(void*));
     return jv;
 }
 
 JL_DLLEXPORT jl_svec_t *jl_svec_copy(jl_svec_t *a)
 {
-    size_t i, n=jl_svec_len(a);
+    size_t n = jl_svec_len(a);
     jl_svec_t *c = jl_alloc_svec_uninit(n);
-    for(i=0; i < n; i++)
-        jl_svecset(c, i, jl_svecref(a,i));
+    memmove_refs((void**)jl_svec_data(c), (void**)jl_svec_data(a), n);
     return c;
 }
 
 JL_DLLEXPORT jl_svec_t *jl_svec_fill(size_t n, jl_value_t *x)
 {
-    if (n==0) return jl_emptysvec;
+    if (n == 0) return jl_emptysvec;
     jl_svec_t *v = jl_alloc_svec_uninit(n);
-    for(size_t i=0; i < n; i++)
-        jl_svecset(v, i, x);
+    for (size_t i = 0; i < n; i++)
+        jl_svec_data(v)[i] = x;
     return v;
 }
diff --git a/src/threading.c b/src/threading.c
index f10612016ef8a..2f50783dafaf0 100644
--- a/src/threading.c
+++ b/src/threading.c
@@ -531,8 +531,7 @@ _Atomic(unsigned) _threadedregion; // HACK: keep track of whether to prioritize
 JL_DLLEXPORT int jl_in_threaded_region(void)
 {
-    return jl_atomic_load_relaxed(&jl_current_task->tid) != 0 ||
-        jl_atomic_load_relaxed(&_threadedregion) != 0;
+    return jl_atomic_load_relaxed(&_threadedregion) != 0;
 }
 
 JL_DLLEXPORT void jl_enter_threaded_region(void)
@@ -542,12 +541,15 @@ JL_DLLEXPORT void jl_exit_threaded_region(void)
 {
-    jl_atomic_fetch_add(&_threadedregion, -1);
-    jl_wake_libuv();
-    // make sure no more callbacks will run while user code continues
-    // outside thread region and might touch an I/O object.
-    JL_UV_LOCK();
-    JL_UV_UNLOCK();
+    if (jl_atomic_fetch_add(&_threadedregion, -1) == 1) {
+        // make sure no more callbacks will run while user code continues
+        // outside thread region and might touch an I/O object.
+        JL_UV_LOCK();
+        JL_UV_UNLOCK();
+        // make sure thread 0 is not using the sleep_lock
+        // so that it may enter the libuv event loop instead
+        jl_wakeup_thread(0);
+    }
 }
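
The new `jl_fence()` calls in the patch implement the classic store-buffering (Dekker-style) protocol described in [^store_buffering_1]: each side performs a relaxed store, a sequentially consistent fence, and then a relaxed load of the other side's flag, so at least one side is guaranteed to observe the other's store. Below is a minimal standalone sketch of that protocol in C11 atomics; it is not part of the patch, and the names (`queue_nonempty`, `sleep_state`) are invented stand-ins for the multiq contents and `ptls->sleep_check_state`.

/* Standalone sketch (not part of the patch above): the store-buffering
 * hazard from [^store_buffering_1] and the seq_cst fences that rule it out. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_int queue_nonempty; /* stand-in for the multiq contents  */
static atomic_int sleep_state;    /* 0 = not_sleeping, 1 = sleeping    */

/* Dequeuer (cf. jl_task_get_next): advertise the intent to sleep, fence,
 * then re-check for work. Without the fence, the relaxed store (1a) and
 * relaxed load (1b) may effectively reorder, and the thread could park
 * even though work was just enqueued. */
static void *dequeuer(void *arg)
{
    (void)arg;
    atomic_store_explicit(&sleep_state, 1, memory_order_relaxed);        /* 1a */
    atomic_thread_fence(memory_order_seq_cst);  /* pairs with enqueuer's fence */
    if (atomic_load_explicit(&queue_nonempty, memory_order_relaxed) == 0) /* 1b */
        printf("dequeuer: no work seen, would go to sleep now\n");
    return NULL;
}

/* Enqueuer (cf. jl_wakeup_thread): publish the work, fence, then check
 * whether the target thread advertised that it may sleep and needs a wakeup. */
static void *enqueuer(void *arg)
{
    (void)arg;
    atomic_store_explicit(&queue_nonempty, 1, memory_order_relaxed);     /* 2a */
    atomic_thread_fence(memory_order_seq_cst);  /* pairs with dequeuer's fence */
    if (atomic_load_explicit(&sleep_state, memory_order_relaxed) == 1)    /* 2b */
        printf("enqueuer: dequeuer may sleep, would signal its condvar\n");
    return NULL;
}

int main(void)
{
    pthread_t a, b;
    pthread_create(&a, NULL, dequeuer, NULL);
    pthread_create(&b, NULL, enqueuer, NULL);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    /* With both fences, "dequeuer saw no work" and "enqueuer saw not_sleeping"
     * cannot both happen, so the wakeup is never lost. With relaxed (or even
     * acquire/release) accesses alone, both outcomes can occur together and
     * the dequeuer would sleep on a nonempty queue. */
    return 0;
}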