Kaynağa Gözat

Avoid interaction issues between resource loading threads

Pedro J. Estébanez 2 yıl önce
ebeveyn
işleme
8983b20ccd

+ 15 - 32
core/io/resource_format_binary.cpp

@@ -445,13 +445,12 @@ Error ResourceLoaderBinary::parse_variant(Variant &r_v) {
 						WARN_PRINT("Broken external resource! (index out of size)");
 						r_v = Variant();
 					} else {
-						if (external_resources[erindex].cache.is_null()) {
-							//cache not here yet, wait for it?
-							if (use_sub_threads) {
-								Error err;
-								external_resources.write[erindex].cache = ResourceLoader::load_threaded_get(external_resources[erindex].path, &err);
-
-								if (err != OK || external_resources[erindex].cache.is_null()) {
+						Ref<ResourceLoader::LoadToken> &load_token = external_resources.write[erindex].load_token;
+						if (load_token.is_valid()) { // If not valid, it's OK since then we know this load accepts broken dependencies.
+							Error err;
+							Ref<Resource> res = ResourceLoader::_load_complete(*load_token.ptr(), &err);
+							if (res.is_null()) {
+								if (!ResourceLoader::is_cleaning_tasks()) {
 									if (!ResourceLoader::get_abort_on_missing_resources()) {
 										ResourceLoader::notify_dependency_error(local_path, external_resources[erindex].path, external_resources[erindex].type);
 									} else {
@@ -459,12 +458,11 @@ Error ResourceLoaderBinary::parse_variant(Variant &r_v) {
 										ERR_FAIL_V_MSG(error, "Can't load dependency: " + external_resources[erindex].path + ".");
 									}
 								}
+							} else {
+								r_v = res;
 							}
 						}
-
-						r_v = external_resources[erindex].cache;
 					}
-
 				} break;
 				default: {
 					ERR_FAIL_V(ERR_FILE_CORRUPT);
@@ -684,28 +682,13 @@ Error ResourceLoaderBinary::load() {
 		}
 
 		external_resources.write[i].path = path; //remap happens here, not on load because on load it can actually be used for filesystem dock resource remap
-
-		if (!use_sub_threads) {
-			external_resources.write[i].cache = ResourceLoader::load(path, external_resources[i].type);
-
-			if (external_resources[i].cache.is_null()) {
-				if (!ResourceLoader::get_abort_on_missing_resources()) {
-					ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type);
-				} else {
-					error = ERR_FILE_MISSING_DEPENDENCIES;
-					ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + ".");
-				}
-			}
-
-		} else {
-			Error err = ResourceLoader::load_threaded_request(path, external_resources[i].type, use_sub_threads, ResourceFormatLoader::CACHE_MODE_REUSE, local_path);
-			if (err != OK) {
-				if (!ResourceLoader::get_abort_on_missing_resources()) {
-					ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type);
-				} else {
-					error = ERR_FILE_MISSING_DEPENDENCIES;
-					ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + ".");
-				}
+		external_resources.write[i].load_token = ResourceLoader::_load_start(path, external_resources[i].type, use_sub_threads ? ResourceLoader::LOAD_THREAD_DISTRIBUTE : ResourceLoader::LOAD_THREAD_FROM_CURRENT, ResourceFormatLoader::CACHE_MODE_REUSE);
+		if (!external_resources[i].load_token.is_valid()) {
+			if (!ResourceLoader::get_abort_on_missing_resources()) {
+				ResourceLoader::notify_dependency_error(local_path, path, external_resources[i].type);
+			} else {
+				error = ERR_FILE_MISSING_DEPENDENCIES;
+				ERR_FAIL_V_MSG(error, "Can't load dependency: " + path + ".");
 			}
 		}
 	}

+ 1 - 1
core/io/resource_format_binary.h

@@ -60,7 +60,7 @@ class ResourceLoaderBinary {
 		String path;
 		String type;
 		ResourceUID::ID uid = ResourceUID::INVALID_ID;
-		Ref<Resource> cache;
+		Ref<ResourceLoader::LoadToken> load_token;
 	};
 
 	bool using_named_scene_ids = false;

+ 339 - 271
core/io/resource_loader.cpp

@@ -202,20 +202,72 @@ void ResourceFormatLoader::_bind_methods() {
 
 ///////////////////////////////////
 
+// This should be robust enough to be called redundantly without issues.
+void ResourceLoader::LoadToken::clear() {
+	thread_load_mutex.lock();
+
+	Thread *thread_to_destroy = nullptr;
+
+	if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
+		DEV_ASSERT(thread_load_tasks.has(local_path));
+		ThreadLoadTask &load_task = thread_load_tasks[local_path];
+		thread_to_destroy = load_task.thread;
+		load_task.thread = nullptr;
+		thread_load_tasks.erase(local_path);
+		local_path.clear();
+	}
+
+	if (!user_path.is_empty()) {
+		DEV_ASSERT(user_load_tokens.has(user_path));
+		user_load_tokens.erase(user_path);
+		user_path.clear();
+	}
+
+	thread_load_mutex.unlock();
+
+	// If thread is unused, destroy it here, locally, now the token data is consistent.
+	if (thread_to_destroy) {
+		if (thread_to_destroy->is_started()) {
+			thread_to_destroy->wait_to_finish();
+		}
+		memdelete(thread_to_destroy);
+	}
+}
+
+ResourceLoader::LoadToken::~LoadToken() {
+	clear();
+}
+
 Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress) {
-	bool found = false;
+	load_nesting++;
+	if (load_paths_stack.size()) {
+		thread_load_mutex.lock();
+		HashMap<String, ThreadLoadTask>::Iterator E = thread_load_tasks.find(load_paths_stack[load_paths_stack.size() - 1]);
+		if (E) {
+			E->value.sub_tasks.insert(p_path);
+		}
+		thread_load_mutex.unlock();
+	}
+	load_paths_stack.push_back(p_path);
 
 	// Try all loaders and pick the first match for the type hint
+	bool found = false;
+	Ref<Resource> res;
 	for (int i = 0; i < loader_count; i++) {
 		if (!loader[i]->recognize_path(p_path, p_type_hint)) {
 			continue;
 		}
 		found = true;
-		Ref<Resource> res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode);
-		if (res.is_null()) {
-			continue;
+		res = loader[i]->load(p_path, !p_original_path.is_empty() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress, p_cache_mode);
+		if (!res.is_null()) {
+			break;
 		}
+	}
 
+	load_paths_stack.resize(load_paths_stack.size() - 1);
+	load_nesting--;
+
+	if (!res.is_null()) {
 		return res;
 	}
 
@@ -232,47 +284,45 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
 
 void ResourceLoader::_thread_load_function(void *p_userdata) {
 	ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
-	load_task.loader_id = Thread::get_caller_id();
-
-	if (load_task.cond_var) {
-		//this is an actual thread, so wait for Ok from semaphore
-		thread_load_semaphore->wait(); //wait until its ok to start loading
+	// Thread-safe either if it's the current thread or a brand new one.
+	if (load_task.first_in_stack) {
+		if (!load_task.dependent_path.is_empty()) {
+			load_paths_stack.push_back(load_task.dependent_path);
+		}
+	} else {
+		DEV_ASSERT(load_task.dependent_path.is_empty());
 	}
-	load_task.resource = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress);
+	// --
 
-	load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0
+	Ref<Resource> res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_task.error, load_task.use_sub_threads, &load_task.progress);
+
+	thread_load_mutex.lock();
+
+	load_task.resource = res;
 
-	thread_load_mutex->lock();
+	load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0
 	if (load_task.error != OK) {
 		load_task.status = THREAD_LOAD_FAILED;
 	} else {
 		load_task.status = THREAD_LOAD_LOADED;
 	}
-	if (load_task.cond_var) {
-		if (load_task.start_next && thread_waiting_count > 0) {
-			thread_waiting_count--;
-			//thread loading count remains constant, this ends but another one begins
-			thread_load_semaphore->post();
-		} else {
-			thread_loading_count--; //no threads waiting, just reduce loading count
-		}
-
-		print_lt("END: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
 
+	if (load_task.cond_var) {
 		load_task.cond_var->notify_all();
 		memdelete(load_task.cond_var);
 		load_task.cond_var = nullptr;
 	}
 
 	if (load_task.resource.is_valid()) {
-		load_task.resource->set_path(load_task.local_path);
+		if (load_task.cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) {
+			load_task.resource->set_path(load_task.local_path);
+		}
 
 		if (load_task.xl_remapped) {
 			load_task.resource->set_as_translation_remapped(true);
 		}
 
 #ifdef TOOLS_ENABLED
-
 		load_task.resource->set_edited(false);
 		if (timestamp_on_load) {
 			uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path);
@@ -286,7 +336,16 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
 		}
 	}
 
-	thread_load_mutex->unlock();
+	if (load_nesting == 0) {
+		thread_active_count--;
+		if (thread_waiting_count) {
+			thread_active_cond_var.notify_one();
+		}
+	}
+
+	print_lt("END: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
+
+	thread_load_mutex.unlock();
 }
 
 static String _validate_local_path(const String &p_path) {
@@ -299,91 +358,158 @@ static String _validate_local_path(const String &p_path) {
 		return ProjectSettings::get_singleton()->localize_path(p_path);
 	}
 }
-Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode, const String &p_source_resource) {
-	String local_path = _validate_local_path(p_path);
 
-	thread_load_mutex->lock();
+Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) {
+	thread_load_mutex.lock();
+	if (user_load_tokens.has(p_path)) {
+		print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error.");
+		user_load_tokens[p_path]->reference(); // Additional request.
+		thread_load_mutex.unlock();
+		return OK;
+	}
+	user_load_tokens[p_path] = nullptr;
+	thread_load_mutex.unlock();
+
+	Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode);
+	if (token.is_valid()) {
+		thread_load_mutex.lock();
+		token->user_path = p_path;
+		token->reference(); // First request.
+		user_load_tokens[p_path] = token.ptr();
+		print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size()));
+		thread_load_mutex.unlock();
+		return OK;
+	} else {
+		return FAILED;
+	}
+}
 
-	if (!p_source_resource.is_empty()) {
-		//must be loading from this resource
-		if (!thread_load_tasks.has(p_source_resource)) {
-			thread_load_mutex->unlock();
-			ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "There is no thread loading source resource '" + p_source_resource + "'.");
-		}
-		//must not be already added as s sub tasks
-		if (thread_load_tasks[p_source_resource].sub_tasks.has(local_path)) {
-			thread_load_mutex->unlock();
-			ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Thread loading source resource '" + p_source_resource + "' already is loading '" + local_path + "'.");
-		}
+Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) {
+	if (r_error) {
+		*r_error = OK;
 	}
 
-	if (thread_load_tasks.has(local_path)) {
-		thread_load_tasks[local_path].requests++;
-		if (!p_source_resource.is_empty()) {
-			thread_load_tasks[p_source_resource].sub_tasks.insert(local_path);
+	Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode);
+	if (!load_token.is_valid()) {
+		if (r_error) {
+			*r_error = FAILED;
 		}
-		thread_load_mutex->unlock();
-		return OK;
+		return Ref<Resource>();
 	}
 
-	{
-		//create load task
-
-		ThreadLoadTask load_task;
+	Ref<Resource> res = _load_complete(*load_token.ptr(), r_error);
+	return res;
+}
 
-		load_task.requests = 1;
-		load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped);
-		load_task.local_path = local_path;
-		load_task.type_hint = p_type_hint;
-		load_task.cache_mode = p_cache_mode;
-		load_task.use_sub_threads = p_use_sub_threads;
+Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) {
+	String local_path = _validate_local_path(p_path);
 
-		{ //must check if resource is already loaded before attempting to load it in a thread
+	Ref<LoadToken> load_token;
+	bool must_not_register = false;
+	ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load.
+	ThreadLoadTask *load_task_ptr = nullptr;
+	bool run_on_current_thread = false;
+	{
+		MutexLock thread_load_lock(thread_load_mutex);
 
-			if (load_task.loader_id == Thread::get_caller_id()) {
-				thread_load_mutex->unlock();
-				ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Attempted to load a resource already being loaded from this thread, cyclic reference?");
+		if (thread_load_tasks.has(local_path)) {
+			load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token);
+			if (!load_token.is_valid()) {
+				// The token is dying (reached 0 on another thread).
+				// Ensure it's killed now so the path can be safely reused right away.
+				thread_load_tasks[local_path].load_token->clear();
+			} else {
+				if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) {
+					return load_token;
+				}
 			}
+		}
 
-			Ref<Resource> existing = ResourceCache::get_ref(local_path);
+		load_token.instantiate();
+		load_token->local_path = local_path;
 
-			if (existing.is_valid()) {
-				//referencing is fine
-				load_task.resource = existing;
-				load_task.status = THREAD_LOAD_LOADED;
-				load_task.progress = 1.0;
+		//create load task
+		{
+			ThreadLoadTask load_task;
+
+			load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped);
+			load_task.load_token = load_token.ptr();
+			load_task.local_path = local_path;
+			load_task.type_hint = p_type_hint;
+			load_task.cache_mode = p_cache_mode;
+			load_task.use_sub_threads = p_thread_mode == LOAD_THREAD_DISTRIBUTE;
+			if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) {
+				Ref<Resource> existing = ResourceCache::get_ref(local_path);
+				if (existing.is_valid()) {
+					//referencing is fine
+					load_task.resource = existing;
+					load_task.status = THREAD_LOAD_LOADED;
+					load_task.progress = 1.0;
+					thread_load_tasks[local_path] = load_task;
+					return load_token;
+				}
 			}
-		}
 
-		if (!p_source_resource.is_empty()) {
-			thread_load_tasks[p_source_resource].sub_tasks.insert(local_path);
+			// If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish unconditionally synchronously.
+			must_not_register = thread_load_tasks.has(local_path) && p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE;
+			if (must_not_register) {
+				load_token->local_path.clear();
+				unregistered_load_task = load_task;
+			} else {
+				thread_load_tasks[local_path] = load_task;
+			}
+
+			load_task_ptr = must_not_register ? &unregistered_load_task : &thread_load_tasks[local_path];
 		}
 
-		thread_load_tasks[local_path] = load_task;
-	}
+		print_lt("REQUEST: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
 
-	ThreadLoadTask &load_task = thread_load_tasks[local_path];
+		run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;
 
-	if (load_task.resource.is_null()) { //needs to be loaded in thread
+		if (!run_on_current_thread && thread_active_count >= thread_active_max && load_nesting > 0) {
+			// No free slots for another thread, but this one is already active, so keep working here.
+			run_on_current_thread = true;
+		}
 
-		load_task.cond_var = memnew(ConditionVariable);
-		if (thread_loading_count < thread_load_max) {
-			thread_loading_count++;
-			thread_load_semaphore->post(); //we have free threads, so allow one
-		} else {
-			thread_waiting_count++;
+		load_task_ptr->first_in_stack = run_on_current_thread ? load_nesting == 0 : true;
+
+		if (load_task_ptr->first_in_stack) {
+			if (!run_on_current_thread && load_paths_stack.size()) {
+				// The paths stack is lost across thread boundaries, so we have to remember what was the topmost path.
+				load_task_ptr->dependent_path = load_paths_stack[load_paths_stack.size() - 1];
+			}
+			if (thread_active_count >= thread_active_max) {
+				// Either the current or a new thread needs to wait for a free slot to become active.
+				thread_waiting_count++;
+				do {
+					thread_active_cond_var.wait(thread_load_lock);
+				} while (thread_active_count >= thread_active_max);
+				thread_waiting_count--;
+			}
+			thread_active_count++;
 		}
 
-		print_lt("REQUEST: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
+		if (cleaning_tasks) {
+			load_task_ptr->status = THREAD_LOAD_FAILED;
+			return load_token;
+		}
 
-		load_task.thread = memnew(Thread);
-		load_task.thread->start(_thread_load_function, &thread_load_tasks[local_path]);
-		load_task.loader_id = load_task.thread->get_id();
+		if (run_on_current_thread) {
+			load_task_ptr->loader_id = Thread::get_caller_id();
+			if (must_not_register) {
+				load_token->res_if_unregistered = load_task_ptr->resource;
+			}
+		} else {
+			load_task_ptr->thread = memnew(Thread);
+			load_task_ptr->loader_id = load_task_ptr->thread->start(_thread_load_function, load_task_ptr);
+		}
 	}
 
-	thread_load_mutex->unlock();
+	if (run_on_current_thread) {
+		_thread_load_function(load_task_ptr);
+	}
 
-	return OK;
+	return load_token;
 }
 
 float ResourceLoader::_dependency_get_progress(const String &p_path) {
@@ -409,13 +535,22 @@ float ResourceLoader::_dependency_get_progress(const String &p_path) {
 }
 
 ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) {
-	String local_path = _validate_local_path(p_path);
+	MutexLock thread_load_lock(thread_load_mutex);
 
-	thread_load_mutex->lock();
+	if (!user_load_tokens.has(p_path)) {
+		print_verbose("load_threaded_get_status(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
+		return THREAD_LOAD_INVALID_RESOURCE;
+	}
+
+	String local_path = _validate_local_path(p_path);
 	if (!thread_load_tasks.has(local_path)) {
-		thread_load_mutex->unlock();
+#ifdef DEV_ENABLED
+		CRASH_NOW();
+#endif
+		// On non-dev, be defensive and at least avoid crashing (at this point at least).
 		return THREAD_LOAD_INVALID_RESOURCE;
 	}
+
 	ThreadLoadTask &load_task = thread_load_tasks[local_path];
 	ThreadLoadStatus status;
 	status = load_task.status;
@@ -423,198 +558,115 @@ ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const
 		*r_progress = _dependency_get_progress(local_path);
 	}
 
-	thread_load_mutex->unlock();
-
 	return status;
 }
 
 Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_error) {
-	String local_path = _validate_local_path(p_path);
-
-	MutexLock thread_load_lock(*thread_load_mutex);
-	if (!thread_load_tasks.has(local_path)) {
-		if (r_error) {
-			*r_error = ERR_INVALID_PARAMETER;
-		}
-		return Ref<Resource>();
+	if (r_error) {
+		*r_error = OK;
 	}
 
-	ThreadLoadTask &load_task = thread_load_tasks[local_path];
+	Ref<Resource> res;
+	{
+		MutexLock thread_load_lock(thread_load_mutex);
 
-	if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
-		if (load_task.loader_id == Thread::get_caller_id()) {
-			// Load is in progress, but it's precisely this thread the one in charge.
-			// That means this is a cyclic load.
+		if (!user_load_tokens.has(p_path)) {
+			print_verbose("load_threaded_get(): No threaded load for resource path '" + p_path + "' has been initiated or its result has already been collected.");
 			if (r_error) {
-				*r_error = ERR_BUSY;
+				*r_error = ERR_INVALID_PARAMETER;
 			}
 			return Ref<Resource>();
-		} else if (!load_task.cond_var) {
-			// Load is in progress, but a condition variable was never created for it.
-			// That happens when a load has been initiated with subthreads disabled,
-			// but now another load thread needs to interact with this one (either
-			// because of subthreads being used this time, or because it's simply a
-			// threaded load running on a different thread).
-			// Since we want to be notified when the load ends, we must create the
-			// condition variable now.
-			load_task.cond_var = memnew(ConditionVariable);
-		}
-	}
-
-	//cond var still exists, meaning it's still loading, request poll
-	if (load_task.cond_var) {
-		{
-			// As we got a cond var, this means we are going to have to wait
-			// until the sub-resource is done loading
-			//
-			// As this thread will become 'blocked' we should "exchange" its
-			// active status with a waiting one, to ensure load continues.
-			//
-			// This ensures loading is never blocked and that is also within
-			// the maximum number of active threads.
-
-			if (thread_waiting_count > 0) {
-				thread_waiting_count--;
-				thread_loading_count++;
-				thread_load_semaphore->post();
-
-				load_task.start_next = false; //do not start next since we are doing it here
-			}
-
-			thread_suspended_count++;
-
-			print_lt("GET: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
 		}
 
-		bool still_valid = true;
-		bool was_thread = load_task.thread;
-		do {
-			load_task.cond_var->wait(thread_load_lock);
-			if (!thread_load_tasks.has(local_path)) { //may have been erased during unlock and this was always an invalid call
-				still_valid = false;
-				break;
-			}
-		} while (load_task.cond_var); // In case of spurious wakeup.
-
-		if (was_thread) {
-			thread_suspended_count--;
-		}
-
-		if (!still_valid) {
+		LoadToken *load_token = user_load_tokens[p_path];
+		if (!load_token) {
+			// This happens if requested from one thread and rapidly querying from another.
 			if (r_error) {
-				*r_error = ERR_INVALID_PARAMETER;
+				*r_error = ERR_BUSY;
 			}
 			return Ref<Resource>();
 		}
+		res = _load_complete_inner(*load_token, r_error, thread_load_lock);
+		if (load_token->unreference()) {
+			memdelete(load_token);
+		}
 	}
 
-	Ref<Resource> resource = load_task.resource;
-	if (r_error) {
-		*r_error = load_task.error;
-	}
-
-	load_task.requests--;
+	print_lt("GET: user load tokens: " + itos(user_load_tokens.size()));
 
-	if (load_task.requests == 0) {
-		if (load_task.thread) { //thread may not have been used
-			load_task.thread->wait_to_finish();
-			memdelete(load_task.thread);
-		}
-		thread_load_tasks.erase(local_path);
-	}
+	return res;
+}
 
-	return resource;
+Ref<Resource> ResourceLoader::_load_complete(LoadToken &p_load_token, Error *r_error) {
+	MutexLock thread_load_lock(thread_load_mutex);
+	return _load_complete_inner(p_load_token, r_error, thread_load_lock);
 }
 
-Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) {
+Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock) {
 	if (r_error) {
-		*r_error = ERR_CANT_OPEN;
+		*r_error = OK;
 	}
 
-	String local_path = _validate_local_path(p_path);
+	if (!p_load_token.local_path.is_empty()) {
+		if (!thread_load_tasks.has(p_load_token.local_path)) {
+#ifdef DEV_ENABLED
+			CRASH_NOW();
+#endif
+			// On non-dev, be defensive and at least avoid crashing (at this point at least).
+			if (r_error) {
+				*r_error = ERR_BUG;
+			}
+			return Ref<Resource>();
+		}
 
-	if (p_cache_mode != ResourceFormatLoader::CACHE_MODE_IGNORE) {
-		thread_load_mutex->lock();
+		ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path];
 
-		//Is it already being loaded? poll until done
-		if (thread_load_tasks.has(local_path)) {
-			Error err = load_threaded_request(p_path, p_type_hint);
-			if (err != OK) {
+		if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
+			if (load_task.loader_id == Thread::get_caller_id()) {
+				// Load is in progress, but it's precisely this thread the one in charge.
+				// That means this is a cyclic load.
 				if (r_error) {
-					*r_error = err;
+					*r_error = ERR_BUSY;
 				}
-				thread_load_mutex->unlock();
 				return Ref<Resource>();
-			}
-			thread_load_mutex->unlock();
-
-			return load_threaded_get(p_path, r_error);
-		}
-
-		//Is it cached?
-
-		Ref<Resource> existing = ResourceCache::get_ref(local_path);
-
-		if (existing.is_valid()) {
-			thread_load_mutex->unlock();
-
-			if (r_error) {
-				*r_error = OK;
+			} else if (!load_task.cond_var) {
+				// This is the first time some thread needs to wait for this one.
+				load_task.cond_var = memnew(ConditionVariable);
 			}
 
-			return existing; //use cached
-		}
-
-		//load using task (but this thread)
-		ThreadLoadTask load_task;
-
-		load_task.requests = 1;
-		load_task.local_path = local_path;
-		load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped);
-		load_task.type_hint = p_type_hint;
-		load_task.cache_mode = p_cache_mode; //ignore
-		load_task.loader_id = Thread::get_caller_id();
-
-		thread_load_tasks[local_path] = load_task;
-
-		thread_load_mutex->unlock();
+			// Wait for load to complete.
+			thread_suspended_count++;
 
-		_thread_load_function(&thread_load_tasks[local_path]);
+			print_lt("GET: load count: " + itos(thread_active_count + thread_suspended_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_active_count));
 
-		return load_threaded_get(p_path, r_error);
+			do {
+				load_task.cond_var->wait(p_thread_load_lock);
+				DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
+			} while (load_task.cond_var);
 
-	} else {
-		bool xl_remapped = false;
-		String path = _path_remap(local_path, &xl_remapped);
-
-		if (path.is_empty()) {
-			ERR_FAIL_V_MSG(Ref<Resource>(), "Remapping '" + local_path + "' failed.");
+			thread_suspended_count--;
 		}
 
-		print_verbose("Loading resource: " + path);
-		float p;
-		Ref<Resource> res = _load(path, local_path, p_type_hint, p_cache_mode, r_error, false, &p);
-
-		if (res.is_null()) {
-			print_verbose("Failed loading resource: " + path);
-			return Ref<Resource>();
+		if (cleaning_tasks) {
+			load_task.resource = Ref<Resource>();
+			load_task.error = FAILED;
 		}
 
-		if (xl_remapped) {
-			res->set_as_translation_remapped(true);
+		Ref<Resource> resource = load_task.resource;
+		if (r_error) {
+			*r_error = load_task.error;
 		}
-
-#ifdef TOOLS_ENABLED
-
-		res->set_edited(false);
-		if (timestamp_on_load) {
-			uint64_t mt = FileAccess::get_modified_time(path);
-			//printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt);
-			res->set_last_modified_time(mt);
+		return resource;
+	} else {
+		// Special case of an unregistered task.
+		// The resource should have been loaded by now.
+		Ref<Resource> resource = p_load_token.res_if_unregistered;
+		if (!resource.is_valid()) {
+			if (r_error) {
+				*r_error = FAILED;
+			}
 		}
-#endif
-
-		return res;
+		return resource;
 	}
 }
 
@@ -958,32 +1010,43 @@ void ResourceLoader::clear_translation_remaps() {
 }
 
 void ResourceLoader::clear_thread_load_tasks() {
-	thread_load_mutex->lock();
-
-	for (KeyValue<String, ResourceLoader::ThreadLoadTask> &E : thread_load_tasks) {
-		switch (E.value.status) {
-			case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_LOADED: {
-				E.value.resource = Ref<Resource>();
-			} break;
-
-			case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_IN_PROGRESS: {
-				if (E.value.thread != nullptr) {
-					E.value.thread->wait_to_finish();
-					memdelete(E.value.thread);
-					E.value.thread = nullptr;
+	// Bring the thing down as quickly as possible without causing deadlocks or leaks.
+
+	thread_load_mutex.lock();
+	cleaning_tasks = true;
+
+	while (true) {
+		bool none_running = true;
+		if (thread_load_tasks.size()) {
+			for (KeyValue<String, ResourceLoader::ThreadLoadTask> &E : thread_load_tasks) {
+				if (E.value.status == THREAD_LOAD_IN_PROGRESS) {
+					if (E.value.cond_var) {
+						E.value.cond_var->notify_all();
+						memdelete(E.value.cond_var);
+						E.value.cond_var = nullptr;
+					}
+					none_running = false;
 				}
-				E.value.resource = Ref<Resource>();
-			} break;
-
-			case ResourceLoader::ThreadLoadStatus::THREAD_LOAD_FAILED:
-			default: {
-				// do nothing
 			}
 		}
+		if (none_running) {
+			break;
+		}
+		thread_active_cond_var.notify_all();
+		thread_load_mutex.unlock();
+		OS::get_singleton()->delay_usec(1000);
+		thread_load_mutex.lock();
 	}
+
+	for (KeyValue<String, LoadToken *> &E : user_load_tokens) {
+		memdelete(E.value);
+	}
+	user_load_tokens.clear();
+
 	thread_load_tasks.clear();
 
-	thread_load_mutex->unlock();
+	cleaning_tasks = false;
+	thread_load_mutex.unlock();
 }
 
 void ResourceLoader::load_path_remaps() {
@@ -1080,20 +1143,19 @@ void ResourceLoader::remove_custom_loaders() {
 	}
 }
 
+bool ResourceLoader::is_cleaning_tasks() {
+	MutexLock lock(thread_load_mutex);
+	return cleaning_tasks;
+}
+
 void ResourceLoader::initialize() {
-	thread_load_mutex = memnew(SafeBinaryMutex<BINARY_MUTEX_TAG>);
-	thread_load_max = OS::get_singleton()->get_processor_count();
-	thread_loading_count = 0;
+	thread_active_max = OS::get_singleton()->get_processor_count();
+	thread_active_count = 0;
 	thread_waiting_count = 0;
 	thread_suspended_count = 0;
-	thread_load_semaphore = memnew(Semaphore);
 }
 
-void ResourceLoader::finalize() {
-	clear_thread_load_tasks();
-	memdelete(thread_load_mutex);
-	memdelete(thread_load_semaphore);
-}
+void ResourceLoader::finalize() {}
 
 ResourceLoadErrorNotify ResourceLoader::err_notify = nullptr;
 void *ResourceLoader::err_notify_ud = nullptr;
@@ -1105,16 +1167,22 @@ bool ResourceLoader::create_missing_resources_if_class_unavailable = false;
 bool ResourceLoader::abort_on_missing_resource = true;
 bool ResourceLoader::timestamp_on_load = false;
 
+thread_local int ResourceLoader::load_nesting = 0;
+thread_local Vector<String> ResourceLoader::load_paths_stack;
+
 template <>
 thread_local uint32_t SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG>::count = 0;
-SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> *ResourceLoader::thread_load_mutex = nullptr;
+SafeBinaryMutex<ResourceLoader::BINARY_MUTEX_TAG> ResourceLoader::thread_load_mutex;
 HashMap<String, ResourceLoader::ThreadLoadTask> ResourceLoader::thread_load_tasks;
-Semaphore *ResourceLoader::thread_load_semaphore = nullptr;
+ConditionVariable ResourceLoader::thread_active_cond_var;
 
-int ResourceLoader::thread_loading_count = 0;
+int ResourceLoader::thread_active_count = 0;
 int ResourceLoader::thread_waiting_count = 0;
 int ResourceLoader::thread_suspended_count = 0;
-int ResourceLoader::thread_load_max = 0;
+int ResourceLoader::thread_active_max = 0;
+bool ResourceLoader::cleaning_tasks = false;
+
+HashMap<String, ResourceLoader::LoadToken *> ResourceLoader::user_load_tokens;
 
 SelfList<Resource>::List ResourceLoader::remapped_list;
 HashMap<String, Vector<String>> ResourceLoader::translation_remaps;

+ 38 - 9
core/io/resource_loader.h

@@ -107,9 +107,30 @@ public:
 		THREAD_LOAD_LOADED
 	};
 
+	enum LoadThreadMode {
+		LOAD_THREAD_FROM_CURRENT,
+		LOAD_THREAD_SPAWN_SINGLE,
+		LOAD_THREAD_DISTRIBUTE,
+	};
+
+	struct LoadToken : public RefCounted {
+		String local_path;
+		String user_path;
+		Ref<Resource> res_if_unregistered;
+
+		void clear();
+
+		virtual ~LoadToken();
+	};
+
 	static const int BINARY_MUTEX_TAG = 1;
 
+	static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode);
+	static Ref<Resource> _load_complete(LoadToken &p_load_token, Error *r_error);
+
 private:
+	static Ref<Resource> _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock);
+
 	static Ref<ResourceFormatLoader> loader[MAX_LOADERS];
 	static int loader_count;
 	static bool timestamp_on_load;
@@ -129,8 +150,7 @@ private:
 	static SelfList<Resource>::List remapped_list;
 
 	friend class ResourceFormatImporter;
-	friend class ResourceInteractiveLoader;
-	// Internal load function.
+
 	static Ref<Resource> _load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress);
 
 	static ResourceLoadedCallback _loaded_callback;
@@ -140,9 +160,12 @@ private:
 	struct ThreadLoadTask {
 		Thread *thread = nullptr;
 		Thread::ID loader_id = 0;
+		bool first_in_stack = false;
 		ConditionVariable *cond_var = nullptr;
+		LoadToken *load_token = nullptr;
 		String local_path;
 		String remapped_path;
+		String dependent_path;
 		String type_hint;
 		float progress = 0.0;
 		ThreadLoadStatus status = THREAD_LOAD_IN_PROGRESS;
@@ -151,24 +174,28 @@ private:
 		Ref<Resource> resource;
 		bool xl_remapped = false;
 		bool use_sub_threads = false;
-		bool start_next = true;
-		int requests = 0;
 		HashSet<String> sub_tasks;
 	};
 
 	static void _thread_load_function(void *p_userdata);
-	static SafeBinaryMutex<BINARY_MUTEX_TAG> *thread_load_mutex;
+
+	static thread_local int load_nesting;
+	static thread_local Vector<String> load_paths_stack;
+	static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
 	static HashMap<String, ThreadLoadTask> thread_load_tasks;
-	static Semaphore *thread_load_semaphore;
+	static ConditionVariable thread_active_cond_var;
+	static int thread_active_count;
 	static int thread_waiting_count;
-	static int thread_loading_count;
 	static int thread_suspended_count;
-	static int thread_load_max;
+	static int thread_active_max;
+	static bool cleaning_tasks;
+
+	static HashMap<String, LoadToken *> user_load_tokens;
 
 	static float _dependency_get_progress(const String &p_path);
 
 public:
-	static Error load_threaded_request(const String &p_path, const String &p_type_hint = "", bool p_use_sub_threads = false, ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, const String &p_source_resource = String());
+	static Error load_threaded_request(const String &p_path, const String &p_type_hint = "", bool p_use_sub_threads = false, ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE);
 	static ThreadLoadStatus load_threaded_get_status(const String &p_path, float *r_progress = nullptr);
 	static Ref<Resource> load_threaded_get(const String &p_path, Error *r_error = nullptr);
 
@@ -237,6 +264,8 @@ public:
 	static void set_create_missing_resources_if_class_unavailable(bool p_enable);
 	_FORCE_INLINE_ static bool is_creating_missing_resources_if_class_unavailable_enabled() { return create_missing_resources_if_class_unavailable; }
 
+	static bool is_cleaning_tasks();
+
 	static void initialize();
 	static void finalize();
 };

+ 18 - 1
core/os/mutex.h

@@ -119,8 +119,25 @@ class MutexLock {
 
 public:
 	_ALWAYS_INLINE_ explicit MutexLock(const MutexT &p_mutex) :
+			lock(p_mutex.mutex){};
+};
+
+// This specialization is needed so manual locking and MutexLock can be used
+// at the same time on a SafeBinaryMutex.
+template <int Tag>
+class MutexLock<SafeBinaryMutex<Tag>> {
+	friend class ConditionVariable;
+
+	std::unique_lock<std::mutex> lock;
+
+public:
+	_ALWAYS_INLINE_ explicit MutexLock(const SafeBinaryMutex<Tag> &p_mutex) :
 			lock(p_mutex.mutex) {
-	}
+		SafeBinaryMutex<Tag>::count++;
+	};
+	_ALWAYS_INLINE_ ~MutexLock() {
+		SafeBinaryMutex<Tag>::count--;
+	};
 };
 
 using Mutex = MutexImpl<std::recursive_mutex>; // Recursive, for general use

+ 3 - 2
core/os/thread.cpp

@@ -66,11 +66,12 @@ void Thread::callback(ID p_caller_id, const Settings &p_settings, Callback p_cal
 	}
 }
 
-void Thread::start(Thread::Callback p_callback, void *p_user, const Settings &p_settings) {
-	ERR_FAIL_COND_MSG(id != UNASSIGNED_ID, "A Thread object has been re-started without wait_to_finish() having been called on it.");
+Thread::ID Thread::start(Thread::Callback p_callback, void *p_user, const Settings &p_settings) {
+	ERR_FAIL_COND_V_MSG(id != UNASSIGNED_ID, UNASSIGNED_ID, "A Thread object has been re-started without wait_to_finish() having been called on it.");
 	id = id_counter.increment();
 	std::thread new_thread(&Thread::callback, id, p_settings, p_callback, p_user);
 	thread.swap(new_thread);
+	return id;
 }
 
 bool Thread::is_started() const {

+ 1 - 1
core/os/thread.h

@@ -109,7 +109,7 @@ public:
 
 	static Error set_name(const String &p_name);
 
-	void start(Thread::Callback p_callback, void *p_user, const Settings &p_settings = Settings());
+	ID start(Thread::Callback p_callback, void *p_user, const Settings &p_settings = Settings());
 	bool is_started() const;
 	///< waits until thread is finished, and deallocates it.
 	void wait_to_finish();

+ 2 - 2
main/main.cpp

@@ -3446,6 +3446,8 @@ void Main::cleanup(bool p_force) {
 		movie_writer->end();
 	}
 
+	ResourceLoader::clear_thread_load_tasks();
+
 	ResourceLoader::remove_custom_loaders();
 	ResourceSaver::remove_custom_savers();
 
@@ -3462,8 +3464,6 @@ void Main::cleanup(bool p_force) {
 	ResourceLoader::clear_translation_remaps();
 	ResourceLoader::clear_path_remaps();
 
-	ResourceLoader::clear_thread_load_tasks();
-
 	ScriptServer::finish_languages();
 
 	// Sync pending commands that may have been queued from a different thread during ScriptServer finalization

+ 27 - 56
scene/resources/resource_format_text.cpp

@@ -150,32 +150,31 @@ Error ResourceLoaderText::_parse_ext_resource(VariantParser::Stream *p_stream, R
 
 		String path = ext_resources[id].path;
 		String type = ext_resources[id].type;
+		Ref<ResourceLoader::LoadToken> &load_token = ext_resources[id].load_token;
 
-		if (ext_resources[id].cache.is_valid()) {
-			r_res = ext_resources[id].cache;
-		} else if (use_sub_threads) {
-			Ref<Resource> res = ResourceLoader::load_threaded_get(path);
+		if (load_token.is_valid()) { // If not valid, it's OK since then we know this load accepts broken dependencies.
+			Ref<Resource> res = ResourceLoader::_load_complete(*load_token.ptr(), &err);
 			if (res.is_null()) {
-				if (ResourceLoader::get_abort_on_missing_resources()) {
-					error = ERR_FILE_MISSING_DEPENDENCIES;
-					error_text = "[ext_resource] referenced nonexistent resource at: " + path;
-					_printerr();
-					err = error;
-				} else {
-					ResourceLoader::notify_dependency_error(local_path, path, type);
+				if (!ResourceLoader::is_cleaning_tasks()) {
+					if (ResourceLoader::get_abort_on_missing_resources()) {
+						error = ERR_FILE_MISSING_DEPENDENCIES;
+						error_text = "[ext_resource] referenced non-existent resource at: " + path;
+						_printerr();
+						err = error;
+					} else {
+						ResourceLoader::notify_dependency_error(local_path, path, type);
+					}
 				}
 			} else {
-				ext_resources[id].cache = res;
+#ifdef TOOLS_ENABLED
+				//remember ID for saving
+				res->set_id_for_path(path, id);
+#endif
 				r_res = res;
 			}
 		} else {
-			error = ERR_FILE_MISSING_DEPENDENCIES;
-			error_text = "[ext_resource] referenced non-loaded resource at: " + path;
-			_printerr();
-			err = error;
+			r_res = Ref<Resource>();
 		}
-	} else {
-		r_res = Ref<Resource>();
 	}
 
 	VariantParser::get_token(p_stream, token, line, r_err_str);
@@ -462,48 +461,20 @@ Error ResourceLoaderText::load() {
 			path = remaps[path];
 		}
 
-		ExtResource er;
-		er.path = path;
-		er.type = type;
-
-		if (use_sub_threads) {
-			Error err = ResourceLoader::load_threaded_request(path, type, use_sub_threads, ResourceFormatLoader::CACHE_MODE_REUSE, local_path);
-
-			if (err != OK) {
-				if (ResourceLoader::get_abort_on_missing_resources()) {
-					error = ERR_FILE_CORRUPT;
-					error_text = "[ext_resource] referenced broken resource at: " + path;
-					_printerr();
-					return error;
-				} else {
-					ResourceLoader::notify_dependency_error(local_path, path, type);
-				}
-			}
-
-		} else {
-			Ref<Resource> res = ResourceLoader::load(path, type);
-
-			if (res.is_null()) {
-				if (ResourceLoader::get_abort_on_missing_resources()) {
-					error = ERR_FILE_CORRUPT;
-					error_text = "[ext_resource] referenced nonexistent resource at: " + path;
-					_printerr();
-					return error;
-				} else {
-					ResourceLoader::notify_dependency_error(local_path, path, type);
-				}
+		ext_resources[id].path = path;
+		ext_resources[id].type = type;
+		ext_resources[id].load_token = ResourceLoader::_load_start(path, type, use_sub_threads ? ResourceLoader::LOAD_THREAD_DISTRIBUTE : ResourceLoader::LOAD_THREAD_FROM_CURRENT, ResourceFormatLoader::CACHE_MODE_REUSE);
+		if (!ext_resources[id].load_token.is_valid()) {
+			if (ResourceLoader::get_abort_on_missing_resources()) {
+				error = ERR_FILE_CORRUPT;
+				error_text = "[ext_resource] referenced non-existent resource at: " + path;
+				_printerr();
+				return error;
 			} else {
-#ifdef TOOLS_ENABLED
-				//remember ID for saving
-				res->set_id_for_path(local_path, id);
-#endif
+				ResourceLoader::notify_dependency_error(local_path, path, type);
 			}
-
-			er.cache = res;
 		}
 
-		ext_resources[id] = er;
-
 		error = VariantParser::parse_tag(&stream, lines, error_text, next_tag, &rp);
 
 		if (error) {

+ 1 - 1
scene/resources/resource_format_text.h

@@ -48,7 +48,7 @@ class ResourceLoaderText {
 	VariantParser::StreamFile stream;
 
 	struct ExtResource {
-		Ref<Resource> cache;
+		Ref<ResourceLoader::LoadToken> load_token;
 		String path;
 		String type;
 	};