Browse Source

Avoid multiple possibilites of deadlock in resource loading

Pedro J. Estébanez 2 years ago
parent
commit
a6e43f71b6

+ 27 - 8
core/io/resource_loader.cpp

@@ -476,9 +476,6 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 
 		if (run_on_current_thread) {
 			load_task_ptr->thread_id = Thread::get_caller_id();
-			if (must_not_register) {
-				load_token->res_if_unregistered = load_task_ptr->resource;
-			}
 		} else {
 			load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
 		}
@@ -486,6 +483,9 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
 
 	if (run_on_current_thread) {
 		_thread_load_function(load_task_ptr);
+		if (must_not_register) {
+			load_token->res_if_unregistered = load_task_ptr->resource;
+		}
 	}
 
 	return load_token;
@@ -613,14 +613,33 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 				return Ref<Resource>();
 			}
 
-			if (load_task.task_id != 0 && !load_task.awaited) {
-				// Loading thread is in the worker pool and still not awaited.
+			if (load_task.task_id != 0) {
+				// Loading thread is in the worker pool.
 				load_task.awaited = true;
 				thread_load_mutex.unlock();
-				WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
-				thread_load_mutex.lock();
+				Error err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
+				if (err == ERR_BUSY) {
+					// The WorkerThreadPool has scheduled tasks in a way that the current load depends on
+					// another one in a lower stack frame. Restart such load here. When the stack is eventually
+					// unrolled, the original load will have been notified to go on.
+#ifdef DEV_ENABLED
+					print_verbose("ResourceLoader: Load task happened to wait on another one deep in the call stack. Attempting to avoid deadlock by re-issuing the load now.");
+#endif
+					// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
+					// an ongoing load for that resource and wait for it again. This value forces a new load.
+					Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
+					Ref<Resource> resource = _load_complete(*token.ptr(), &err);
+					if (r_error) {
+						*r_error = err;
+					}
+					thread_load_mutex.lock();
+					return resource;
+				} else {
+					DEV_ASSERT(err == OK);
+					thread_load_mutex.lock();
+				}
 			} else {
-				// Loading thread is main or user thread, or in the worker pool, but already awaited by some other thread.
+				// Loading thread is main or user thread.
 				if (!load_task.cond_var) {
 					load_task.cond_var = memnew(ConditionVariable);
 				}

+ 158 - 50
core/object/worker_thread_pool.cpp

@@ -51,6 +51,23 @@ void WorkerThreadPool::_process_task_queue() {
 
 void WorkerThreadPool::_process_task(Task *p_task) {
 	bool low_priority = p_task->low_priority;
+	int pool_thread_index = -1;
+	Task *prev_low_prio_task = nullptr; // In case this is recursively called.
+
+	if (!use_native_low_priority_threads) {
+		pool_thread_index = thread_ids[Thread::get_caller_id()];
+		ThreadData &curr_thread = threads[pool_thread_index];
+		task_mutex.lock();
+		p_task->pool_thread_index = pool_thread_index;
+		if (low_priority) {
+			low_priority_tasks_running++;
+			prev_low_prio_task = curr_thread.current_low_prio_task;
+			curr_thread.current_low_prio_task = p_task;
+		} else {
+			curr_thread.current_low_prio_task = nullptr;
+		}
+		task_mutex.unlock();
+	}
 
 	if (p_task->group) {
 		// Handling a group
@@ -126,21 +143,36 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 			p_task->callable.callp(nullptr, 0, ret, ce);
 		}
 
+		task_mutex.lock();
 		p_task->completed = true;
-		p_task->done_semaphore.post();
+		for (uint8_t i = 0; i < p_task->waiting; i++) {
+			p_task->done_semaphore.post();
+		}
+		if (!use_native_low_priority_threads) {
+			p_task->pool_thread_index = -1;
+		}
+		task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
 	}
 
-	if (!use_native_low_priority_threads && low_priority) {
-		// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
+	// Task may have been freed by now (all callers notified).
+	p_task = nullptr;
+
+	if (!use_native_low_priority_threads) {
 		bool post = false;
 		task_mutex.lock();
-		if (low_priority_task_queue.first()) {
-			Task *low_prio_task = low_priority_task_queue.first()->self();
-			low_priority_task_queue.remove(low_priority_task_queue.first());
-			task_queue.add_last(&low_prio_task->task_elem);
-			post = true;
-		} else {
+		ThreadData &curr_thread = threads[pool_thread_index];
+		curr_thread.current_low_prio_task = prev_low_prio_task;
+		if (low_priority) {
 			low_priority_threads_used--;
+			low_priority_tasks_running--;
+			// A low prioriry task was freed, so see if we can move a pending one to the high priority queue.
+			if (_try_promote_low_priority_task()) {
+				post = true;
+			}
+
+			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+				_prevent_low_prio_saturation_deadlock();
+			}
 		}
 		task_mutex.unlock();
 		if (post) {
@@ -198,6 +230,35 @@ void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
 	}
 }
 
+bool WorkerThreadPool::_try_promote_low_priority_task() {
+	if (low_priority_task_queue.first()) {
+		Task *low_prio_task = low_priority_task_queue.first()->self();
+		low_priority_task_queue.remove(low_priority_task_queue.first());
+		task_queue.add_last(&low_prio_task->task_elem);
+		low_priority_threads_used++;
+		return true;
+	} else {
+		return false;
+	}
+}
+
+void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
+	if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+#ifdef DEV_ENABLED
+		print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
+#endif
+		// In order not to create dependency cycles, we can only schedule the next one.
+		// We'll keep doing the same until the deadlock is broken,
+		SelfList<Task> *to_promote = low_priority_task_queue.first();
+		if (to_promote) {
+			low_priority_task_queue.remove(to_promote);
+			task_queue.add_last(to_promote);
+			low_priority_threads_used++;
+			task_available_semaphore.post();
+		}
+	}
+}
+
 WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
 	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
 }
@@ -238,66 +299,113 @@ bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
 	return completed;
 }
 
-void WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
+Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
 	task_mutex.lock();
 	Task **taskp = tasks.getptr(p_task_id);
 	if (!taskp) {
 		task_mutex.unlock();
-		ERR_FAIL_MSG("Invalid Task ID"); // Invalid task
+		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
 	}
 	Task *task = *taskp;
 
-	if (task->waiting) {
-		String description = task->description;
-		task_mutex.unlock();
-		if (description.is_empty()) {
-			ERR_FAIL_MSG("Another thread is waiting on this task: " + itos(p_task_id)); // Invalid task
-		} else {
-			ERR_FAIL_MSG("Another thread is waiting on this task: " + description + " (" + itos(p_task_id) + ")"); // Invalid task
+	if (!task->completed) {
+		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
+			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
+			if (caller_pool_th_index == task->pool_thread_index) {
+				// Deadlock prevention.
+				// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
+				// and another task was run to make use of the thread in the meantime, with enough bad luck as to
+				// the need to wait for the original task arose in turn.
+				// In other words, the task we want to wait for is buried in the stack.
+				// Let's report the caller about the issue to it handles as it sees fit.
+				task_mutex.unlock();
+				return ERR_BUSY;
+			}
 		}
-	}
-
-	task->waiting = true;
 
-	task_mutex.unlock();
+		task->waiting++;
+
+		bool is_low_prio_waiting_for_another = false;
+		if (!use_native_low_priority_threads) {
+			// Deadlock prevention:
+			// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
+			// we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
+			// low-prio task to the schedule queue so it can run and break the "impasse".
+			// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
+			// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
+			// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
+			// an issue in the real world, a further fix can be applied against that.
+			if (task->low_priority) {
+				bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
+				if (awaiter_is_a_low_prio_task) {
+					is_low_prio_waiting_for_another = true;
+					low_priority_tasks_awaiting_others++;
+					if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
+						_prevent_low_prio_saturation_deadlock();
+					}
+				}
+			}
+		}
 
-	if (use_native_low_priority_threads && task->low_priority) {
-		task->low_priority_thread->wait_to_finish();
+		task_mutex.unlock();
 
-		task_mutex.lock();
-		native_thread_allocator.free(task->low_priority_thread);
-	} else {
-		int *index = thread_ids.getptr(Thread::get_caller_id());
-
-		if (index) {
-			// We are an actual process thread, we must not be blocked so continue processing stuff if available.
-			bool must_exit = false;
-			while (true) {
-				if (task->done_semaphore.try_wait()) {
-					// If done, exit
-					break;
-				}
-				if (!must_exit && task_available_semaphore.try_wait()) {
-					if (exit_threads) {
-						must_exit = true;
-					} else {
-						// Solve tasks while they are around.
-						_process_task_queue();
-						continue;
+		if (use_native_low_priority_threads && task->low_priority) {
+			task->done_semaphore.wait();
+		} else {
+			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
+			if (current_is_pool_thread) {
+				// We are an actual process thread, we must not be blocked so continue processing stuff if available.
+				bool must_exit = false;
+				while (true) {
+					if (task->done_semaphore.try_wait()) {
+						// If done, exit
+						break;
 					}
+					if (!must_exit) {
+						if (task_available_semaphore.try_wait()) {
+							if (exit_threads) {
+								must_exit = true;
+							} else {
+								// Solve tasks while they are around.
+								_process_task_queue();
+								continue;
+							}
+						} else if (!use_native_low_priority_threads && task->low_priority) {
+							// A low prioriry task started waiting, so see if we can move a pending one to the high priority queue.
+							task_mutex.lock();
+							bool post = _try_promote_low_priority_task();
+							task_mutex.unlock();
+							if (post) {
+								task_available_semaphore.post();
+							}
+						}
+					}
+					OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
 				}
-				OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
+			} else {
+				task->done_semaphore.wait();
 			}
-		} else {
-			task->done_semaphore.wait();
 		}
 
 		task_mutex.lock();
+		if (is_low_prio_waiting_for_another) {
+			low_priority_tasks_awaiting_others--;
+		}
+
+		task->waiting--;
+	}
+
+	if (task->waiting == 0) {
+		if (use_native_low_priority_threads && task->low_priority) {
+			task->low_priority_thread->wait_to_finish();
+			native_thread_allocator.free(task->low_priority_thread);
+		}
+		tasks.erase(p_task_id);
+		task_allocator.free(task);
 	}
 
-	tasks.erase(p_task_id);
-	task_allocator.free(task);
 	task_mutex.unlock();
+	return OK;
 }
 
 WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
@@ -429,7 +537,7 @@ void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_pr
 	if (p_use_native_threads_low_priority) {
 		max_low_priority_threads = 0;
 	} else {
-		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count);
+		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
 	}
 
 	use_native_low_priority_threads = p_use_native_threads_low_priority;

+ 9 - 2
core/object/worker_thread_pool.h

@@ -81,10 +81,11 @@ private:
 		bool completed = false;
 		Group *group = nullptr;
 		SelfList<Task> task_elem;
-		bool waiting = false; // Waiting for completion
+		uint32_t waiting = 0;
 		bool low_priority = false;
 		BaseTemplateUserdata *template_userdata = nullptr;
 		Thread *low_priority_thread = nullptr;
+		int pool_thread_index = -1;
 
 		void free_template_userdata();
 		Task() :
@@ -104,6 +105,7 @@ private:
 	struct ThreadData {
 		uint32_t index;
 		Thread thread;
+		Task *current_low_prio_task = nullptr;
 	};
 
 	TightLocalVector<ThreadData> threads;
@@ -116,6 +118,8 @@ private:
 	bool use_native_low_priority_threads = false;
 	uint32_t max_low_priority_threads = 0;
 	uint32_t low_priority_threads_used = 0;
+	uint32_t low_priority_tasks_running = 0;
+	uint32_t low_priority_tasks_awaiting_others = 0;
 
 	uint64_t last_task = 1;
 
@@ -127,6 +131,9 @@ private:
 
 	void _post_task(Task *p_task, bool p_high_priority);
 
+	bool _try_promote_low_priority_task();
+	void _prevent_low_prio_saturation_deadlock();
+
 	static WorkerThreadPool *singleton;
 
 	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
@@ -169,7 +176,7 @@ public:
 	TaskID add_task(const Callable &p_action, bool p_high_priority = false, const String &p_description = String());
 
 	bool is_task_completed(TaskID p_task_id) const;
-	void wait_for_task_completion(TaskID p_task_id);
+	Error wait_for_task_completion(TaskID p_task_id);
 
 	template <class C, class M, class U>
 	GroupID add_template_group_task(C *p_instance, M p_method, U p_userdata, int p_elements, int p_tasks = -1, bool p_high_priority = false, const String &p_description = String()) {

+ 4 - 1
doc/classes/WorkerThreadPool.xml

@@ -100,10 +100,13 @@
 			</description>
 		</method>
 		<method name="wait_for_task_completion">
-			<return type="void" />
+			<return type="int" enum="Error" />
 			<param index="0" name="task_id" type="int" />
 			<description>
 				Pauses the thread that calls this method until the task with the given ID is completed.
+				Returns [constant @GlobalScope.OK] if the task could be successfully awaited.
+				Returns [constant @GlobalScope.ERR_INVALID_PARAMETER] if a task with the passed ID does not exist (maybe because it was already awaited and disposed of).
+				Returns [constant @GlobalScope.ERR_BUSY] if the call is made from another running task and, due to task scheduling, the task to await is at a lower level in the call stack and therefore can't progress. This is an advanced situation that should only matter when some tasks depend on others.
 			</description>
 		</method>
 	</methods>