Skip to content
This repository has been archived by the owner on Jun 30, 2021. It is now read-only.

Commit

Permalink
Feature/better thread distribution (#102)
Browse files Browse the repository at this point in the history
* [#100] Better thread distribution.

Bandaid for unfair RR-based thread distribution. This patch attempts to
find the thread with the smallest backlog to defer. This does not seem
to affect performance.

* update conditions for optimal search

* Remove locality tests
  • Loading branch information
NathanFrench authored Jul 10, 2018
1 parent d008f21 commit 1454b6c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
6 changes: 3 additions & 3 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ add_executable(example_request_fini EXCLUDE_FROM_ALL example_request_fini.c)
add_executable(example_basic EXCLUDE_FROM_ALL example_basic.c)

if(NOT EVHTP_DISABLE_EVTHR)
add_executable(example_locality EXCLUDE_FROM_ALL example_locality.c)
target_link_libraries(example_locality evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})
#add_executable(example_locality EXCLUDE_FROM_ALL example_locality.c)
#target_link_libraries(example_locality evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})

add_executable(test_proxy EXCLUDE_FROM_ALL test_proxy.c)
target_link_libraries(test_proxy evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})

add_dependencies(examples test_proxy example_locality)
add_dependencies(examples test_proxy)
endif()

target_link_libraries(test_extensive evhtp ${LIBEVHTP_EXTERNAL_LIBS} ${SYS_LIBS})
Expand Down
39 changes: 32 additions & 7 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/queue.h>
#endif

#include <sys/ioctl.h>
#include <unistd.h>
#include <pthread.h>

Expand Down Expand Up @@ -112,12 +113,14 @@ _evthr_loop(void * args)
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->shared_pool_ev, NULL);
}

#endif

pthread_mutex_lock(&thread->lock);
if (thread->init_cb != NULL) {
(thread->init_cb)(thread, thread->arg);
}

pthread_mutex_unlock(&thread->lock);

event_base_loop(thread->evbase, 0);
Expand All @@ -126,6 +129,7 @@ _evthr_loop(void * args)
if (thread->exit_cb != NULL) {
(thread->exit_cb)(thread, thread->arg);
}

pthread_mutex_unlock(&thread->lock);

if (thread->err == 1) {
Expand Down Expand Up @@ -298,6 +302,7 @@ evthr_free(evthr_t * thread)
if (thread->shared_pool_ev) {
event_free(thread->shared_pool_ev);
}

#endif

if (thread->evbase) {
Expand Down Expand Up @@ -343,6 +348,16 @@ evthr_pool_stop(evthr_pool_t * pool)
return EVTHR_RES_OK;
}

static inline int
get_backlog_(evthr_t * thread)
{
int backlog = 0;

ioctl(thread->rdr, FIONREAD, &backlog);

return (int)(backlog / sizeof(evthr_cmd_t));
}

evthr_res
evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
{
Expand All @@ -358,8 +373,10 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
}

return EVTHR_RES_OK;
#else
evthr_t * thr = NULL;
#endif
evthr_t * thread = NULL;
evthr_t * min_thread = NULL;
int min_backlog = 0;

if (pool == NULL) {
return EVTHR_RES_FATAL;
Expand All @@ -369,14 +386,22 @@ evthr_pool_defer(evthr_pool_t * pool, evthr_cb cb, void * arg)
return EVTHR_RES_NOCB;
}

thr = TAILQ_FIRST(&pool->threads);

TAILQ_REMOVE(&pool->threads, thr, next);
TAILQ_INSERT_TAIL(&pool->threads, thr, next);
TAILQ_FOREACH(thread, &pool->threads, next) {
int backlog = get_backlog_(thread);

if (backlog == 0) {
min_thread = thread;
break;
}

return evthr_defer(thr, cb, arg);
#endif
if (min_thread == NULL || backlog < min_backlog) {
min_thread = thread;
min_backlog = backlog;
}
}

return evthr_defer(min_thread, cb, arg);
} /* evthr_pool_defer */

static evthr_pool_t *
Expand Down

0 comments on commit 1454b6c

Please sign in to comment.