From 0fffb9c55a3d07c6222f33125d410e419ed300c8 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 21 Jan 2022 17:46:29 -0500 Subject: [PATCH] Make sure threads waiting on continuation request execute continuations while waiting No other threads should execute them if the continuation request is limited to polling only. Signed-off-by: Joseph Schuchart --- ompi/mpiext/continue/c/continuation.c | 103 +++++++++++++------------- ompi/mpiext/continue/c/continuation.h | 14 ++-- ompi/request/req_wait.c | 33 +++------ ompi/request/request.h | 30 ++++++++ opal/mca/threads/base/wait_sync.c | 6 +- opal/mca/threads/wait_sync.h | 4 + 6 files changed, 106 insertions(+), 84 deletions(-) diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c index 7597e7e7005..4f96ab03908 100644 --- a/ompi/mpiext/continue/c/continuation.c +++ b/ompi/mpiext/continue/c/continuation.c @@ -81,10 +81,10 @@ struct ompi_cont_request_t { ompi_request_t super; opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */ bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ - bool cont_in_wait; /**< Whether the continuation request is currently waited on */ opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ + ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */ }; static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) @@ -98,10 +98,10 @@ static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ opal_atomic_lock_init(&cont_req->cont_lock, false); cont_req->cont_enqueue_complete = false; - cont_req->cont_in_wait = false; cont_req->cont_num_active = 0; cont_req->continue_max_poll = UINT32_MAX; cont_req->cont_complete_list = NULL; + cont_req->sync = NULL; } static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req) @@ -156,10 +156,13 @@ static opal_mutex_t request_cont_lock; */ static bool progress_callback_registered = false; -/** - * Thread-local list of continuation requests that should be progressed. - */ -static opal_thread_local opal_list_t *thread_progress_list = NULL; +struct lazy_list_s { + opal_list_t list; + bool is_initialized; +}; +typedef struct lazy_list_s lazy_list_t; + +static opal_thread_local lazy_list_t thread_progress_list = { .is_initialized = false }; static inline void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, @@ -179,6 +182,10 @@ void ompi_continue_cont_req_release(ompi_cont_request_t *cont_req, /* signal that all continuations were found complete */ ompi_request_complete(&cont_req->super, true); } + if (NULL != cont_req->sync) { + /* release the sync object */ + OPAL_THREAD_ADD_FETCH32(&cont_req->sync->num_req_need_progress, -1); + } if (take_lock && using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } @@ -191,12 +198,7 @@ void ompi_continue_cont_release(ompi_continuation_t *cont) ompi_cont_request_t *cont_req = cont->cont_req; assert(OMPI_REQUEST_CONT == cont_req->super.req_type); - /* if a thread is waiting on the request, we got here when - * the thread started executing the continuations, so the continuation - * request is complete already */ - if (!cont_req->cont_in_wait) { - ompi_continue_cont_req_release(cont_req, 1, true); - } + ompi_continue_cont_req_release(cont_req, 1, true); OBJ_RELEASE(cont_req); #ifdef OPAL_ENABLE_DEBUG @@ -240,9 +242,13 @@ int ompi_continue_progress_n(const uint32_t max) in_progress = 1; const bool using_threads = opal_using_threads(); - if (NULL != thread_progress_list) { + + /* execute thread-local continuations first + * (e.g., from continuation requests the current thread is waiting on) */ + lazy_list_t *tl_list = &thread_progress_list; + if (tl_list->is_initialized) { ompi_cont_request_t *cont_req; - OPAL_LIST_FOREACH(cont_req, thread_progress_list, ompi_cont_request_t) { + OPAL_LIST_FOREACH(cont_req, &tl_list->list, ompi_cont_request_t) { ompi_continuation_t *cb; if (opal_list_is_empty(cont_req->cont_complete_list)) continue; while (max > completed) { @@ -289,6 +295,12 @@ static int ompi_continue_progress_callback() return ompi_continue_progress_n(1); } +static int ompi_continue_wait_progress_callback() +{ + return ompi_continue_progress_n(UINT32_MAX); +} + + int ompi_continue_progress_request(ompi_request_t *req) { if (in_progress) return 0; @@ -329,42 +341,39 @@ int ompi_continue_progress_request(ompi_request_t *req) /** - * Register the provided continuation request to be included in the - * global progress loop (used while a thread is waiting for the contnuation - * request to complete). - * We move all local continuations into the global continuation list - * and mark the continuation request such that future continuations - * are directly put into the global continuations list. - * Once the wait completed (i.e., all continuations registered with the - * continuation request) we unmark it (see ompi_continue_deregister_request_progress). + * Register the continuation request so that it will be progressed even if + * it is poll-only and the thread is waiting on the provided sync object. */ -int ompi_continue_register_request_progress(ompi_request_t *req) +int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync) { ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; - opal_atomic_lock(&cont_req->cont_lock); + lazy_list_t *cont_req_list = &thread_progress_list; - cont_req->cont_in_wait = true; - - ompi_continue_cont_req_release(cont_req, opal_list_get_size(cont_req->cont_complete_list), false); + /* check that the thread-local list is initialized */ + if (!cont_req_list->is_initialized) { + OBJ_CONSTRUCT(&cont_req_list->list, opal_list_t); + cont_req_list->is_initialized = true; + } - opal_atomic_unlock(&cont_req->cont_lock); + /* add the continuation request to the thread-local list */ + opal_list_append(&cont_req_list->list, &cont_req->super.super.super); - if (NULL == thread_progress_list) { - thread_progress_list = OBJ_NEW(opal_list_t); + /* register with the sync object */ + if (NULL != sync) { + sync->num_req_need_progress++; + sync->progress_cb = &ompi_continue_wait_progress_callback; } - - /* enqueue the continuation request to allow for progress by this thread */ - opal_list_append(thread_progress_list, &req->super.super); + cont_req->sync = sync; return OMPI_SUCCESS; } /** - * Remove the continuation request from being progressed by the global progress - * loop (after a wait completes). + * Remove the poll-only continuation request from the thread's progress list after + * it has completed. */ int ompi_continue_deregister_request_progress(ompi_request_t *req) { @@ -372,17 +381,13 @@ int ompi_continue_deregister_request_progress(ompi_request_t *req) if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; - /* make sure we execute all outstanding continuations */ - uint32_t tmp_max_poll = cont_req->continue_max_poll; - cont_req->continue_max_poll = UINT32_MAX; - ompi_continue_progress_request(req); - cont_req->continue_max_poll = tmp_max_poll; - - cont_req->cont_in_wait = false; - + /* let the sync know we're done, it may suspend the thread now */ + if (NULL != cont_req->sync) { + cont_req->sync->num_req_need_progress--; + } /* remove the continuation request from the thread-local progress list */ - opal_list_remove_item(thread_progress_list, &req->super.super); + opal_list_remove_item(&thread_progress_list.list, &req->super.super); return OMPI_SUCCESS; } @@ -439,13 +444,6 @@ ompi_continue_enqueue_runnable(ompi_continuation_t *cont) if (NULL != cont_req->cont_complete_list) { opal_atomic_lock(&cont_req->cont_lock); opal_list_append(cont_req->cont_complete_list, &cont->super.super); - if (cont_req->cont_in_wait) { - /* if a thread is waiting for this request to complete, signal completions - * the continuations will be executed at the end of the wait - * but we need to ensure that the request is marked complete first - */ - ompi_continue_cont_req_release(cont_req, 1, false); - } opal_atomic_unlock(&cont_req->cont_lock); } else { OPAL_THREAD_LOCK(&request_cont_lock); @@ -601,7 +599,6 @@ int ompi_continue_attach( requests[i] = MPI_REQUEST_NULL; } } - } } @@ -609,7 +606,7 @@ int ompi_continue_attach( int num_complete = count - num_registered; int32_t last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->num_active, -num_complete); - if (0 == last_num_active && 0 == num_registered) { + if (0 == last_num_active) { if (cont_req->cont_enqueue_complete) { /* enqueue for later processing */ ompi_continue_enqueue_runnable(cont); diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h index 42648a917d1..d32a071148a 100644 --- a/ompi/mpiext/continue/c/continuation.h +++ b/ompi/mpiext/continue/c/continuation.h @@ -22,6 +22,8 @@ #include "ompi/mpiext/continue/c/mpiext_continue_c.h" +struct ompi_request_t; + BEGIN_C_DECLS /** @@ -38,18 +40,18 @@ int ompi_continuation_fini(void); * Register a request with local completion list for progressing through * the progress engine. */ -int ompi_continue_register_request_progress(ompi_request_t *cont_req); +int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, ompi_wait_sync_t *sync); /** * Deregister a request with local completion list from progressing through * the progress engine. */ -int ompi_continue_deregister_request_progress(ompi_request_t *cont_req); +int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req); /** * Progress a continuation request that has local completions. */ -int ompi_continue_progress_request(ompi_request_t *cont_req); +int ompi_continue_progress_request(struct ompi_request_t *cont_req); /** * Attach a continuation to a set of operations represented by \c requests. @@ -60,9 +62,9 @@ int ompi_continue_progress_request(ompi_request_t *cont_req); * can be used to query for and progress outstanding continuations. */ int ompi_continue_attach( - ompi_request_t *cont_req, + struct ompi_request_t *cont_req, int count, - ompi_request_t *requests[], + struct ompi_request_t *requests[], MPIX_Continue_cb_function *cont_cb, void *cont_data, ompi_status_public_t statuses[]); @@ -71,7 +73,7 @@ int ompi_continue_attach( /** * Allocate a new continuation request. */ -int ompi_continue_allocate_request(ompi_request_t **cont_req, ompi_info_t *info); +int ompi_continue_allocate_request(struct ompi_request_t **cont_req, ompi_info_t *info); END_C_DECLS diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index a331c6be012..2c64af2768c 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -41,23 +41,8 @@ int ompi_request_default_wait( { ompi_request_t *req = *req_ptr; -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == req->req_type) { - /* let the continuations be processed as part of the global progress loop - * while we're waiting for their completion */ - ompi_continue_register_request_progress(req); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - - ompi_request_wait_completion(req); -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == req->req_type) { - ompi_continue_deregister_request_progress(req); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - #if OPAL_ENABLE_FT_MPI /* Special case for MPI_ANY_SOURCE */ if( MPI_ERR_PROC_FAILED_PENDING == req->req_status.MPI_ERROR ) { @@ -144,13 +129,6 @@ int ompi_request_default_wait_any(size_t count, request = requests[i]; -#if OMPI_HAVE_MPI_EXT_CONTINUE - if (OMPI_REQUEST_CONT == request->req_type) { - have_cont_req = true; - ompi_continue_register_request_progress(request); - } -#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ - /* Check for null or completed persistent request. For * MPI_REQUEST_NULL, the req_state is always OMPI_REQUEST_INACTIVE. */ @@ -167,6 +145,13 @@ int ompi_request_default_wait_any(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + have_cont_req = true; + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { completed = i; @@ -319,7 +304,7 @@ int ompi_request_default_wait_all( size_t count, #if OMPI_HAVE_MPI_EXT_CONTINUE if (OMPI_REQUEST_CONT == request->req_type) { - ompi_continue_register_request_progress(request); + ompi_continue_register_request_progress(request, &sync); } #endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ @@ -590,7 +575,7 @@ int ompi_request_default_wait_some(size_t count, #if OMPI_HAVE_MPI_EXT_CONTINUE if (OMPI_REQUEST_CONT == request->req_type) { - ompi_continue_register_request_progress(request); + ompi_continue_register_request_progress(request, &sync); } #endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ diff --git a/ompi/request/request.h b/ompi/request/request.h index 0e6fb80cbf7..930578a20e2 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -40,6 +40,10 @@ #include "ompi/constants.h" #include "ompi/runtime/params.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + BEGIN_C_DECLS /** @@ -465,7 +469,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) WAIT_SYNC_INIT(&sync, 1); if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ SYNC_WAIT(&sync); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } else { /* completed before we had a chance to swap in the sync object */ WAIT_SYNC_SIGNALLED(&sync); @@ -487,6 +504,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } opal_atomic_rmb(); } else { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, NULL); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ while(!REQUEST_COMPLETE(req)) { opal_progress(); #if OPAL_ENABLE_FT_MPI @@ -497,6 +521,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } #endif /* OPAL_ENABLE_FT_MPI */ } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } } diff --git a/opal/mca/threads/base/wait_sync.c b/opal/mca/threads/base/wait_sync.c index 2451419620e..02525882538 100644 --- a/opal/mca/threads/base/wait_sync.c +++ b/opal/mca/threads/base/wait_sync.c @@ -98,7 +98,11 @@ int ompi_sync_wait_mt(ompi_wait_sync_t *sync) */ check_status: if (sync != wait_sync_list && num_thread_in_progress >= opal_max_thread_in_progress) { - opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + if (0 < sync->num_req_need_progress) { + sync->progress_cb(); + } else { + opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + } /** * At this point either the sync was completed in which case diff --git a/opal/mca/threads/wait_sync.h b/opal/mca/threads/wait_sync.h index c90e3d52a5c..f7bffc392e1 100644 --- a/opal/mca/threads/wait_sync.h +++ b/opal/mca/threads/wait_sync.h @@ -46,6 +46,8 @@ typedef struct ompi_wait_sync_t { opal_thread_internal_mutex_t lock; struct ompi_wait_sync_t *next; struct ompi_wait_sync_t *prev; + opal_progress_callback_t progress_cb; + opal_atomic_int32_t num_req_need_progress; volatile bool signaling; } ompi_wait_sync_t; @@ -119,6 +121,8 @@ static inline int sync_wait_st(ompi_wait_sync_t *sync) opal_thread_internal_cond_init(&(sync)->condition); \ opal_thread_internal_mutex_init(&(sync)->lock, false); \ } \ + (sync)->progress_cb = NULL; \ + (sync)->num_req_need_progress = 0; \ } while (0) /**