浏览代码

Merge pull request #96959 from RandomShaper/revamp_languages_exit

WorkerThreadPool: Revamp interaction with ScriptServer
Clay John 11 月之前
父节点
当前提交
48403b5358
共有 4 个文件被更改,包括 163 次插入69 次删除
  1. 130 48
      core/object/worker_thread_pool.cpp
  2. 25 3
      core/object/worker_thread_pool.h
  3. 7 1
      core/register_core_types.cpp
  4. 1 17
      main/main.cpp

+ 130 - 48
core/object/worker_thread_pool.cpp

@@ -180,13 +180,17 @@ void WorkerThreadPool::_process_task(Task *p_task) {
 
 void WorkerThreadPool::_thread_function(void *p_user) {
 	ThreadData *thread_data = (ThreadData *)p_user;
+
 	while (true) {
 		Task *task_to_process = nullptr;
 		{
 			MutexLock lock(singleton->task_mutex);
-			if (singleton->exit_threads) {
-				return;
+
+			bool exit = singleton->_handle_runlevel(thread_data, lock);
+			if (unlikely(exit)) {
+				break;
 			}
+
 			thread_data->signaled = false;
 
 			if (singleton->task_queue.first()) {
@@ -194,7 +198,6 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 				singleton->task_queue.remove(singleton->task_queue.first());
 			} else {
 				thread_data->cond_var.wait(lock);
-				DEV_ASSERT(singleton->exit_threads || thread_data->signaled);
 			}
 		}
 
@@ -204,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 	}
 }
 
-void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
+void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
 	// Fall back to processing on the calling thread if there are no worker threads.
 	// Separated into its own variable to make it easier to extend this logic
 	// in custom builds.
 	bool process_on_calling_thread = threads.size() == 0;
 	if (process_on_calling_thread) {
-		task_mutex.unlock();
+		p_lock.temp_unlock();
 		for (uint32_t i = 0; i < p_count; i++) {
 			_process_task(p_tasks[i]);
 		}
+		p_lock.temp_relock();
 		return;
 	}
 
+	while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
+		control_cond_var.wait(p_lock);
+	}
+
 	uint32_t to_process = 0;
 	uint32_t to_promote = 0;
 
@@ -238,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
 	}
 
 	_notify_threads(caller_pool_thread, to_process, to_promote);
-
-	task_mutex.unlock();
 }
 
 void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -323,9 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
 }
 
 WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
-	ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a task because the WorkerThreadPool is either not initialized yet or already terminated.");
+	MutexLock<BinaryMutex> lock(task_mutex);
 
-	task_mutex.lock();
 	// Get a free task
 	Task *task = task_allocator.alloc();
 	TaskID id = last_task++;
@@ -337,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
 	task->template_userdata = p_template_userdata;
 	tasks.insert(id, task);
 
-	_post_tasks_and_unlock(&task, 1, p_high_priority);
+	_post_tasks(&task, 1, p_high_priority, lock);
 
 	return id;
 }
@@ -444,22 +449,34 @@ void WorkerThreadPool::_unlock_unlockable_mutexes() {
 void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
 	// Keep processing tasks until the condition to stop waiting is met.
 
-#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)
-
 	while (true) {
 		Task *task_to_process = nullptr;
 		bool relock_unlockables = false;
 		{
 			MutexLock lock(task_mutex);
+
 			bool was_signaled = p_caller_pool_thread->signaled;
 			p_caller_pool_thread->signaled = false;
 
-			if (IS_WAIT_OVER) {
-				if (unlikely(p_task == ThreadData::YIELDING)) {
+			bool exit = _handle_runlevel(p_caller_pool_thread, lock);
+			if (unlikely(exit)) {
+				break;
+			}
+
+			bool wait_is_over = false;
+			if (unlikely(p_task == ThreadData::YIELDING)) {
+				if (p_caller_pool_thread->yield_is_over) {
 					p_caller_pool_thread->yield_is_over = false;
+					wait_is_over = true;
 				}
+			} else {
+				if (p_task->completed) {
+					wait_is_over = true;
+				}
+			}
 
-				if (!exit_threads && was_signaled) {
+			if (wait_is_over) {
+				if (was_signaled) {
 					// This thread was awaken for some additional reason, but it's about to exit.
 					// Let's find out what may be pending and forward the requests.
 					uint32_t to_process = task_queue.first() ? 1 : 0;
@@ -474,28 +491,26 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 				break;
 			}
 
-			if (!exit_threads) {
-				if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
-					if (_try_promote_low_priority_task()) {
-						_notify_threads(p_caller_pool_thread, 1, 0);
-					}
+			if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {
+				if (_try_promote_low_priority_task()) {
+					_notify_threads(p_caller_pool_thread, 1, 0);
 				}
+			}
 
-				if (singleton->task_queue.first()) {
-					task_to_process = task_queue.first()->self();
-					task_queue.remove(task_queue.first());
-				}
+			if (singleton->task_queue.first()) {
+				task_to_process = task_queue.first()->self();
+				task_queue.remove(task_queue.first());
+			}
 
-				if (!task_to_process) {
-					p_caller_pool_thread->awaited_task = p_task;
+			if (!task_to_process) {
+				p_caller_pool_thread->awaited_task = p_task;
 
-					_unlock_unlockable_mutexes();
-					relock_unlockables = true;
-					p_caller_pool_thread->cond_var.wait(lock);
+				_unlock_unlockable_mutexes();
+				relock_unlockables = true;
 
-					DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
-					p_caller_pool_thread->awaited_task = nullptr;
-				}
+				p_caller_pool_thread->cond_var.wait(lock);
+
+				p_caller_pool_thread->awaited_task = nullptr;
 			}
 		}
 
@@ -509,16 +524,65 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 	}
 }
 
+void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
+	DEV_ASSERT(p_runlevel > runlevel);
+	runlevel = p_runlevel;
+	memset(&runlevel_data, 0, sizeof(runlevel_data));
+	for (uint32_t i = 0; i < threads.size(); i++) {
+		threads[i].cond_var.notify_one();
+		threads[i].signaled = true;
+	}
+	control_cond_var.notify_all();
+}
+
+// Returns whether threads have to exit. This may perform the check about handling needed.
+bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
+	bool exit = false;
+	switch (runlevel) {
+		case RUNLEVEL_NORMAL: {
+		} break;
+		case RUNLEVEL_PRE_EXIT_LANGUAGES: {
+			if (!p_thread_data->pre_exited_languages) {
+				if (!task_queue.first() && !low_priority_task_queue.first()) {
+					p_thread_data->pre_exited_languages = true;
+					runlevel_data.pre_exit_languages.num_idle_threads++;
+					control_cond_var.notify_all();
+				}
+			}
+		} break;
+		case RUNLEVEL_EXIT_LANGUAGES: {
+			if (!p_thread_data->exited_languages) {
+				p_lock.temp_unlock();
+				ScriptServer::thread_exit();
+				p_lock.temp_relock();
+				p_thread_data->exited_languages = true;
+				runlevel_data.exit_languages.num_exited_threads++;
+				control_cond_var.notify_all();
+			}
+		} break;
+		case RUNLEVEL_EXIT: {
+			exit = true;
+		} break;
+	}
+	return exit;
+}
+
 void WorkerThreadPool::yield() {
 	int th_index = get_thread_index();
 	ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
 	_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
 
-	// If this long-lived task started before the scripting server was initialized,
-	// now is a good time to have scripting languages ready for the current thread.
-	// Otherwise, such a piece of setup won't happen unless another task has been
-	// run during the collaborative wait.
-	ScriptServer::thread_enter();
+	task_mutex.lock();
+	if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
+		// If this long-lived task started before the scripting server was initialized,
+		// now is a good time to have scripting languages ready for the current thread.
+		// Otherwise, such a piece of setup won't happen unless another task has been
+		// run during the collaborative wait.
+		task_mutex.unlock();
+		ScriptServer::thread_enter();
+	} else {
+		task_mutex.unlock();
+	}
 }
 
 void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -543,13 +607,13 @@ void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
 }
 
 WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
-	ERR_FAIL_COND_V_MSG(threads.is_empty(), INVALID_TASK_ID, "Can't add a group task because the WorkerThreadPool is either not initialized yet or already terminated.");
 	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
 	if (p_tasks < 0) {
 		p_tasks = MAX(1u, threads.size());
 	}
 
-	task_mutex.lock();
+	MutexLock<BinaryMutex> lock(task_mutex);
+
 	Group *group = group_allocator.alloc();
 	GroupID id = last_task++;
 	group->max = p_elements;
@@ -584,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
 
 	groups[id] = group;
 
-	_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
+	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
 
 	return id;
 }
@@ -687,6 +751,9 @@ void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
 
 void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
 	ERR_FAIL_COND(threads.size() > 0);
+
+	runlevel = RUNLEVEL_NORMAL;
+
 	if (p_thread_count < 0) {
 		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
 	}
@@ -704,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
 	}
 }
 
+void WorkerThreadPool::exit_languages_threads() {
+	if (threads.size() == 0) {
+		return;
+	}
+
+	MutexLock lock(task_mutex);
+
+	// Wait until all threads are idle.
+	_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
+	while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
+		control_cond_var.wait(lock);
+	}
+
+	// Wait until all threads have detached from scripting languages.
+	_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
+	while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
+		control_cond_var.wait(lock);
+	}
+}
+
 void WorkerThreadPool::finish() {
 	if (threads.size() == 0) {
 		return;
@@ -716,15 +803,10 @@ void WorkerThreadPool::finish() {
 			print_error("Task waiting was never re-claimed: " + E->self()->description);
 			E = E->next();
 		}
-	}
 
-	{
-		MutexLock lock(task_mutex);
-		exit_threads = true;
-	}
-	for (ThreadData &data : threads) {
-		data.cond_var.notify_one();
+		_switch_runlevel(RUNLEVEL_EXIT);
 	}
+
 	for (ThreadData &data : threads) {
 		data.thread.wait_to_finish();
 	}
@@ -755,5 +837,5 @@ WorkerThreadPool::WorkerThreadPool() {
 }
 
 WorkerThreadPool::~WorkerThreadPool() {
-	DEV_ASSERT(threads.size() == 0 && "finish() hasn't been called!");
+	finish();
 }

+ 25 - 3
core/object/worker_thread_pool.h

@@ -114,17 +114,35 @@ private:
 		Thread thread;
 		bool signaled : 1;
 		bool yield_is_over : 1;
+		bool pre_exited_languages : 1;
+		bool exited_languages : 1;
 		Task *current_task = nullptr;
 		Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
 		ConditionVariable cond_var;
 
 		ThreadData() :
 				signaled(false),
-				yield_is_over(false) {}
+				yield_is_over(false),
+				pre_exited_languages(false),
+				exited_languages(false) {}
 	};
 
 	TightLocalVector<ThreadData> threads;
-	bool exit_threads = false;
+	enum Runlevel {
+		RUNLEVEL_NORMAL,
+		RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
+		RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
+		RUNLEVEL_EXIT,
+	} runlevel = RUNLEVEL_NORMAL;
+	union { // Cleared on every runlevel change.
+		struct {
+			uint32_t num_idle_threads;
+		} pre_exit_languages;
+		struct {
+			uint32_t num_exited_threads;
+		} exit_languages;
+	} runlevel_data;
+	ConditionVariable control_cond_var;
 
 	HashMap<Thread::ID, int> thread_ids;
 	HashMap<
@@ -152,7 +170,7 @@ private:
 
 	void _process_task(Task *task);
 
-	void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
+	void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
 	void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
 
 	bool _try_promote_low_priority_task();
@@ -193,6 +211,9 @@ private:
 
 	void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
 
+	void _switch_runlevel(Runlevel p_runlevel);
+	bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);
+
 #ifdef THREADS_ENABLED
 	static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
 #endif
@@ -256,6 +277,7 @@ public:
 #endif
 
 	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
+	void exit_languages_threads();
 	void finish();
 	WorkerThreadPool();
 	~WorkerThreadPool();

+ 7 - 1
core/register_core_types.cpp

@@ -107,6 +107,8 @@ static Time *_time = nullptr;
 static core_bind::Geometry2D *_geometry_2d = nullptr;
 static core_bind::Geometry3D *_geometry_3d = nullptr;
 
+static WorkerThreadPool *worker_thread_pool = nullptr;
+
 extern Mutex _global_mutex;
 
 static GDExtensionManager *gdextension_manager = nullptr;
@@ -295,6 +297,8 @@ void register_core_types() {
 	GDREGISTER_NATIVE_STRUCT(AudioFrame, "float left;float right");
 	GDREGISTER_NATIVE_STRUCT(ScriptLanguageExtensionProfilingInfo, "StringName signature;uint64_t call_count;uint64_t total_time;uint64_t self_time");
 
+	worker_thread_pool = memnew(WorkerThreadPool);
+
 	OS::get_singleton()->benchmark_end_measure("Core", "Register Types");
 }
 
@@ -345,7 +349,7 @@ void register_core_singletons() {
 	Engine::get_singleton()->add_singleton(Engine::Singleton("Time", Time::get_singleton()));
 	Engine::get_singleton()->add_singleton(Engine::Singleton("GDExtensionManager", GDExtensionManager::get_singleton()));
 	Engine::get_singleton()->add_singleton(Engine::Singleton("ResourceUID", ResourceUID::get_singleton()));
-	Engine::get_singleton()->add_singleton(Engine::Singleton("WorkerThreadPool", WorkerThreadPool::get_singleton()));
+	Engine::get_singleton()->add_singleton(Engine::Singleton("WorkerThreadPool", worker_thread_pool));
 
 	OS::get_singleton()->benchmark_end_measure("Core", "Register Singletons");
 }
@@ -378,6 +382,8 @@ void unregister_core_types() {
 
 	// Destroy singletons in reverse order to ensure dependencies are not broken.
 
+	memdelete(worker_thread_pool);
+
 	memdelete(_engine_debugger);
 	memdelete(_marshalls);
 	memdelete(_classdb);

+ 1 - 17
main/main.cpp

@@ -140,7 +140,6 @@ static Engine *engine = nullptr;
 static ProjectSettings *globals = nullptr;
 static Input *input = nullptr;
 static InputMap *input_map = nullptr;
-static WorkerThreadPool *worker_thread_pool = nullptr;
 static TranslationServer *translation_server = nullptr;
 static Performance *performance = nullptr;
 static PackedData *packed_data = nullptr;
@@ -691,8 +690,6 @@ Error Main::test_setup() {
 
 	register_core_settings(); // Here globals are present.
 
-	worker_thread_pool = memnew(WorkerThreadPool);
-
 	translation_server = memnew(TranslationServer);
 	tsman = memnew(TextServerManager);
 
@@ -803,8 +800,6 @@ void Main::test_cleanup() {
 	ResourceSaver::remove_custom_savers();
 	PropertyListHelper::clear_base_helpers();
 
-	WorkerThreadPool::get_singleton()->finish();
-
 #ifdef TOOLS_ENABLED
 	GDExtensionManager::get_singleton()->deinitialize_extensions(GDExtension::INITIALIZATION_LEVEL_EDITOR);
 	uninitialize_modules(MODULE_INITIALIZATION_LEVEL_EDITOR);
@@ -846,9 +841,6 @@ void Main::test_cleanup() {
 	if (physics_server_2d_manager) {
 		memdelete(physics_server_2d_manager);
 	}
-	if (worker_thread_pool) {
-		memdelete(worker_thread_pool);
-	}
 	if (globals) {
 		memdelete(globals);
 	}
@@ -939,7 +931,6 @@ Error Main::setup(const char *execpath, int argc, char *argv[], bool p_second_ph
 
 	register_core_settings(); //here globals are present
 
-	worker_thread_pool = memnew(WorkerThreadPool);
 	translation_server = memnew(TranslationServer);
 	performance = memnew(Performance);
 	GDREGISTER_CLASS(Performance);
@@ -2629,10 +2620,6 @@ error:
 	if (translation_server) {
 		memdelete(translation_server);
 	}
-	if (worker_thread_pool) {
-		worker_thread_pool->finish();
-		memdelete(worker_thread_pool);
-	}
 	if (globals) {
 		memdelete(globals);
 	}
@@ -4514,7 +4501,7 @@ void Main::cleanup(bool p_force) {
 	ResourceLoader::clear_translation_remaps();
 	ResourceLoader::clear_path_remaps();
 
-	WorkerThreadPool::get_singleton()->finish();
+	WorkerThreadPool::get_singleton()->exit_languages_threads();
 
 	ScriptServer::finish_languages();
 
@@ -4606,9 +4593,6 @@ void Main::cleanup(bool p_force) {
 	if (physics_server_2d_manager) {
 		memdelete(physics_server_2d_manager);
 	}
-	if (worker_thread_pool) {
-		memdelete(worker_thread_pool);
-	}
 	if (globals) {
 		memdelete(globals);
 	}