From fc80968ba54dedc2d486c74a7c39114081465497 Mon Sep 17 00:00:00 2001 From: dipinhora Date: Wed, 10 Jan 2018 17:05:32 -0500 Subject: [PATCH] Fix and re-enable dynamic scheduler scaling Change dynamic scheduler scaling implementation in order to resolve the hangs encountered in #2451. The previous implementation assumed that signalling to wake a thread was a reliable operation. Apparently, that's not necessarily true (see https://en.wikipedia.org/wiki/Spurious_wakeup and https://askldjd.com/2010/04/24/the-lost-wakeup-problem/). Seeing as we couldn't find any other explanation for why the previous implementation was experiencing hangs, I've assumed it is either because of lost wake ups or spurious wake ups and redesigned the logic accordingly. Now, when a thread is about to suspend, it will decrement the `active_scheduler_count` and then suspend. When it wakes up, it will check to see if the `active_scheduler_count` is at least as big as its `index`. If the `active_scheduler_count` isn't big enough, the thread will suspend itself again immediately. If it is big enough, it will resume. Threads no longer modify `active_scheduler_count` when they wake up. `active_scheduler_count` must now be modified by the thread that is waking up another thread prior to sending the wake up notification. Additionally, since we're now assuming that wake up signals can be lost, we now send multiple wake up notifications just in case. While this is somewhat wasteful, it is better than being in a situation where some threads aren't woken up at all (i.e. a hang). Additionally, only use `scheduler_count_changing` for `signals` implementation of dynamic scheduler scaling. `pthreads` implementation now uses a mutex (`sched_mut`) in its place. We also now change logic to only unlock mutex in `pthreads` implementation once threads have been woken to avoid potential lost wake ups. This isn't an issue for the `signals` implementation and the unlocking of `scheduler_count_changing` can remain where it is prior to threads being woken up. This commit also splits out scheduler block/unblock message handling logic into their own functions (this is so that sched 0 can call those functions directly instead of sending messages to itself). This commit also includes a change inspired by #2474. Now, *all* scheduler threads can suspend as long as there is at least one noisy actor registered with the ASIO subsystem. If there are no noisy actors registered with the ASIO subsystem then scheduler 0 is not allowed to suspend itself because it is reponsible for quiescence detection. Lastly, this commit adds logic to allow a scheduler thread to suspend even if it has already sent a scheduler block message so that we can now suspend scheduler threads in most scenarios. --- src/common/threads.h | 4 + src/libponyc/options/options.c | 2 +- src/libponyrt/asio/event.c | 4 + src/libponyrt/platform/threads.c | 26 +- src/libponyrt/sched/scheduler.c | 467 +++++++++++++++++++++++-------- src/libponyrt/sched/scheduler.h | 6 +- 6 files changed, 364 insertions(+), 145 deletions(-) diff --git a/src/common/threads.h b/src/common/threads.h index 06cb87c320..4d776ce3c0 100644 --- a/src/common/threads.h +++ b/src/common/threads.h @@ -57,7 +57,11 @@ void ponyint_thread_detach(pony_thread_id_t thread); pony_thread_id_t ponyint_thread_self(); +#if defined(USE_SCHEDULER_SCALING_PTHREADS) +void ponyint_thread_suspend(pony_signal_event_t signal, pthread_mutex_t* mut); +#else void ponyint_thread_suspend(pony_signal_event_t signal); +#endif int ponyint_thread_wake(pony_thread_id_t thread, pony_signal_event_t signal); diff --git a/src/libponyc/options/options.c b/src/libponyc/options/options.c index 68516c7846..4961f21519 100644 --- a/src/libponyc/options/options.c +++ b/src/libponyc/options/options.c @@ -209,7 +209,7 @@ static void usage(void) " --ponythreads Use N scheduler threads. Defaults to the number of\n" " cores (not hyperthreads) available.\n" " --ponyminthreads Minimum number of active scheduler threads allowed.\n" - " Defaults to the number of '--ponythreads'.\n" + " Defaults to 0.\n" " --ponycdmin Defer cycle detection until 2^N actors have blocked.\n" " Defaults to 2^4.\n" " --ponycdmax Always cycle detect when 2^N actors have blocked.\n" diff --git a/src/libponyrt/asio/event.c b/src/libponyrt/asio/event.c index 4c9aba8954..57625e09ca 100644 --- a/src/libponyrt/asio/event.c +++ b/src/libponyrt/asio/event.c @@ -2,6 +2,7 @@ #include "asio.h" #include "../actor/actor.h" #include "../mem/pool.h" +#include "../sched/scheduler.h" #include "ponyassert.h" #include @@ -122,4 +123,7 @@ PONY_API void pony_asio_event_send(asio_event_t* ev, uint32_t flags, // sender they aren't covered by backpressure. We pass false for an early // bailout in the backpressure code. pony_sendv(pony_ctx(), ev->owner, &m->msg, &m->msg, false); + + // maybe wake up a scheduler thread if they've all fallen asleep + ponyint_sched_maybe_wakeup_if_all_asleep(-1); } diff --git a/src/libponyrt/platform/threads.c b/src/libponyrt/platform/threads.c index d9cbe63b7e..b3f75570b5 100644 --- a/src/libponyrt/platform/threads.c +++ b/src/libponyrt/platform/threads.c @@ -18,17 +18,6 @@ typedef cpuset_t cpu_set_t; #endif -#if defined(USE_SCHEDULER_SCALING_PTHREADS) -static pthread_mutex_t sleep_mut; - -static pthread_once_t sleep_mut_once = PTHREAD_ONCE_INIT; - -void sleep_mut_init() -{ - pthread_mutex_init(&sleep_mut, NULL); -} -#endif - #if defined(PLATFORM_IS_LINUX) #include @@ -213,10 +202,6 @@ bool ponyint_thread_create(pony_thread_id_t* thread, thread_fn start, return false; #endif -#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) - pthread_once(&sleep_mut_once, sleep_mut_init); -#endif - return true; } @@ -249,22 +234,21 @@ pony_thread_id_t ponyint_thread_self() #endif } +#if defined(USE_SCHEDULER_SCALING_PTHREADS) +void ponyint_thread_suspend(pony_signal_event_t signal, pthread_mutex_t* mut) +#else void ponyint_thread_suspend(pony_signal_event_t signal) +#endif { #ifdef PLATFORM_IS_WINDOWS WaitForSingleObject(signal, INFINITE); #elif defined(USE_SCHEDULER_SCALING_PTHREADS) int ret; - // lock mutex - ret = pthread_mutex_lock(&sleep_mut); // wait for condition variable (will sleep and release mutex) - ret = pthread_cond_wait(signal, &sleep_mut); + ret = pthread_cond_wait(signal, mut); // TODO: What to do if `ret` is an unrecoverable error? (void) ret; - - // unlock mutex - ret = pthread_mutex_unlock(&sleep_mut); #else int sig; sigset_t sigmask; diff --git a/src/libponyrt/sched/scheduler.c b/src/libponyrt/sched/scheduler.c index 00db326747..e21c281355 100644 --- a/src/libponyrt/sched/scheduler.c +++ b/src/libponyrt/sched/scheduler.c @@ -34,13 +34,25 @@ static uint32_t asio_cpu; static uint32_t scheduler_count; static uint32_t min_scheduler_count; static PONY_ATOMIC(uint32_t) active_scheduler_count; -static PONY_ATOMIC(bool) scheduler_count_changing; static scheduler_t* scheduler; static PONY_ATOMIC(bool) detect_quiescence; static bool use_yield; static mpmcq_t inject; static __pony_thread_local scheduler_t* this_scheduler; +#if defined(USE_SCHEDULER_SCALING_PTHREADS) +static pthread_mutex_t sched_mut; + +static pthread_once_t sched_mut_once = PTHREAD_ONCE_INIT; + +void sched_mut_init() +{ + pthread_mutex_init(&sched_mut, NULL); +} +#else +static PONY_ATOMIC(bool) scheduler_count_changing; +#endif + /** * Gets the current active scheduler count */ @@ -106,42 +118,116 @@ static void send_msg_all_active(uint32_t from, sched_msg_t msg, intptr_t arg) static void send_msg_all(uint32_t from, sched_msg_t msg, intptr_t arg) { - send_msg(from, 0, msg, arg); - - for(uint32_t i = 1; i < scheduler_count; i++) + for(uint32_t i = 0; i < scheduler_count; i++) send_msg(from, i, msg, arg); } -static void wake_suspended_threads() +static void wake_suspended_threads(int32_t current_scheduler_id) { + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + // wake up any sleeping threads - while (get_active_scheduler_count() < scheduler_count) + while ((current_active_scheduler_count = get_active_scheduler_count()) < scheduler_count) { +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // acquire mutex if using pthreads + if(!pthread_mutex_lock(&sched_mut)) +#else + // get the bool that controls modifying the active scheduler count variable + // if using signals if(!atomic_exchange_explicit(&scheduler_count_changing, true, memory_order_acquire)) +#endif { // in case the count changed between the while check and now - if(get_active_scheduler_count() < scheduler_count) + current_active_scheduler_count = get_active_scheduler_count(); + + if(current_active_scheduler_count < scheduler_count) { - // send signal to wake up next scheduler thread available - if(ponyint_thread_wake(scheduler[get_active_scheduler_count()].tid, - scheduler[get_active_scheduler_count()].sleep_object)) - // if there was an error waking the thread - // unlock the bool that controls modifying the active scheduler count - // variable. - atomic_store_explicit(&scheduler_count_changing, false, - memory_order_release); + // set active_scheduler_count to wake all schedulers + current_active_scheduler_count = scheduler_count; + atomic_store_explicit(&active_scheduler_count, current_active_scheduler_count, + memory_order_relaxed); + } + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals. + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + + // send multiple signals to wake up all schedulers that should be awake + // this is somewhat wasteful, but, it's better than risking some + // schedulers never waking up. If a scheduler is already awake, + // the signal is disregarded + for(int j = 0; j < 3; j++) + { + for(uint32_t i = 0; i < current_active_scheduler_count; i++) + { + if((int32_t)i != current_scheduler_id) + { + ponyint_thread_wake(scheduler[i].tid, scheduler[i].sleep_object); + } + } } - else - // if there are no scheduler threads left to unlock - // unlock the bool that controls modifying the active scheduler count - // variable. - atomic_store_explicit(&scheduler_count_changing, false, - memory_order_release); + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif } } } +// start cnf/ack cycle for quiescence if block count == active_scheduler_count +static void maybe_start_cnf_ack_cycle(scheduler_t* sched) +{ + if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && + (sched->block_count == get_active_scheduler_count())) + { + // reset ack token count to 0 because dynamic scheduler scaling means + // that a new thread can wake up changing active_scheduler_count and + // then block causing block_count == active_scheduler_count for a + // second time and if we don't reset, we can think we've received + // enough acks when we really haven't + sched->ack_token++; + sched->ack_count = 0; + + // If we think all threads are blocked, send CNF(token) to everyone. + send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); + } +} + +// handle SCHED_BLOCK message +static void handle_sched_block(scheduler_t* sched) +{ + sched->block_count++; + maybe_start_cnf_ack_cycle(sched); +} + +// handle SCHED_UNBLOCK message +static void handle_sched_unblock(scheduler_t* sched) +{ + // if the ASIO thread has already been stopped + if (sched->asio_stopped) + { + // restart the ASIO thread + ponyint_asio_init(asio_cpu); + sched->asio_stopped = !ponyint_asio_start(); + } + + // make sure asio hasn't already been stopped or else runtime is in + // an invalid state without the ASIO thread running + pony_assert(!sched->asio_stopped); + + // Cancel all acks and increment the ack token, so that any pending + // acks in the queue will be dropped when they are received. + sched->block_count--; + sched->ack_token++; + sched->ack_count = 0; +} + static bool read_msg(scheduler_t* sched) { pony_msgi_t* m; @@ -158,59 +244,19 @@ static bool read_msg(scheduler_t* sched) { case SCHED_SUSPEND: { - if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && - (sched->block_count == get_active_scheduler_count())) - { - // reset ack token count to 0 because dynamic scheduler scaling means - // that a new thread can suspend changing active_scheduler_count and - // we can think we've received enough acks when we really haven't - sched->ack_count = 0; - - // If we think all threads are blocked, send CNF(token) to everyone. - send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); - } + maybe_start_cnf_ack_cycle(sched); break; } case SCHED_BLOCK: { - sched->block_count++; - - if(atomic_load_explicit(&detect_quiescence, memory_order_relaxed) && - (sched->block_count == get_active_scheduler_count())) - { - // reset ack token count to 0 because dynamic scheduler scaling means - // that a new thread can wake up changing active_scheduler_count and - // then block causing block_count == active_scheduler_count for a - // second time and if we don't reset, we can think we've received - // enough acks when we really haven't - sched->ack_count = 0; - - // If we think all threads are blocked, send CNF(token) to everyone. - send_msg_all_active(sched->index, SCHED_CNF, sched->ack_token); - } + handle_sched_block(sched); break; } case SCHED_UNBLOCK: { - // if the ASIO thread has already been stopped - if (sched->asio_stopped) - { - // restart the ASIO thread - ponyint_asio_init(asio_cpu); - sched->asio_stopped = !ponyint_asio_start(); - } - - // make sure asio hasn't already been stopped or else runtime is in - // an invalid state without the ASIO thread running - pony_assert(!sched->asio_stopped); - - // Cancel all acks and increment the ack token, so that any pending - // acks in the queue will be dropped when they are received. - sched->block_count--; - sched->ack_token++; - sched->ack_count = 0; + handle_sched_unblock(sched); break; } @@ -283,7 +329,7 @@ static bool quiescent(scheduler_t* sched, uint64_t tsc, uint64_t tsc2) { send_msg_all(sched->index, SCHED_TERMINATE, 0); - wake_suspended_threads(); + wake_suspended_threads(sched->index); sched->ack_token++; sched->ack_count = 0; @@ -434,15 +480,158 @@ static pony_actor_t* steal(scheduler_t* sched) else if (((tsc2 - tsc) > 1000000) && (ponyint_mutemap_size(&sched->mute_mapping) == 0)) { + // in case active scheduler count changed + current_active_scheduler_count = get_active_scheduler_count(); + // if we're the highest active scheduler thread // and there are more active schedulers than the minimum requested // and we're not terminating if ((sched == &scheduler[current_active_scheduler_count - 1]) && (current_active_scheduler_count > min_scheduler_count) && (!sched->terminate) +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // try to acquire mutex if using pthreads + && !pthread_mutex_trylock(&sched_mut) +#else + // try and get the bool that controls modifying the active scheduler count + // variable if using signals && !atomic_exchange_explicit(&scheduler_count_changing, true, - memory_order_acquire)) + memory_order_acquire) +#endif + ) + { + // can only sleep if we're scheduler > 0 or if we're scheduler 0 and + // there is at least one noisy actor registered + if((sched->index > 0) || ((sched->index == 0) && sched->asio_noisy)) + { + // decrement active_scheduler_count so other schedulers know we're + // sleeping + uint32_t sched_count = atomic_load_explicit(&active_scheduler_count, + memory_order_relaxed); + + // make sure the scheduler count didn't change + pony_assert(sched_count == current_active_scheduler_count); + + atomic_store_explicit(&active_scheduler_count, sched_count - 1, + memory_order_relaxed); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + + // let sched 0 know we're suspending only after decrementing + // active_scheduler_count to avoid a race condition between + // when we update active_scheduler_count and scheduler 0 processes + // the SCHED_SUSPEND message we send it. If we don't do this, + // and scheduler 0 processes the SCHED_SUSPEND message before we + // decrement active_scheduler_count, it could think that + // active_scheduler_count > block_count and not start the CNF/ACK + // process for termination and potentiall hang the runtime instead + // of allowing it to reach quiescence. + if(sched->index != 0) + send_msg(sched->index, 0, SCHED_SUSPEND, 0); + + // dtrace suspend notification + DTRACE1(THREAD_SUSPEND, (uintptr_t)sched); + + while(get_active_scheduler_count() <= (uint32_t)sched->index) + { + // sleep waiting for signal to wake up again +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + ponyint_thread_suspend(sched->sleep_object, &sched_mut); +#else + ponyint_thread_suspend(sched->sleep_object); +#endif + } + + // dtrace resume notification + DTRACE1(THREAD_RESUME, (uintptr_t)sched); + + // reset steal_attempts so we try to steal from all other schedulers + // prior to suspending again + steal_attempts = 0; + } + else + { + pony_assert(sched->index == 0); + pony_assert(!sched->asio_noisy); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + + // send block message if there are no noisy actors registered + // with the ASIO thread and this is scheduler 0 + handle_sched_block(sched); + block_sent = true; + } + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif + } + else if(!sched->asio_noisy) { + // Only send block messages if there are no noisy actors registered + // with the ASIO thread + if(sched->index == 0) + handle_sched_block(sched); + else + send_msg(sched->index, 0, SCHED_BLOCK, 0); + + block_sent = true; + } + } + } + else + { + // block sent and no work to do. We should try and suspend if we can now + // if we do suspend, we'll send a unblock message first to ensure cnf/ack + // cycle works as expected + + // get active scheduler count + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + + // make sure thread scaling order is still valid. we should never be + // active if the active_scheduler_count isn't larger than our index. + pony_assert(current_active_scheduler_count > (uint32_t)sched->index); + + // if we're the highest active scheduler thread + // and there are more active schedulers than the minimum requested + // and we're not terminating + if ((sched == &scheduler[current_active_scheduler_count - 1]) + && (current_active_scheduler_count > min_scheduler_count) + && (!sched->terminate) +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // try to acquire mutex if using pthreads + && !pthread_mutex_trylock(&sched_mut) +#else + // try and get the bool that controls modifying the active scheduler count + // variable if using signals + && !atomic_exchange_explicit(&scheduler_count_changing, true, + memory_order_acquire) +#endif + ) + { + // can only sleep if we're scheduler > 0 or if we're scheduler 0 and + // there is at least one noisy actor registered + if((sched->index > 0) || ((sched->index == 0) && sched->asio_noisy)) + { + // unblock before suspending to ensure cnf/ack cycle works as expected + if(sched->index == 0) + handle_sched_unblock(sched); + else + send_msg(sched->index, 0, SCHED_UNBLOCK, 0); + + block_sent = false; + // decrement active_scheduler_count so other schedulers know we're // sleeping uint32_t sched_count = atomic_load_explicit(&active_scheduler_count, @@ -454,10 +643,12 @@ static pony_actor_t* steal(scheduler_t* sched) atomic_store_explicit(&active_scheduler_count, sched_count - 1, memory_order_relaxed); +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) // unlock the bool that controls modifying the active scheduler count - // variable + // variable if using signals atomic_store_explicit(&scheduler_count_changing, false, memory_order_release); +#endif // let sched 0 know we're suspending only after decrementing // active_scheduler_count to avoid a race condition between @@ -468,37 +659,21 @@ static pony_actor_t* steal(scheduler_t* sched) // active_scheduler_count > block_count and not start the CNF/ACK // process for termination and potentiall hang the runtime instead // of allowing it to reach quiescence. - send_msg(sched->index, 0, SCHED_SUSPEND, 0); + if(sched->index != 0) + send_msg(sched->index, 0, SCHED_SUSPEND, 0); // dtrace suspend notification DTRACE1(THREAD_SUSPEND, (uintptr_t)sched); - // sleep waiting for signal to wake up again - ponyint_thread_suspend(sched->sleep_object); - - bool scc = atomic_load_explicit(&scheduler_count_changing, - memory_order_acquire); - - // make sure scheduler_count_changing is true - pony_assert(scc); - - // increment active_scheduler_count so other schedulers know we're - // awake again - sched_count = atomic_load_explicit(&active_scheduler_count, - memory_order_relaxed); - - // make sure the scheduler count is correct still - pony_assert((sched_count + 1) == current_active_scheduler_count); - - atomic_store_explicit(&active_scheduler_count, sched_count + 1, - memory_order_relaxed); - - // unlock the bool that controls modifying the active scheduler count - // variable. this is because the signalling thread locks the control - // variable before signalling - scc = false; - atomic_store_explicit(&scheduler_count_changing, scc, - memory_order_release); + while(get_active_scheduler_count() <= (uint32_t)sched->index) + { + // sleep waiting for signal to wake up again +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + ponyint_thread_suspend(sched->sleep_object, &sched_mut); +#else + ponyint_thread_suspend(sched->sleep_object); +#endif + } // dtrace resume notification DTRACE1(THREAD_RESUME, (uintptr_t)sched); @@ -507,13 +682,23 @@ static pony_actor_t* steal(scheduler_t* sched) // prior to suspending again steal_attempts = 0; } - else if(!sched->asio_noisy) + else { - // Only send block messages if there are no noisy actors registered - // with the ASIO thread - send_msg(sched->index, 0, SCHED_BLOCK, 0); - block_sent = true; + pony_assert(sched->index == 0); + pony_assert(!sched->asio_noisy); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif } + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif } } } @@ -521,7 +706,10 @@ static pony_actor_t* steal(scheduler_t* sched) if(block_sent) { // Only send unblock message if a corresponding block message was sent - send_msg(sched->index, 0, SCHED_UNBLOCK, 0); + if(sched->index == 0) + handle_sched_unblock(sched); + else + send_msg(sched->index, 0, SCHED_UNBLOCK, 0); } return actor; } @@ -570,7 +758,7 @@ static void run(scheduler_t* sched) // the extra scheduler threads would keep being woken up and then go back // to sleep over and over again. if(ponyint_mutemap_size(&sched->mute_mapping) > 0) - ponyint_sched_maybe_wakeup(); + ponyint_sched_maybe_wakeup(sched->index); // Run the current actor and get the next actor. bool reschedule = ponyint_actor_run(&sched->ctx, actor, PONY_SCHED_BATCH); @@ -673,10 +861,6 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, if(threads == 0) threads = ponyint_cpu_count(); - // If no minimum thread count is specified, use # of threads - if(min_threads == 0) - min_threads = threads; - // If minimum thread count is > thread count, cap it at thread count if(min_threads > threads) min_threads = threads; @@ -692,6 +876,10 @@ pony_ctx_t* ponyint_sched_init(uint32_t threads, bool noyield, bool nopin, asio_cpu = ponyint_cpu_assign(scheduler_count, scheduler, nopin, pinasio); +#if !defined(PLATFORM_IS_WINDOWS) && defined(USE_SCHEDULER_SCALING_PTHREADS) + pthread_once(&sched_mut_once, sched_mut_init); +#endif + for(uint32_t i = 0; i < scheduler_count; i++) { #if defined(PLATFORM_IS_WINDOWS) @@ -827,33 +1015,68 @@ void ponyint_sched_unnoisy_asio(int32_t from) } // Maybe wake up a scheduler thread if possible -void ponyint_sched_maybe_wakeup() +void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id) +{ + uint32_t current_active_scheduler_count = get_active_scheduler_count(); + + // wake up threads is the current active count is 0 + if(current_active_scheduler_count == 0) + ponyint_sched_maybe_wakeup(current_scheduler_id); +} + +// Maybe wake up a scheduler thread if possible +void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id) { uint32_t current_active_scheduler_count = get_active_scheduler_count(); // if we have some schedulers that are sleeping, wake one up if((current_active_scheduler_count < scheduler_count) && +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // try to acquire mutex if using pthreads + !pthread_mutex_trylock(&sched_mut) +#else + // try and get the bool that controls modifying the active scheduler count + // variable if using signals !atomic_exchange_explicit(&scheduler_count_changing, true, - memory_order_acquire)) + memory_order_acquire) +#endif + ) { // in case the count changed between the while check and now - if(get_active_scheduler_count() < scheduler_count) + current_active_scheduler_count = get_active_scheduler_count(); + + if(current_active_scheduler_count < scheduler_count) { - // send signal to wake up next scheduler thread available - if(ponyint_thread_wake(scheduler[get_active_scheduler_count()].tid, - scheduler[get_active_scheduler_count()].sleep_object)) - // if there was an error waking the thread - // unlock the bool that controls modifying the active scheduler count - // variable. - atomic_store_explicit(&scheduler_count_changing, false, - memory_order_release); + // increment active_scheduler_count to wake a new scheduler up + current_active_scheduler_count++; + atomic_store_explicit(&active_scheduler_count, current_active_scheduler_count, + memory_order_relaxed); } - else - // if there are no scheduler threads left to unlock - // unlock the bool that controls modifying the active scheduler count - // variable. - atomic_store_explicit(&scheduler_count_changing, false, - memory_order_release); + +#if !defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock the bool that controls modifying the active scheduler count + // variable if using signals. + atomic_store_explicit(&scheduler_count_changing, false, + memory_order_release); +#endif + + // send multiple signals to wake up all schedulers that should be awake + // this is somewhat wasteful, but, it's better than risking some + // schedulers never waking up. If a scheduler is already awake, + // the signal is disregarded + for(int j = 0; j < 3; j++) + { + for(uint32_t i = 0; i < current_active_scheduler_count; i++) + { + if((int32_t)i != current_scheduler_id) + ponyint_thread_wake(scheduler[i].tid, scheduler[i].sleep_object); + } + } + +#if defined(USE_SCHEDULER_SCALING_PTHREADS) + // unlock mutex if using pthreads + pthread_mutex_unlock(&sched_mut); +#endif } } diff --git a/src/libponyrt/sched/scheduler.h b/src/libponyrt/sched/scheduler.h index 45d6852fe8..b157ee0f07 100644 --- a/src/libponyrt/sched/scheduler.h +++ b/src/libponyrt/sched/scheduler.h @@ -101,7 +101,11 @@ void ponyint_sched_noisy_asio(int32_t from); void ponyint_sched_unnoisy_asio(int32_t from); // Try and wake up a sleeping scheduler thread to help with load -void ponyint_sched_maybe_wakeup(); +void ponyint_sched_maybe_wakeup(int32_t current_scheduler_id); + +// Try and wake up a sleeping scheduler thread only if all scheduler +// threads are asleep +void ponyint_sched_maybe_wakeup_if_all_asleep(int32_t current_scheduler_id); PONY_EXTERN_C_END