Skip to content

Commit

Permalink
Merge pull request #90268 from RandomShaper/wtp_servers
Browse files Browse the repository at this point in the history
Use WorkerThreadPool for Server threads (enhanced)
  • Loading branch information
akien-mga authored Apr 15, 2024
2 parents a44b0b6 + 65686de commit c951421
Show file tree
Hide file tree
Showing 38 changed files with 457 additions and 389 deletions.
16 changes: 16 additions & 0 deletions core/config/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ int Engine::get_audio_output_latency() const {
return _audio_output_latency;
}

// Registers that one more frame has been drawn.
//
// Also maintains the streak of consecutive frames during which
// notify_frame_server_synced() was called: the streak grows by one if the
// frame that just ended was flagged as synced, and restarts from zero
// otherwise. The per-frame flag is cleared for the next frame.
void Engine::increment_frames_drawn() {
	server_syncs = frame_server_synced ? server_syncs + 1 : 0;
	frame_server_synced = false;

	frames_drawn++;
}

// Returns the current value of the drawn-frame counter, which
// increment_frames_drawn() advances by one per call.
uint64_t Engine::get_frames_drawn() {
return frames_drawn;
}
Expand Down Expand Up @@ -364,6 +375,11 @@ Engine *Engine::get_singleton() {
return singleton;
}

// Flags the current frame as one in which a server sync happened.
//
// Returns true when the streak of consecutive synced frames tallied so far
// (server_syncs, updated by increment_frames_drawn()) is above
// SERVER_SYNC_FRAME_COUNT_WARNING, and false otherwise.
bool Engine::notify_frame_server_synced() {
	const bool sync_streak_too_long = server_syncs > SERVER_SYNC_FRAME_COUNT_WARNING;
	frame_server_synced = true;
	return sync_streak_too_long;
}

// Stores the instance being constructed in the static `singleton` pointer.
Engine::Engine() {
singleton = this;
}
Expand Down
7 changes: 7 additions & 0 deletions core/config/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class Engine {
String write_movie_path;
String shader_cache_path;

// Streak length above which notify_frame_server_synced() returns true.
static constexpr int SERVER_SYNC_FRAME_COUNT_WARNING = 5;
// Number of consecutive drawn frames that were flagged as synced.
// increment_frames_drawn() bumps it after a synced frame and resets it to zero otherwise.
int server_syncs = 0;
// Set by notify_frame_server_synced(); cleared by increment_frames_drawn().
bool frame_server_synced = false;

public:
static Engine *get_singleton();

Expand Down Expand Up @@ -179,6 +183,9 @@ class Engine {
bool is_generate_spirv_debug_info_enabled() const;
int32_t get_gpu_index() const;

// Advances the drawn-frame counter and updates the consecutive synced-frame streak.
void increment_frames_drawn();
// Flags the current frame as synced. Returns true once the streak of synced
// frames exceeds SERVER_SYNC_FRAME_COUNT_WARNING.
bool notify_frame_server_synced();

Engine();
virtual ~Engine() {}
};
Expand Down
158 changes: 98 additions & 60 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"

// Sentinel value used in place of a real task pointer. yield() passes it to
// _wait_collaboratively(), which then waits on the calling thread's
// yield_is_over flag instead of reading a task's `completed` flag.
WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;

void WorkerThreadPool::Task::free_template_userdata() {
ERR_FAIL_NULL(template_userdata);
ERR_FAIL_NULL(native_func_userdata);
Expand All @@ -60,11 +62,13 @@ void WorkerThreadPool::_process_task(Task *p_task) {
// its pre-created threads can't have ScriptServer::thread_enter() called on them early.
// Therefore, we do it late at the first opportunity, so in case the task
// about to be run uses scripting, guarantees are held.
task_mutex.lock();
if (!curr_thread.ready_for_scripting && ScriptServer::are_languages_initialized()) {
task_mutex.unlock();
ScriptServer::thread_enter();
task_mutex.lock();
curr_thread.ready_for_scripting = true;
}
task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
prev_task = curr_thread.current_task;
curr_thread.current_task = p_task;
Expand Down Expand Up @@ -389,83 +393,117 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task_mutex.unlock();

if (caller_pool_thread) {
while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(task_mutex);
bool was_signaled = caller_pool_thread->signaled;
caller_pool_thread->signaled = false;

if (task->completed) {
// This thread was awaken also for some reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
if (!exit_threads && was_signaled) {
uint32_t to_process = task_queue.first() ? 1 : 0;
uint32_t to_promote = caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
if (to_process || to_promote) {
// This thread must be left alone since it won't loop again.
caller_pool_thread->signaled = true;
_notify_threads(caller_pool_thread, to_process, to_promote);
}
}
_wait_collaboratively(caller_pool_thread, task);
task->waiting_pool--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
} else {
task->done_semaphore.wait();
task_mutex.lock();
task->waiting_user--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
task_mutex.unlock();
}

task->waiting_pool--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);
}
return OK;
}

break;
}
void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.

if (!exit_threads) {
// This is a thread from the pool. It shouldn't just idle.
// Let's try to process other tasks while we wait.
#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)

if (caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(caller_pool_thread, 1, 0);
}
while (true) {
Task *task_to_process = nullptr;
{
MutexLock lock(task_mutex);
bool was_signaled = p_caller_pool_thread->signaled;
p_caller_pool_thread->signaled = false;

if (IS_WAIT_OVER) {
p_caller_pool_thread->yield_is_over = false;
if (!exit_threads && was_signaled) {
// This thread was awaken for some additional reason, but it's about to exit.
// Let's find out what may be pending and forward the requests.
uint32_t to_process = task_queue.first() ? 1 : 0;
uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;
if (to_process || to_promote) {
// This thread must be left alone since it won't loop again.
p_caller_pool_thread->signaled = true;
_notify_threads(p_caller_pool_thread, to_process, to_promote);
}
}

break;
}

if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
if (!exit_threads) {
if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
if (_try_promote_low_priority_task()) {
_notify_threads(p_caller_pool_thread, 1, 0);
}
}

if (!task_to_process) {
caller_pool_thread->awaited_task = task;
if (singleton->task_queue.first()) {
task_to_process = task_queue.first()->self();
task_queue.remove(task_queue.first());
}

if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
caller_pool_thread->cond_var.wait(lock);
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;

DEV_ASSERT(exit_threads || caller_pool_thread->signaled || task->completed);
caller_pool_thread->awaited_task = nullptr;
if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
p_caller_pool_thread->cond_var.wait(lock);
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
}
}

if (task_to_process) {
_process_task(task_to_process);
DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
p_caller_pool_thread->awaited_task = nullptr;
}
}
}
} else {
task->done_semaphore.wait();
task_mutex.lock();
task->waiting_user--;
if (task->waiting_pool == 0 && task->waiting_user == 0) {
tasks.erase(p_task_id);
task_allocator.free(task);

if (task_to_process) {
_process_task(task_to_process);
}
}
}

void WorkerThreadPool::yield() {
int th_index = get_thread_index();
ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
}

// Ends a pending yield() on the worker thread that is running the given task.
//
// While holding task_mutex, looks the task up, marks its thread's yield as
// over, flags the thread as signaled and notifies its condition variable.
//
// p_task_id: ID of a task registered in `tasks`. An unknown ID is reported
// as an error and nothing else happens.
void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
	// Scoped lock: every exit path below releases task_mutex.
	MutexLock lock(task_mutex);

	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		ERR_FAIL_MSG("Invalid Task ID.");
	}
	Task *task = *taskp;

	// pool_thread_index is only assigned when a worker starts processing the
	// task (see _process_task()), so it may not name a pool thread at this
	// point. Indexing `threads` with it unchecked would be out of bounds.
	// NOTE(review): the notification is simply dropped in that case; confirm
	// whether it should instead be remembered until the task starts running.
	const int64_t target_index = task->pool_thread_index;
	if (target_index < 0 || target_index >= (int64_t)threads.size()) {
		return;
	}

#ifdef DEBUG_ENABLED
	if (task->pool_thread_index == get_thread_index()) {
		WARN_PRINT("A worker thread is attempting to notify itself. That makes no sense.");
	}
#endif

	ThreadData &td = threads[target_index];
	td.yield_is_over = true;
	td.signaled = true;
	td.cond_var.notify_one();
}

WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
Expand Down
19 changes: 16 additions & 3 deletions core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,21 @@ class WorkerThreadPool : public Object {
BinaryMutex task_mutex;

struct ThreadData {
static Task *const YIELDING; // Too bad constexpr doesn't work here.

uint32_t index = 0;
Thread thread;
bool ready_for_scripting = false;
bool signaled = false;
bool ready_for_scripting : 1;
bool signaled : 1;
bool yield_is_over : 1;
Task *current_task = nullptr;
Task *awaited_task = nullptr; // Null if not awaiting the condition variable. Special value for idle-waiting.
Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
ConditionVariable cond_var;

ThreadData() :
ready_for_scripting(false),
signaled(false),
yield_is_over(false) {}
};

TightLocalVector<ThreadData> threads;
Expand Down Expand Up @@ -177,6 +185,8 @@ class WorkerThreadPool : public Object {
}
};

void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);

protected:
static void _bind_methods();

Expand All @@ -196,6 +206,9 @@ class WorkerThreadPool : public Object {
bool is_task_completed(TaskID p_task_id) const;
Error wait_for_task_completion(TaskID p_task_id);

void yield();
void notify_yield_over(TaskID p_task_id);

template <typename C, typename M, typename U>
GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) {
typedef GroupUserData<C, M, U> GroupUD;
Expand Down
8 changes: 1 addition & 7 deletions core/templates/command_queue_mt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,8 @@ CommandQueueMT::SyncSemaphore *CommandQueueMT::_alloc_sync_sem() {
return &sync_sems[idx];
}

CommandQueueMT::CommandQueueMT(bool p_sync) {
if (p_sync) {
sync = memnew(Semaphore);
}
CommandQueueMT::CommandQueueMT() {
}

CommandQueueMT::~CommandQueueMT() {
if (sync) {
memdelete(sync);
}
}
43 changes: 25 additions & 18 deletions core/templates/command_queue_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,17 @@
#define CMD_TYPE(N) Command##N<T, M COMMA(N) COMMA_SEP_LIST(TYPE_ARG, N)>
#define CMD_ASSIGN_PARAM(N) cmd->p##N = p##N

#define DECL_PUSH(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
unlock(); \
if (sync) \
sync->post(); \
#define DECL_PUSH(N) \
template <typename T, typename M COMMA(N) COMMA_SEP_LIST(TYPE_PARAM, N)> \
void push(T *p_instance, M p_method COMMA(N) COMMA_SEP_LIST(PARAM, N)) { \
CMD_TYPE(N) *cmd = allocate_and_lock<CMD_TYPE(N)>(); \
cmd->instance = p_instance; \
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
}

#define CMD_RET_TYPE(N) CommandRet##N<T, M, COMMA_SEP_LIST(TYPE_ARG, N) COMMA(N) R>
Expand All @@ -272,9 +273,10 @@
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->ret = r_ret; \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
if (sync) \
sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
Expand All @@ -290,9 +292,10 @@
cmd->method = p_method; \
SEMIC_SEP_LIST(CMD_ASSIGN_PARAM, N); \
cmd->sync_sem = ss; \
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) { \
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id); \
} \
unlock(); \
if (sync) \
sync->post(); \
ss->sem.wait(); \
ss->in_use = false; \
}
Expand Down Expand Up @@ -340,7 +343,7 @@ class CommandQueueMT {
LocalVector<uint8_t> command_mem;
SyncSemaphore sync_sems[SYNC_SEMAPHORES];
Mutex mutex;
Semaphore *sync = nullptr;
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
uint64_t flush_read_ptr = 0;

template <typename T>
Expand Down Expand Up @@ -420,12 +423,16 @@ class CommandQueueMT {
}

void wait_and_flush() {
ERR_FAIL_NULL(sync);
sync->wait();
ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
_flush();
}

CommandQueueMT(bool p_sync);
// Records the WorkerThreadPool task associated with this queue.
// While the stored ID differs from INVALID_TASK_ID, the push macros call
// WorkerThreadPool::notify_yield_over() with it after queuing a command.
// NOTE(review): the assignment is not guarded by the queue's mutex here — confirm callers set it before other threads push.
void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
pump_task_id = p_task_id;
}

CommandQueueMT();
~CommandQueueMT();
};

Expand Down
Loading

0 comments on commit c951421

Please sign in to comment.