Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid multiple possibilites of deadlock in resource loading #77143

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,16 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,

if (run_on_current_thread) {
load_task_ptr->thread_id = Thread::get_caller_id();
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
} else {
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
}
}

if (run_on_current_thread) {
_thread_load_function(load_task_ptr);
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
}

return load_token;
Expand Down Expand Up @@ -613,14 +613,33 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
return Ref<Resource>();
}

if (load_task.task_id != 0 && !load_task.awaited) {
// Loading thread is in the worker pool and still not awaited.
if (load_task.task_id != 0) {
// Loading thread is in the worker pool.
load_task.awaited = true;
thread_load_mutex.unlock();
WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
thread_load_mutex.lock();
Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
if (err == ERR_BUSY) {
// The WorkerThreadPool has scheduled tasks in a way that the current load depends on
// another one in a lower stack frame. Restart such load here. When the stack is eventually
// unrolled, the original load will have been notified to go on.
#ifdef DEV_ENABLED
print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now.");
#endif
// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
// an ongoing load for that resource and wait for it again. This value forces a new load.
Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
Ref<Resource> resource = _load_complete(*token.ptr(), &err);
if (r_error) {
*r_error = err;
}
thread_load_mutex.lock();
return resource;
} else {
DEV_ASSERT(err == OK);
thread_load_mutex.lock();
}
} else {
// Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread.
// Loading thread is main or user thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
Expand Down
208 changes: 158 additions & 50 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ void WorkerThreadPool::_process_task_queue() {

void WorkerThreadPool::_process_task(Task *p_task) {
bool low_priority = p_task->low_priority;
int pool_thread_index = -1;
Task *prev_low_prio_task = nullptr; // In case this is recursively called.

if (!use_native_low_priority_threads) {
pool_thread_index = thread_ids[Thread::get_caller_id()];
ThreadData &curr_thread = threads[pool_thread_index];
task_mutex.lock();
p_task->pool_thread_index = pool_thread_index;
if (low_priority) {
low_priority_tasks_running++;
prev_low_prio_task = curr_thread.current_low_prio_task;
curr_thread.current_low_prio_task = p_task;
} else {
curr_thread.current_low_prio_task = nullptr;
}
task_mutex.unlock();
}

if (p_task->group) {
// Handling a group
Expand Down Expand Up @@ -126,21 +143,36 @@ void WorkerThreadPool::_process_task(Task *p_task) {
p_task->callable.callp(nullptr, 0, ret, ce);
}

task_mutex.lock();
p_task->completed = true;
p_task->done_semaphore.post();
for (uint8_t i = 0; i < p_task->waiting; i++) {
p_task->done_semaphore.post();
}
if (!use_native_low_priority_threads) {
p_task->pool_thread_index = -1;
}
task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
}

if (!use_native_low_priority_threads && low_priority) {
// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
// Task may have been freed by now (all callers notified).
p_task = nullptr;

if (!use_native_low_priority_threads) {
bool post = false;
task_mutex.lock();
if (low_priority_task_queue.first()) {
Task *low_prio_task = low_priority_task_queue.first()->self();
low_priority_task_queue.remove(low_priority_task_queue.first());
task_queue.add_last(&low_prio_task->task_elem);
post = true;
} else {
ThreadData &curr_thread = threads[pool_thread_index];
curr_thread.current_low_prio_task = prev_low_prio_task;
if (low_priority) {
low_priority_threads_used--;
low_priority_tasks_running--;
// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
if (_try_promote_low_priority_task()) {
post = true;
}

if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
}
}
task_mutex.unlock();
if (post) {
Expand Down Expand Up @@ -198,6 +230,35 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
}
}

bool WorkerThreadPool::_try_promote_low_priority_task() {
if (low_priority_task_queue.first()) {
Task *low_prio_task = low_priority_task_queue.first()->self();
low_priority_task_queue.remove(low_priority_task_queue.first());
task_queue.add_last(&low_prio_task->task_elem);
low_priority_threads_used++;
return true;
} else {
return false;
}
}

void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
#ifdef DEV_ENABLED
print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
#endif
// In order not to create dependency cycles, we can only schedule the next one.
// We'll keep doing the same until the deadlock is broken,
SelfList<Task> *to_promote = low_priority_task_queue.first();
if (to_promote) {
low_priority_task_queue.remove(to_promote);
task_queue.add_last(to_promote);
low_priority_threads_used++;
task_available_semaphore.post();
}
}
}

WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
Expand Down Expand Up @@ -238,66 +299,113 @@ bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
return completed;
}

void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
task_mutex.lock();
Task **taskp = tasks.getptr(p_task_id);
if (!taskp) {
task_mutex.unlock();
ERR_FAIL_MSG("Invalid Task ID"); // Invalid task
ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
}
Task *task = *taskp;

if (task->waiting) {
String description = task->description;
task_mutex.unlock();
if (description.is_empty()) {
ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task
} else {
ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task
if (!task->completed) {
if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
if (caller_pool_th_index == task->pool_thread_index) {
// Deadlock prevention.
// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
// and another task was run to make use of the thread in the meantime, with enough bad luck as to
// the need to wait for the original task arose in turn.
// In other words, the task we want to wait for is buried in the stack.
// Let's report the caller about the issue to it handles as it sees fit.
task_mutex.unlock();
return ERR_BUSY;
}
}
}

task->waiting = true;

task_mutex.unlock();
task->waiting++;

bool is_low_prio_waiting_for_another = false;
if (!use_native_low_priority_threads) {
// Deadlock prevention:
// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
// we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
// low-prio task to the schedule queue so it can run and break the "impasse".
// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
// an issue in the real world, a further fix can be applied against that.
if (task->low_priority) {
bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
if (awaiter_is_a_low_prio_task) {
is_low_prio_waiting_for_another = true;
low_priority_tasks_awaiting_others++;
if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
_prevent_low_prio_saturation_deadlock();
}
}
}
}

if (use_native_low_priority_threads && task->low_priority) {
task->low_priority_thread->wait_to_finish();
task_mutex.unlock();

task_mutex.lock();
native_thread_allocator.free(task->low_priority_thread);
} else {
int *index = thread_ids.getptr(Thread::get_caller_id());

if (index) {
// We are an actual process thread, we must not be blocked so continue processing stuff if available.
bool must_exit = false;
while (true) {
if (task->done_semaphore.try_wait()) {
// If done, exit
break;
}
if (!must_exit && task_available_semaphore.try_wait()) {
if (exit_threads) {
must_exit = true;
} else {
// Solve tasks while they are around.
_process_task_queue();
continue;
if (use_native_low_priority_threads && task->low_priority) {
task->done_semaphore.wait();
} else {
bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
if (current_is_pool_thread) {
// We are an actual process thread, we must not be blocked so continue processing stuff if available.
bool must_exit = false;
while (true) {
if (task->done_semaphore.try_wait()) {
// If done, exit
break;
}
if (!must_exit) {
if (task_available_semaphore.try_wait()) {
if (exit_threads) {
must_exit = true;
} else {
// Solve tasks while they are around.
_process_task_queue();
continue;
}
} else if (!use_native_low_priority_threads && task->low_priority) {
// A low prioriry task started waiting, so see if we can move a pending one to the high priority queue.
task_mutex.lock();
bool post = _try_promote_low_priority_task();
task_mutex.unlock();
if (post) {
task_available_semaphore.post();
}
}
}
OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
}
OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
} else {
task->done_semaphore.wait();
}
} else {
task->done_semaphore.wait();
}

task_mutex.lock();
if (is_low_prio_waiting_for_another) {
low_priority_tasks_awaiting_others--;
}

task->waiting--;
}

if (task->waiting == 0) {
if (use_native_low_priority_threads && task->low_priority) {
task->low_priority_thread->wait_to_finish();
native_thread_allocator.free(task->low_priority_thread);
}
tasks.erase(p_task_id);
task_allocator.free(task);
}

tasks.erase(p_task_id);
task_allocator.free(task);
task_mutex.unlock();
return OK;
}

WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
Expand Down Expand Up @@ -429,7 +537,7 @@ void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_pr
if (p_use_native_threads_low_priority) {
max_low_priority_threads = 0;
} else {
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count);
max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
}

use_native_low_priority_threads = p_use_native_threads_low_priority;
Expand Down
11 changes: 9 additions & 2 deletions core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ class WorkerThreadPool : public Object {
bool completed = false;
Group *group = nullptr;
SelfList<Task> task_elem;
bool waiting = false; // Waiting for completion
uint32_t waiting = 0;
bool low_priority = false;
BaseTemplateUserdata *template_userdata = nullptr;
Thread *low_priority_thread = nullptr;
int pool_thread_index = -1;

void free_template_userdata();
Task() :
Expand All @@ -104,6 +105,7 @@ class WorkerThreadPool : public Object {
struct ThreadData {
uint32_t index;
Thread thread;
Task *current_low_prio_task = nullptr;
};

TightLocalVector<ThreadData> threads;
Expand All @@ -116,6 +118,8 @@ class WorkerThreadPool : public Object {
bool use_native_low_priority_threads = false;
uint32_t max_low_priority_threads = 0;
uint32_t low_priority_threads_used = 0;
uint32_t low_priority_tasks_running = 0;
uint32_t low_priority_tasks_awaiting_others = 0;

uint64_t last_task = 1;

Expand All @@ -127,6 +131,9 @@ class WorkerThreadPool : public Object {

void _post_task(Task *p_task, bool p_high_priority);

bool _try_promote_low_priority_task();
void _prevent_low_prio_saturation_deadlock();

static WorkerThreadPool *singleton;

TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
Expand Down Expand Up @@ -169,7 +176,7 @@ class WorkerThreadPool : public Object {
TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());

bool is_task_completed(TaskID p_task_id) const;
void wait_for_task_completion(TaskID p_task_id);
Error wait_for_task_completion(TaskID p_task_id);

template <class C, class M, class U>
GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) {
Expand Down
5 changes: 4 additions & 1 deletion doc/classes/WorkerThreadPool.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@
</description>
</method>
<method name="wait_for_task_completion">
<return type="void" />
<return type="int" enum="Error" />
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this breaks compatibility. Needs to be assessed whether it's worth making an exception and allow it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change it so the exposed API doesn't change. However, I don't really see the downside. If the caller is not interested, they can just ignore the returned value. After all, there's no behavior change. This gives an opportunity to the user of doing something about the deadlock, but the possibility of it occuring was already there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its fine, It does not change anything for GDScript and for GDExtension we already have the compatibility fallback system.

<param index="0" name="task_id" type="int" />
<description>
Pauses the thread that calls this method until the task with the given ID is completed.
Returns [constant @GlobalScope.OK] if the task could be successfully awaited.
Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of).
Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others.
</description>
</method>
</methods>
Expand Down