From 1454b6ca87c34a385a7779a91648fa4e04b8f3c9 Mon Sep 17 00:00:00 2001 From: Nathan French Date: Tue, 10 Jul 2018 12:55:15 -0400 Subject: [PATCH] Feature/better thread distribution (#102) * [#100] Better thread distribution. Bandaid for unfair RR-based thread distribution. This patch attempts to find the thread with the smallest backlog to defer. This does not seem to affect performance. * update conditions for optimal search * Remove locality tests --- examples/CMakeLists.txt | 6 +++--- thread.c | 39 ++++++++++++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 6512805..4723214 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -13,13 +13,13 @@ add_executable(example_request_fini EXCLUDE_FROM_ALL example_request_fini.c) add_executable(example_basic EXCLUDE_FROM_ALL example_basic.c) if(NOT EVHTP_DISABLE_EVTHR) - add_executable(example_locality EXCLUDE_FROM_ALL example_locality.c) - target_link_libraries(example_locality evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS}) + #add_executable(example_locality EXCLUDE_FROM_ALL example_locality.c) + #target_link_libraries(example_locality evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS}) add_executable(test_proxy EXCLUDE_FROM_ALL test_proxy.c) target_link_libraries(test_proxy evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS}) - add_dependencies(examples test_proxy example_locality) + add_dependencies(examples test_proxy) endif() target_link_libraries(test_extensive evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS}) diff --git a/thread.c b/thread.c index f5994a3..dfddc7e 100644 --- a/thread.c +++ b/thread.c @@ -7,6 +7,7 @@ #include #endif +#include #include #include @@ -112,12 +113,14 @@ _evthr_loop(void * args) EV_READ | EV_PERSIST, _evthr_read_cmd, args); event_add(thread->shared_pool_ev, NULL); } + #endif pthread_mutex_lock(&thread->lock); if (thread->init_cb != NULL) { (thread->init_cb)(thread, thread->arg); } + pthread_mutex_unlock(&thread->lock); event_base_loop(thread->evbase, 0); @@ -126,6 +129,7 @@ _evthr_loop(void * args) if (thread->exit_cb != NULL) { (thread->exit_cb)(thread, thread->arg); } + pthread_mutex_unlock(&thread->lock); if (thread->err == 1) { @@ -298,6 +302,7 @@ evthr_free(evthr_t * thread) if (thread->shared_pool_ev) { event_free(thread->shared_pool_ev); } + #endif if (thread->evbase) { @@ -343,6 +348,16 @@ evthr_pool_stop(evthr_pool_t * pool) return EVTHR_RES_OK; } +static inline int +get_backlog_(evthr_t * thread) +{ + int backlog = 0; + + ioctl(thread->rdr, FIONREAD, &backlog); + + return (int)(backlog / sizeof(evthr_cmd_t)); +} + evthr_res evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) { @@ -358,8 +373,10 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) } return EVTHR_RES_OK; -#else - evthr_t * thr = NULL; +#endif + evthr_t * thread = NULL; + evthr_t * min_thread = NULL; + int min_backlog = 0; if (pool == NULL) { return EVTHR_RES_FATAL; @@ -369,14 +386,22 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg) return EVTHR_RES_NOCB; } - thr = TAILQ_FIRST(&pool->threads); - TAILQ_REMOVE(&pool->threads, thr, next); - TAILQ_INSERT_TAIL(&pool->threads, thr, next); + TAILQ_FOREACH(thread, &pool->threads, next) { + int backlog = get_backlog_(thread); + if (backlog == 0) { + min_thread = thread; + break; + } - return evthr_defer(thr, cb, arg); -#endif + if (min_thread == NULL || backlog < min_backlog) { + min_thread = thread; + min_backlog = backlog; + } + } + + return evthr_defer(min_thread, cb, arg); } /* evthr_pool_defer */ static evthr_pool_t *