Browse Source

WorkerThreadPool: Refactor deadlock prevention collaboration into a generic mechanism

This is strictly beyond a refactor because it also changes when the mutexes are relocked,
but that's only for extra safety.
Pedro J. Estébanez 1 year ago
parent
commit
03d14e436b

+ 71 - 20
core/object/worker_thread_pool.cpp

@@ -33,7 +33,6 @@
 #include "core/object/script_language.h"
 #include "core/object/script_language.h"
 #include "core/os/os.h"
 #include "core/os/os.h"
 #include "core/os/thread_safe.h"
 #include "core/os/thread_safe.h"
-#include "core/templates/command_queue_mt.h"
 
 
 WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
 WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
 
 
@@ -46,7 +45,9 @@ void WorkerThreadPool::Task::free_template_userdata() {
 
 
 WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
 WorkerThreadPool *WorkerThreadPool::singleton = nullptr;
 
 
-thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr;
+#ifdef THREADS_ENABLED
+thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {};
+#endif
 
 
 void WorkerThreadPool::_process_task(Task *p_task) {
 void WorkerThreadPool::_process_task(Task *p_task) {
 #ifdef THREADS_ENABLED
 #ifdef THREADS_ENABLED
@@ -416,6 +417,34 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
 	return OK;
 	return OK;
 }
 }
 
 
+void WorkerThreadPool::_lock_unlockable_mutexes() {
+#ifdef THREADS_ENABLED
+	for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
+		if (unlockable_mutexes[i]) {
+			if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
+				((Mutex *)unlockable_mutexes[i])->lock();
+			} else {
+				((BinaryMutex *)unlockable_mutexes[i])->lock();
+			}
+		}
+	}
+#endif
+}
+
+void WorkerThreadPool::_unlock_unlockable_mutexes() {
+#ifdef THREADS_ENABLED
+	for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
+		if (unlockable_mutexes[i]) {
+			if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
+				((Mutex *)unlockable_mutexes[i])->unlock();
+			} else {
+				((BinaryMutex *)unlockable_mutexes[i])->unlock();
+			}
+		}
+	}
+#endif
+}
+
 void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
 void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
 	// Keep processing tasks until the condition to stop waiting is met.
 	// Keep processing tasks until the condition to stop waiting is met.
 
 
@@ -423,6 +452,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 
 
 	while (true) {
 	while (true) {
 		Task *task_to_process = nullptr;
 		Task *task_to_process = nullptr;
+		bool relock_unlockables = false;
 		{
 		{
 			MutexLock lock(task_mutex);
 			MutexLock lock(task_mutex);
 			bool was_signaled = p_caller_pool_thread->signaled;
 			bool was_signaled = p_caller_pool_thread->signaled;
@@ -460,13 +490,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 				if (!task_to_process) {
 				if (!task_to_process) {
 					p_caller_pool_thread->awaited_task = p_task;
 					p_caller_pool_thread->awaited_task = p_task;
 
 
-					if (flushing_cmd_queue) {
-						flushing_cmd_queue->unlock();
-					}
+					_unlock_unlockable_mutexes();
+					relock_unlockables = true;
 					p_caller_pool_thread->cond_var.wait(lock);
 					p_caller_pool_thread->cond_var.wait(lock);
-					if (flushing_cmd_queue) {
-						flushing_cmd_queue->lock();
-					}
 
 
 					DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
 					DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
 					p_caller_pool_thread->awaited_task = nullptr;
 					p_caller_pool_thread->awaited_task = nullptr;
@@ -474,6 +500,10 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 			}
 			}
 		}
 		}
 
 
+		if (relock_unlockables) {
+			_lock_unlockable_mutexes();
+		}
+
 		if (task_to_process) {
 		if (task_to_process) {
 			_process_task(task_to_process);
 			_process_task(task_to_process);
 		}
 		}
@@ -600,13 +630,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
 	{
 	{
 		Group *group = *groupp;
 		Group *group = *groupp;
 
 
-		if (flushing_cmd_queue) {
-			flushing_cmd_queue->unlock();
-		}
+		_unlock_unlockable_mutexes();
 		group->done_semaphore.wait();
 		group->done_semaphore.wait();
-		if (flushing_cmd_queue) {
-			flushing_cmd_queue->lock();
-		}
+		_lock_unlockable_mutexes();
 
 
 		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
 		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
 		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
 		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@@ -630,16 +656,41 @@ int WorkerThreadPool::get_thread_index() {
 	return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
 	return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
 }
 }
 
 
-void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) {
-	ERR_FAIL_COND(flushing_cmd_queue != nullptr);
-	flushing_cmd_queue = p_queue;
+#ifdef THREADS_ENABLED
+uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
+	return _thread_enter_unlock_allowance_zone(p_mutex, false);
+}
+
+uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) {
+	return _thread_enter_unlock_allowance_zone(p_mutex, true);
 }
 }
 
 
-void WorkerThreadPool::thread_exit_command_queue_mt_flush() {
-	ERR_FAIL_NULL(flushing_cmd_queue);
-	flushing_cmd_queue = nullptr;
+uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
+	for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
+		if (unlikely(unlockable_mutexes[i] == (uintptr_t)p_mutex)) {
+			// Already registered in the current thread.
+			return UINT32_MAX;
+		}
+		if (!unlockable_mutexes[i]) {
+			unlockable_mutexes[i] = (uintptr_t)p_mutex;
+			if (p_is_binary) {
+				unlockable_mutexes[i] |= 1;
+			}
+			return i;
+		}
+	}
+	ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. Engine bug.");
 }
 }
 
 
+void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
+	if (p_zone_id == UINT32_MAX) {
+		return;
+	}
+	DEV_ASSERT(unlockable_mutexes[p_zone_id]);
+	unlockable_mutexes[p_zone_id] = 0;
+}
+#endif
+
 void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
 void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
 	ERR_FAIL_COND(threads.size() > 0);
 	ERR_FAIL_COND(threads.size() > 0);
 	if (p_thread_count < 0) {
 	if (p_thread_count < 0) {

+ 19 - 5
core/object/worker_thread_pool.h

@@ -41,8 +41,6 @@
 #include "core/templates/rid.h"
 #include "core/templates/rid.h"
 #include "core/templates/safe_refcount.h"
 #include "core/templates/safe_refcount.h"
 
 
-class CommandQueueMT;
-
 class WorkerThreadPool : public Object {
 class WorkerThreadPool : public Object {
 	GDCLASS(WorkerThreadPool, Object)
 	GDCLASS(WorkerThreadPool, Object)
 public:
 public:
@@ -163,7 +161,10 @@ private:
 
 
 	static WorkerThreadPool *singleton;
 	static WorkerThreadPool *singleton;
 
 
-	static thread_local CommandQueueMT *flushing_cmd_queue;
+#ifdef THREADS_ENABLED
+	static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2;
+	static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES];
+#endif
 
 
 	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
 	TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
 	GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
 	GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
@@ -190,6 +191,13 @@ private:
 
 
 	void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
 	void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
 
 
+#ifdef THREADS_ENABLED
+	static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary);
+#endif
+
+	void _lock_unlockable_mutexes();
+	void _unlock_unlockable_mutexes();
+
 protected:
 protected:
 	static void _bind_methods();
 	static void _bind_methods();
 
 
@@ -232,8 +240,14 @@ public:
 	static WorkerThreadPool *get_singleton() { return singleton; }
 	static WorkerThreadPool *get_singleton() { return singleton; }
 	static int get_thread_index();
 	static int get_thread_index();
 
 
-	static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue);
-	static void thread_exit_command_queue_mt_flush();
+#ifdef THREADS_ENABLED
+	static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
+	static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex);
+	static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id);
+#else
+	static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; }
+	static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {}
+#endif
 
 
 	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
 	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
 	void finish();
 	void finish();

+ 2 - 2
core/templates/command_queue_mt.h

@@ -364,7 +364,7 @@ class CommandQueueMT {
 
 
 		lock();
 		lock();
 
 
-		WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
+		uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex);
 		while (flush_read_ptr < command_mem.size()) {
 		while (flush_read_ptr < command_mem.size()) {
 			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
 			uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
 			flush_read_ptr += 8;
 			flush_read_ptr += 8;
@@ -383,7 +383,7 @@ class CommandQueueMT {
 
 
 			flush_read_ptr += size;
 			flush_read_ptr += size;
 		}
 		}
-		WorkerThreadPool::thread_exit_command_queue_mt_flush();
+		WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
 
 
 		command_mem.clear();
 		command_mem.clear();
 		flush_read_ptr = 0;
 		flush_read_ptr = 0;