Ver código fonte

WorkerThreadPool: Add safety point between languages finished and pool termination

Pedro J. Estébanez 11 meses atrás
pai
commit
5d371e3378
3 arquivos alterados com 98 adições e 20 exclusões
  1. 77 17
      core/object/worker_thread_pool.cpp
  2. 19 3
      core/object/worker_thread_pool.h
  3. 2 0
      main/main.cpp

+ 77 - 17
core/object/worker_thread_pool.cpp

@@ -186,7 +186,7 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 		{
 			MutexLock lock(singleton->task_mutex);
 
-			bool exit = singleton->_handle_runlevel();
+			bool exit = singleton->_handle_runlevel(thread_data, lock);
 			if (unlikely(exit)) {
 				break;
 			}
@@ -207,19 +207,24 @@ void WorkerThreadPool::_thread_function(void *p_user) {
 	}
 }
 
-void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority) {
+void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock) {
 	// Fall back to processing on the calling thread if there are no worker threads.
 	// Separated into its own variable to make it easier to extend this logic
 	// in custom builds.
 	bool process_on_calling_thread = threads.size() == 0;
 	if (process_on_calling_thread) {
-		task_mutex.unlock();
+		p_lock.temp_unlock();
 		for (uint32_t i = 0; i < p_count; i++) {
 			_process_task(p_tasks[i]);
 		}
+		p_lock.temp_relock();
 		return;
 	}
 
+	while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {
+		control_cond_var.wait(p_lock);
+	}
+
 	uint32_t to_process = 0;
 	uint32_t to_promote = 0;
 
@@ -241,8 +246,6 @@ void WorkerThreadPool::_post_tasks_and_unlock(Task **p_tasks, uint32_t p_count,
 	}
 
 	_notify_threads(caller_pool_thread, to_process, to_promote);
-
-	task_mutex.unlock();
 }
 
 void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {
@@ -326,7 +329,8 @@ WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *
 }
 
 WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
-	task_mutex.lock();
+	MutexLock<BinaryMutex> lock(task_mutex);
+
 	// Get a free task
 	Task *task = task_allocator.alloc();
 	TaskID id = last_task++;
@@ -338,7 +342,7 @@ WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable,
 	task->template_userdata = p_template_userdata;
 	tasks.insert(id, task);
 
-	_post_tasks_and_unlock(&task, 1, p_high_priority);
+	_post_tasks(&task, 1, p_high_priority, lock);
 
 	return id;
 }
@@ -454,7 +458,7 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 			bool was_signaled = p_caller_pool_thread->signaled;
 			p_caller_pool_thread->signaled = false;
 
-			bool exit = _handle_runlevel();
+			bool exit = _handle_runlevel(p_caller_pool_thread, lock);
 			if (unlikely(exit)) {
 				break;
 			}
@@ -523,15 +527,44 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, T
 void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {
 	DEV_ASSERT(p_runlevel > runlevel);
 	runlevel = p_runlevel;
+	memset(&runlevel_data, 0, sizeof(runlevel_data));
 	for (uint32_t i = 0; i < threads.size(); i++) {
 		threads[i].cond_var.notify_one();
 		threads[i].signaled = true;
 	}
+	control_cond_var.notify_all();
 }
 
 // Returns whether threads have to exit. This may perform the check about handling needed.
-bool WorkerThreadPool::_handle_runlevel() {
-	return runlevel == RUNLEVEL_EXIT;
+bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {
+	bool exit = false;
+	switch (runlevel) {
+		case RUNLEVEL_NORMAL: {
+		} break;
+		case RUNLEVEL_PRE_EXIT_LANGUAGES: {
+			if (!p_thread_data->pre_exited_languages) {
+				if (!task_queue.first() && !low_priority_task_queue.first()) {
+					p_thread_data->pre_exited_languages = true;
+					runlevel_data.pre_exit_languages.num_idle_threads++;
+					control_cond_var.notify_all();
+				}
+			}
+		} break;
+		case RUNLEVEL_EXIT_LANGUAGES: {
+			if (!p_thread_data->exited_languages) {
+				p_lock.temp_unlock();
+				ScriptServer::thread_exit();
+				p_lock.temp_relock();
+				p_thread_data->exited_languages = true;
+				runlevel_data.exit_languages.num_exited_threads++;
+				control_cond_var.notify_all();
+			}
+		} break;
+		case RUNLEVEL_EXIT: {
+			exit = true;
+		} break;
+	}
+	return exit;
 }
 
 void WorkerThreadPool::yield() {
@@ -539,11 +572,17 @@ void WorkerThreadPool::yield() {
 	ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");
 	_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);
 
-	// If this long-lived task started before the scripting server was initialized,
-	// now is a good time to have scripting languages ready for the current thread.
-	// Otherwise, such a piece of setup won't happen unless another task has been
-	// run during the collaborative wait.
-	ScriptServer::thread_enter();
+	task_mutex.lock();
+	if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {
+		// If this long-lived task started before the scripting server was initialized,
+		// now is a good time to have scripting languages ready for the current thread.
+		// Otherwise, such a piece of setup won't happen unless another task has been
+		// run during the collaborative wait.
+		task_mutex.unlock();
+		ScriptServer::thread_enter();
+	} else {
+		task_mutex.unlock();
+	}
 }
 
 void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {
@@ -573,7 +612,8 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
 		p_tasks = MAX(1u, threads.size());
 	}
 
-	task_mutex.lock();
+	MutexLock<BinaryMutex> lock(task_mutex);
+
 	Group *group = group_allocator.alloc();
 	GroupID id = last_task++;
 	group->max = p_elements;
@@ -608,7 +648,7 @@ WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_ca
 
 	groups[id] = group;
 
-	_post_tasks_and_unlock(tasks_posted, p_tasks, p_high_priority);
+	_post_tasks(tasks_posted, p_tasks, p_high_priority, lock);
 
 	return id;
 }
@@ -731,6 +771,26 @@ void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio)
 	}
 }
 
+void WorkerThreadPool::exit_languages_threads() {
+	if (threads.size() == 0) {
+		return;
+	}
+
+	MutexLock lock(task_mutex);
+
+	// Wait until all threads are idle.
+	_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);
+	while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {
+		control_cond_var.wait(lock);
+	}
+
+	// Wait until all threads have detached from scripting languages.
+	_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);
+	while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {
+		control_cond_var.wait(lock);
+	}
+}
+
 void WorkerThreadPool::finish() {
 	if (threads.size() == 0) {
 		return;

+ 19 - 3
core/object/worker_thread_pool.h

@@ -114,20 +114,35 @@ private:
 		Thread thread;
 		bool signaled : 1;
 		bool yield_is_over : 1;
+		bool pre_exited_languages : 1;
+		bool exited_languages : 1;
 		Task *current_task = nullptr;
 		Task *awaited_task = nullptr; // Null if not awaiting the condition variable, or special value (YIELDING).
 		ConditionVariable cond_var;
 
 		ThreadData() :
 				signaled(false),
-				yield_is_over(false) {}
+				yield_is_over(false),
+				pre_exited_languages(false),
+				exited_languages(false) {}
 	};
 
 	TightLocalVector<ThreadData> threads;
 	enum Runlevel {
 		RUNLEVEL_NORMAL,
+		RUNLEVEL_PRE_EXIT_LANGUAGES, // Block adding new tasks
+		RUNLEVEL_EXIT_LANGUAGES, // All threads detach from scripting threads.
 		RUNLEVEL_EXIT,
 	} runlevel = RUNLEVEL_NORMAL;
+	union { // Cleared on every runlevel change.
+		struct {
+			uint32_t num_idle_threads;
+		} pre_exit_languages;
+		struct {
+			uint32_t num_exited_threads;
+		} exit_languages;
+	} runlevel_data;
+	ConditionVariable control_cond_var;
 
 	HashMap<Thread::ID, int> thread_ids;
 	HashMap<
@@ -155,7 +170,7 @@ private:
 
 	void _process_task(Task *task);
 
-	void _post_tasks_and_unlock(Task **p_tasks, uint32_t p_count, bool p_high_priority);
+	void _post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock);
 	void _notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count);
 
 	bool _try_promote_low_priority_task();
@@ -197,7 +212,7 @@ private:
 	void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);
 
 	void _switch_runlevel(Runlevel p_runlevel);
-	bool _handle_runlevel();
+	bool _handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock);
 
 #ifdef THREADS_ENABLED
 	static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
@@ -262,6 +277,7 @@ public:
 #endif
 
 	void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
+	void exit_languages_threads();
 	void finish();
 	WorkerThreadPool();
 	~WorkerThreadPool();

+ 2 - 0
main/main.cpp

@@ -4501,6 +4501,8 @@ void Main::cleanup(bool p_force) {
 	ResourceLoader::clear_translation_remaps();
 	ResourceLoader::clear_path_remaps();
 
+	WorkerThreadPool::get_singleton()->exit_languages_threads();
+
 	ScriptServer::finish_languages();
 
 	// Sync pending commands that may have been queued from a different thread during ScriptServer finalization