From c450f4d667e9f2462cb506cd3a32ca882c49ba68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 5 Sep 2024 08:35:05 +0200 Subject: [PATCH 1/2] ResourceLoader: Simplify handling of unregistered tasks --- core/io/resource_loader.cpp | 70 ++++++++++++++++--------------------- core/io/resource_loader.h | 4 ++- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 7cf101b0de5e..a5f860e1671d 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -234,17 +234,22 @@ void ResourceLoader::LoadToken::clear() { // User-facing tokens shouldn't be deleted until completely claimed. DEV_ASSERT(user_rc == 0 && user_path.is_empty()); - if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. - DEV_ASSERT(thread_load_tasks.has(local_path)); - ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (load_task.task_id && !load_task.awaited) { - task_to_await = load_task.task_id; + if (!local_path.is_empty()) { + if (task_if_unregistered) { + memdelete(task_if_unregistered); + task_if_unregistered = nullptr; + } else { + DEV_ASSERT(thread_load_tasks.has(local_path)); + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + if (load_task.task_id && !load_task.awaited) { + task_to_await = load_task.task_id; + } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + thread_load_tasks.erase(local_path); } - // Removing a task which is still in progress would be catastrophic. - // Tokens must be alive until the task thread function is done. - DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); - thread_load_tasks.erase(local_path); - local_path.clear(); + local_path.clear(); // Mark as already cleared. } } @@ -521,9 +526,7 @@ Ref ResourceLoader::_load_start(const String &p_path, Ref load_token; bool must_not_register = false; - ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load. ThreadLoadTask *load_task_ptr = nullptr; - bool run_on_current_thread = false; { MutexLock thread_load_lock(thread_load_mutex); @@ -578,12 +581,11 @@ Ref ResourceLoader::_load_start(const String &p_path, } } - // If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope. + // If we want to ignore cache, but there's another task loading it, we can't add this one to the map. must_not_register = ignoring_cache && thread_load_tasks.has(local_path); if (must_not_register) { - load_token->local_path.clear(); - unregistered_load_task = load_task; - load_task_ptr = &unregistered_load_task; + load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task)); + load_task_ptr = load_token->task_if_unregistered; } else { DEV_ASSERT(!thread_load_tasks.has(local_path)); HashMap::Iterator E = thread_load_tasks.insert(local_path, load_task); @@ -591,9 +593,7 @@ Ref ResourceLoader::_load_start(const String &p_path, } } - run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; - - if (run_on_current_thread) { + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { // The current thread may happen to be a thread from the pool. WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); if (tid != WorkerThreadPool::INVALID_TASK_ID) { @@ -606,11 +606,8 @@ Ref ResourceLoader::_load_start(const String &p_path, } } // MutexLock(thread_load_mutex). - if (run_on_current_thread) { + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { _run_load_task(load_task_ptr); - if (must_not_register) { - load_token->res_if_unregistered = load_task_ptr->resource; - } } return load_token; @@ -738,7 +735,10 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro *r_error = OK; } - if (!p_load_token.local_path.is_empty()) { + ThreadLoadTask *load_task_ptr = nullptr; + if (p_load_token.task_if_unregistered) { + load_task_ptr = p_load_token.task_if_unregistered; + } else { if (!thread_load_tasks.has(p_load_token.local_path)) { if (r_error) { *r_error = ERR_BUG; @@ -809,22 +809,14 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro load_task.error = FAILED; } - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - return resource; - } else { - // Special case of an unregistered task. - // The resource should have been loaded by now. - Ref resource = p_load_token.res_if_unregistered; - if (!resource.is_valid()) { - if (r_error) { - *r_error = FAILED; - } - } - return resource; + load_task_ptr = &load_task; + } + + Ref resource = load_task_ptr->resource; + if (r_error) { + *r_error = load_task_ptr->error; } + return resource; } bool ResourceLoader::_ensure_load_progress() { diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index f75bf019fb54..34ac1ba3e92f 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -106,6 +106,8 @@ class ResourceLoader { MAX_LOADERS = 64 }; + struct ThreadLoadTask; + public: enum ThreadLoadStatus { THREAD_LOAD_INVALID_RESOURCE, @@ -124,7 +126,7 @@ class ResourceLoader { String local_path; String user_path; uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. - Ref res_if_unregistered; + ThreadLoadTask *task_if_unregistered = nullptr; void clear(); From 74b9c38d5841b1abc2acaa25e13cd5b7fb1738d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 5 Sep 2024 09:48:13 +0200 Subject: [PATCH 2/2] ResourceLoader: Add thread-aware resource changed mechanism --- core/io/resource.cpp | 22 +++++----- core/io/resource_loader.cpp | 84 +++++++++++++++++++++++++++++++++++++ core/io/resource_loader.h | 12 ++++++ 3 files changed, 107 insertions(+), 11 deletions(-) diff --git a/core/io/resource.cpp b/core/io/resource.cpp index ff12dc58518a..6177cba6a40c 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -40,12 +40,12 @@ #include void Resource::emit_changed() { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the connection happen on the call queue, later, since signals are not thread-safe. - call_deferred("emit_signal", CoreStringName(changed)); - } else { - emit_signal(CoreStringName(changed)); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_emit(this); + return; } + + emit_signal(CoreStringName(changed)); } void Resource::_resource_path_changed() { @@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and connection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_connect(this, p_callable, p_flags); return; } + if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_disconnect(this, p_callable); return; } + if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index a5f860e1671d..a083a91a634e 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -31,6 +31,7 @@ #include "resource_loader.h" #include "core/config/project_settings.h" +#include "core/core_bind.h" #include "core/io/file_access.h" #include "core/io/resource_importer.h" #include "core/object/script_language.h" @@ -329,6 +330,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) { } } + ThreadLoadTask *curr_load_task_backup = curr_load_task; + curr_load_task = &load_task; + // Thread-safe either if it's the current thread or a brand new one. CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { @@ -456,6 +460,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) { } DEV_ASSERT(load_paths_stack.is_empty()); } + + curr_load_task = curr_load_task_backup; } static String _validate_local_path(const String &p_path) { @@ -816,6 +822,39 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (r_error) { *r_error = load_task_ptr->error; } + + if (resource.is_valid()) { + if (curr_load_task) { + // A task awaiting another => Let the awaiter accumulate the resource changed connections. + DEV_ASSERT(curr_load_task != load_task_ptr); + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + curr_load_task->resource_changed_connections.push_back(rcc); + } + } else { + // A leaf task being awaited => Propagate the resource changed connections. + if (Thread::is_main_thread()) { + // On the main thread it's safe to migrate the connections to the standard signal mechanism. + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + if (rcc.callable.is_valid()) { + rcc.source->connect_changed(rcc.callable, rcc.flags); + } + } + } else { + // On non-main threads, we have to queue and call it done when processed. + if (!load_task_ptr->resource_changed_connections.is_empty()) { + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + if (rcc.callable.is_valid()) { + MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags)); + } + } + core_bind::Semaphore done; + MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post)); + done.wait(); + } + } + } + } + return resource; } @@ -830,6 +869,50 @@ bool ResourceLoader::_ensure_load_progress() { return true; } +void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id())); + + MutexLock lock(thread_load_mutex); + + for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) { + if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) { + return; + } + } + + ThreadLoadTask::ResourceChangedConnection rcc; + rcc.source = p_source; + rcc.callable = p_callable; + rcc.flags = p_flags; + curr_load_task->resource_changed_connections.push_back(rcc); +} + +void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id())); + + MutexLock lock(thread_load_mutex); + + for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) { + const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i]; + if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) { + curr_load_task->resource_changed_connections.remove_at_unordered(i); + return; + } + } +} + +void ResourceLoader::resource_changed_emit(Resource *p_source) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class())); + + MutexLock lock(thread_load_mutex); + + for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) { + if (unlikely(rcc.source == p_source)) { + rcc.callable.call(); + } + } +} + Ref ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) { ERR_FAIL_COND_V(load_nesting == 0, Ref()); // It makes no sense to use this from nesting level 0. const String &local_path = _validate_local_path(p_path); @@ -1360,6 +1443,7 @@ bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; thread_local Vector ResourceLoader::load_paths_stack; thread_local HashMap>> ResourceLoader::res_ref_overrides; +thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr; SafeBinaryMutex &_get_res_loader_mutex() { return ResourceLoader::thread_load_mutex; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 34ac1ba3e92f..caaf9f8f45dd 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -189,6 +189,13 @@ class ResourceLoader { Ref resource; bool use_sub_threads = false; HashSet sub_tasks; + + struct ResourceChangedConnection { + Resource *source = nullptr; + Callable callable; + uint32_t flags = 0; + }; + LocalVector resource_changed_connections; }; static void _run_load_task(void *p_userdata); @@ -196,6 +203,7 @@ class ResourceLoader { static thread_local int load_nesting; static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level. static thread_local Vector load_paths_stack; + static thread_local ThreadLoadTask *curr_load_task; static SafeBinaryMutex thread_load_mutex; friend SafeBinaryMutex &_get_res_loader_mutex(); @@ -216,6 +224,10 @@ class ResourceLoader { static bool is_within_load() { return load_nesting > 0; }; + static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags); + static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable); + static void resource_changed_emit(Resource *p_source); + static Ref load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr); static bool exists(const String &p_path, const String &p_type_hint = "");