Browse Source

Use separate WorkThreadPool for shader compiler.

Pāvels Nadtočajevs 6 months ago
parent
commit
53bb897458

+ 50 - 18
core/object/worker_thread_pool.cpp

@@ -37,6 +37,8 @@
 
 WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
 
+HashMap<StringName, WorkerThreadPool *> WorkerThreadPool::named_pools;
+
 void WorkerThreadPool::Task::free_template_userdata() {
 	ERR_FAIL_NULL(template_userdata);
 	ERR_FAIL_NULL(native_func_userdata);
@@ -184,25 +186,25 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 	while (true) {
 		Task *task_to_process = nullptr;
 		{
-			MutexLock lock(singleton->task_mutex);
+			MutexLock lock(thread_data->pool->task_mutex);
 
-			bool exit = singleton->_handle_runlevel(thread_data, lock);
+			bool exit = thread_data->pool->_handle_runlevel(thread_data, lock);
 			if (unlikely(exit)) {
 				break;
 			}
 
 			thread_data->signaled = false;
 
-			if (singleton->task_queue.first()) {
-				task_to_process = singleton->task_queue.first()->self();
-				singleton->task_queue.remove(singleton->task_queue.first());
+			if (thread_data->pool->task_queue.first()) {
+				task_to_process = thread_data->pool->task_queue.first()->self();
+				thread_data->pool->task_queue.remove(thread_data->pool->task_queue.first());
 			} else {
 				thread_data->cond_var.wait(lock);
 			}
 		}
 
 		if (task_to_process) {
-			singleton->_process_task(task_to_process);
+			thread_data->pool->_process_task(task_to_process);
 		}
 	}
 }
@@ -497,7 +499,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 				}
 			}
 
-			if (singleton->task_queue.first()) {
+			if (p_caller_pool_thread->pool->task_queue.first()) {
 				task_to_process = task_queue.first()->self();
 				task_queue.remove(task_queue.first());
 			}
@@ -505,7 +507,9 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 			if (!task_to_process) {
 				p_caller_pool_thread->awaited_task = p_task;
 
-				_unlock_unlockable_mutexes();
+				if (this == singleton) {
+					_unlock_unlockable_mutexes();
+				}
 				relock_unlockables = true;
 
 				p_caller_pool_thread->cond_var.wait(lock);
@@ -514,7 +518,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 			}
 		}
 
-		if (relock_unlockables) {
+		if (relock_unlockables && this == singleton) {
 			_lock_unlockable_mutexes();
 		}
 
@@ -690,9 +694,13 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
 	{
 		Group *group = *groupp;
 
-		_unlock_unlockable_mutexes();
+		if (this == singleton) {
+			_unlock_unlockable_mutexes();
+		}
 		group->done_semaphore.wait();
-		_lock_unlockable_mutexes();
+		if (this == singleton) {
+			_lock_unlockable_mutexes();
+		}
 
 		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
 		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@@ -709,15 +717,15 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
 #endif
 }
 
-int WorkerThreadPool::get_thread_index() {
+int WorkerThreadPool::get_thread_index() const {
 	Thread::ID tid = Thread::get_caller_id();
-	return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
+	return thread_ids.has(tid) ? thread_ids[tid] : -1;
 }
 
-WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
+WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() const {
 	int th_index = get_thread_index();
-	if (th_index != -1 && singleton->threads[th_index].current_task) {
-		return singleton->threads[th_index].current_task->self;
+	if (th_index != -1 && threads[th_index].current_task) {
+		return threads[th_index].current_task->self;
 	} else {
 		return INVALID_TASK_ID;
 	}
@@ -766,6 +774,7 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
 
 	for (uint32_t i = 0; i < threads.size(); i++) {
 		threads[i].index = i;
+		threads[i].pool = this;
 		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
 		thread_ids.insert(threads[i].thread.get_id(), i);
 	}
@@ -832,10 +841,33 @@ void WorkerThreadPool::_bind_methods() {
 	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
 }
 
-WorkerThreadPool::WorkerThreadPool() {
-	singleton = this;
+WorkerThreadPool *WorkerThreadPool::get_named_pool(const StringName &p_name) {
+	WorkerThreadPool **pool_ptr = named_pools.getptr(p_name);
+	if (pool_ptr) {
+		return *pool_ptr;
+	} else {
+		WorkerThreadPool *pool = memnew(WorkerThreadPool(false));
+		pool->init();
+		named_pools[p_name] = pool;
+		return pool;
+	}
+}
+
+WorkerThreadPool::WorkerThreadPool(bool p_singleton) {
+	if (p_singleton) {
+		singleton = this;
+	}
 }
 
 WorkerThreadPool::~WorkerThreadPool() {
 	finish();
+
+	if (this == singleton) {
+		singleton = nullptr;
+		for (KeyValue<StringName, WorkerThreadPool *> &E : named_pools) {
+			E.value->finish();
+			memdelete(E.value);
+		}
+		named_pools.clear();
+	}
 }

+ 9 - 3
core/object/worker_thread_pool.h

@@ -119,6 +119,7 @@ private:
 		Task *current_task = nullptr;
 		Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
 		ConditionVariable cond_var;
+		WorkerThreadPool *pool = nullptr;
 
 		ThreadData() :
 				signaled(false),
@@ -166,6 +167,8 @@ private:
 
 	uint64_t last_task = 1;
 
+	static HashMap<StringName, WorkerThreadPool *> named_pools;
+
 	static void _thread_function(void *p_user);
 
 	void _process_task(Task *task);
@@ -266,9 +269,12 @@ public:
 #endif
 	}
 
+	// Note: Do not use this unless you know what you are doing, and it is absolutely necessary. Main thread pool (`get_singleton()`) should be preferred instead.
+	static WorkerThreadPool *get_named_pool(const StringName &p_name);
+
 	static WorkerThreadPool *get_singleton() { return singleton; }
-	static int get_thread_index();
-	static TaskID get_caller_task_id();
+	int get_thread_index() const;
+	TaskID get_caller_task_id() const;
 
 #ifdef THREADS_ENABLED
 	_ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const MutexLock<BinaryMutex> &p_lock) { return _thread_enter_unlock_allowance_zone(p_lock._get_lock()); }
@@ -285,7 +291,7 @@ public:
 	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
 	void exit_languages_threads();
 	void finish();
-	WorkerThreadPool();
+	WorkerThreadPool(bool p_singleton = true);
 	~WorkerThreadPool();
 };
 

+ 2 - 3
servers/rendering/renderer_rd/shader_rd.cpp

@@ -555,7 +555,7 @@ void ShaderRD::_compile_version_start(Version *p_version, int p_group) {
 	compile_data.version = p_version;
 	compile_data.group = p_group;
 
-	WorkerThreadPool::GroupID group_task = WorkerThreadPool::get_singleton()->add_template_group_task(this, &ShaderRD::_compile_variant, compile_data, group_to_variant_map[p_group].size(), -1, true, SNAME("ShaderCompilation"));
+	WorkerThreadPool::GroupID group_task = WorkerThreadPool::get_named_pool(SNAME("ShaderCompilationPool"))->add_template_group_task(this, &ShaderRD::_compile_variant, compile_data, group_to_variant_map[p_group].size(), -1, true, SNAME("ShaderCompilation"));
 	p_version->group_compilation_tasks.write[p_group] = group_task;
 }
 
@@ -563,9 +563,8 @@ void ShaderRD::_compile_version_end(Version *p_version, int p_group) {
 	if (p_version->group_compilation_tasks.size() <= p_group || p_version->group_compilation_tasks[p_group] == 0) {
 		return;
 	}
-
 	WorkerThreadPool::GroupID group_task = p_version->group_compilation_tasks[p_group];
-	WorkerThreadPool::get_singleton()->wait_for_group_task_completion(group_task);
+	WorkerThreadPool::get_named_pool(SNAME("ShaderCompilationPool"))->wait_for_group_task_completion(group_task);
 	p_version->group_compilation_tasks.write[p_group] = 0;
 
 	bool all_valid = true;